This article shows you an example of how to set up a Java Database Connectivity (JDBC) sink connector for Apache Kafka, have it consume data from a Kafka topic, and store that data in a PostgreSQL database. This example also uses Kafka Schema Registry to produce and consume data that follows Avro schemas.

Before you set up the connector, you need a number of details for both the Kafka service and your relational database management system. This example uses the Aiven for PostgreSQL service as the destination for the JDBC sink.

Requirements

  • An Aiven for Apache Kafka service with a minimum service plan of Business-4. For instructions on setting up a new Kafka service, see this article.

  • Kafka Connect integration enabled in your Aiven for Apache Kafka service. You can set this up on the Connectors tab of your service page in the Aiven web console.

  • Schema Registry API enabled in your Aiven for Apache Kafka service. You can set this up on the Schemas tab of your service page in the Aiven web console.

  • A jdbc_sink Kafka topic created in your Aiven for Apache Kafka service.

  • Java keystore and truststore set up for your Aiven for Apache Kafka service. For instructions, see this article; a minimal command-line sketch is also shown after this list.

  • An Aiven for PostgreSQL service. For instructions on setting up a new PostgreSQL service, see this article.
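
Setting up the keystore and truststore is covered in the linked article; as a quick reference, a minimal sketch looks roughly like the following. It assumes you have downloaded the service's Access Key, Access Certificate, and CA Certificate as service.key, service.cert, and ca.pem, and that you use secret as the store password (matching the console-producer.properties example later in this article):

    # Assumed input files: service.key, service.cert, and ca.pem from the Aiven web console
    # Create a PKCS12 keystore from the service key and certificate
    openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key -passout pass:secret
    # Import the CA certificate into a JKS truststore
    keytool -importcert -file ca.pem -alias CA -keystore client.truststore.jks -storetype JKS -storepass secret -noprompt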

This example uses the following variables from your Kafka service (these are available on the Overview tab of your service page in the Aiven web console):

  • kafka_service_uri (Overview > Kafka tab)

  • schema_registry_service_uri (Overview > Schema Registry tab)

  • schema_registry_host (Overview > Schema Registry tab)

  • schema_registry_port (Overview > Schema Registry tab)

  • schema_registry_password (Overview > Schema Registry tab)

  • kafka_connect_service_uri (Overview > Kafka Connect tab)

This example uses the following variables from your PostgreSQL service (these are available on the Overview tab of your service page in the Aiven web console):

  • postgresql_host

  • postgresql_port

  • postgresql_password

  • postgresql_service_uri
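
The example commands in the next section reference the service URIs as shell variables (for example $kafka_service_uri), while the Schema Registry and PostgreSQL host, port, and password placeholders are substituted directly into the commands and configuration files. One way to make the commands easier to copy and paste is to export the URIs as environment variables first; the values below are made-up placeholders and must be replaced with the ones shown in your Aiven web console:

    # Placeholder values only; copy the real URIs from the Aiven web console
    export kafka_service_uri="kafka-mykafka.aivencloud.com:12345"
    export kafka_connect_service_uri="https://avnadmin:password@kafka-mykafka.aivencloud.com:12346"
    export postgresql_service_uri="postgres://avnadmin:password@pg-mypg.aivencloud.com:12347/defaultdb?sslmode=require"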

Setting up a JDBC sink connector with Aiven for Apache Kafka

  1. Publish some messages to the jdbc_sink topic using the kafka-avro-console-producer command:

    ./kafka-avro-console-producer --broker-list $kafka_service_uri --topic jdbc_sink --producer.config /path/console-producer.properties --property schema.registry.url=https://schema_registry_host:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}

    Example of the console-producer.properties file:

    bootstrap.servers=kafka_service_uri
    security.protocol=SSL
    ssl.truststore.location=/path/client.truststore.jks
    ssl.truststore.password=secret
    ssl.keystore.type=PKCS12
    ssl.keystore.location=/path/client.keystore.p12
    ssl.keystore.password=secret
    ssl.key.password=secret
    key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
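
    To verify that the message reached the topic, you can optionally read it back with the kafka-avro-console-consumer command. The sketch below reuses the same properties file for the SSL settings; the serializer entries in it are not used by the consumer and only produce warnings:

    ./kafka-avro-console-consumer --bootstrap-server $kafka_service_uri --topic jdbc_sink --from-beginning --consumer.config /path/console-producer.properties --property schema.registry.url=https://schema_registry_host:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password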

  2. Create the configuration file (adjusting the values as required) for the JDBC sink connector and save it as jdbc_sink.json. Note that the connector settings are nested under the config key, which is the format expected by the Kafka Connect REST API endpoint used in the next step:

    {
        "name": "jdbc-sink-conn",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:postgresql://postgresql_host:postgresql_port/defaultdb?user=avnadmin&password=postgresql_password&ssl=true",
            "tasks.max": "1",
            "topics": "jdbc_sink",
            "auto.create": "true",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "https://schema_registry_host:schema_registry_port",
            "value.converter.basic.auth.credentials.source": "USER_INFO",
            "value.converter.basic.auth.user.info": "avnadmin:schema_registry_password"
        }
    }
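
    Optionally, you can confirm that the JDBC sink connector plugin is available on your Kafka Connect service by listing the installed plugins through the Kafka Connect REST API; the grep filter is only there to shorten the output:

    curl -s $kafka_connect_service_uri/connector-plugins | grep -i jdbc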

  3. Run the following command on the Unix command line to set up the JDBC sink connector for Kafka:

    curl -s -H "Content-Type: application/json" -X POST -d @jdbc_sink.json $kafka_connect_service_uri/connectors

  4. Check that the connector is in the Running state and that it has at least one task (1/1).
    You can see this on the Overview > Kafka Connect tab of your service page in the Aiven web console. If the connector is not running, check the Kafka Connect tab and the Logs tab for any error messages.
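
    You can also query the same information from the command line through the Kafka Connect REST API:

    curl -s $kafka_connect_service_uri/connectors/jdbc-sink-conn/status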

  5. Run the following command in psql to check that the records have been added to the PostgreSQL database. With auto.create enabled, the connector automatically creates the destination table and names it after the topic (jdbc_sink):

    psql $postgresql_service_uri
    defaultdb=> SELECT * FROM public.jdbc_sink;
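
    If everything is set up correctly, the query returns the record produced in step 1 (id 999, product foo, quantity 100, price 50). To see the exact columns and types that auto.create derived from the Avro schema, you can also describe the table directly from the shell:

    psql $postgresql_service_uri -c '\d public.jdbc_sink'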

