This article shows you an example of how to set up a Java Database Connectivity (JDBC) sink connector for Apache Kafka, have it consume data from a Kafka topic, and store that data in a PostgreSQL database. This example also uses the Kafka Schema Registry to produce and consume data that follows JSON schemas.

Before setting up the connector, you need a number of details for both your Kafka service and your relational database management system. This example uses the Aiven for PostgreSQL service as the destination for the JDBC sink.

Requirements

  • An Aiven for Apache Kafka service with a minimum service plan of Business-4. For instructions on setting up a new Kafka service, see this article.

  • Kafka Connect integration enabled in your Aiven for Apache Kafka service. You can set this up on the Connectors tab of your service page in the Aiven web console.

  • Schema Registry API enabled in your Aiven for Apache Kafka service. You can set this up on the Schemas tab of your service page in the Aiven web console.

  • A jdbc_sink Kafka topic created in your Aiven for Apache Kafka service (see the CLI sketch after this list for one way to do this).

  • An Aiven for PostgreSQL service. For instructions on setting up a new PostgreSQL service, see this article.
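
If you prefer the command line, the same prerequisites can be prepared with the Aiven CLI. The sketch below is one possible approach; demo-kafka is a placeholder service name, and the partition and replication counts are examples only.

    # Enable the Kafka Connect integration and the Schema Registry API
    $ avn service update demo-kafka -c kafka_connect=true -c schema_registry=true

    # Create the topic that the JDBC sink connector will consume from
    $ avn service topic-create demo-kafka jdbc_sink --partitions 3 --replication 2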

This example uses the following variables from your Kafka service (these are available on the Overview tab of your service page in the Aiven web console):

  • kafka_service_uri (Overview > Kafka tab)

  • schema_registry_service_uri (Overview > Schema Registry tab)

  • schema_registry_host (Overview > Schema Registry tab)

  • schema_registry_port (Overview > Schema Registry tab)

  • schema_registry_password (Overview > Schema Registry tab)

  • kafka_connect_service_uri (Overview > Kafka Connect tab)

This example uses the following variables from your PostgreSQL service (these are available on the Overview tab of your service page in the Aiven web console):

  • postgresql_host

  • postgresql_port

  • postgresql_password

  • postgresql_service_uri

Setting up a JDBC sink connector with Aiven for Apache Kafka

  • On the Connectors tab, click + Create New Connector and select JDBC Sink from the connector list.

  • Edit the connector configuration in the Connectors pane, add the following connector properties, and create the connector (a REST API alternative is sketched after the configuration):

    {
      "name": "jdbc",
      "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "topics": "jdbc_sink",
      "connection.url": "jdbc:postgresql://postgresql_host:postgresql_port/defaultdb",
      "auto.create": "true",
      "connection.user": "avnadmin",
      "connection.password": "******"
    }
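
    Alternatively, you can create the connector through the Kafka Connect REST API. This is a sketch only: it assumes kafka_connect_service_uri is the value from the Overview > Kafka Connect tab and that the configuration above is saved in a file named jdbc-sink-config.json.

    # PUT /connectors/<name>/config creates the connector if it does not exist yet
    $ curl -s -X PUT "kafka_connect_service_uri/connectors/jdbc/config" \
        -H "Content-Type: application/json" \
        -d @jdbc-sink-config.json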
  • Check that the connector is in the Running state and that it has at least one task (1/1).
    You can see this on the Overview > Kafka Connect tab of your service page in the Aiven web console. If the connector is not running, check the Kafka Connect tab and the Logs tab for any error messages.
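
    You can also query the status over the Kafka Connect REST API; this sketch assumes the connector was created with the name jdbc, as in the configuration above.

    # A healthy connector reports "state": "RUNNING" for itself and for each task
    $ curl -s "kafka_connect_service_uri/connectors/jdbc/status"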

  • Next, produce some messages to the jdbc_sink topic; the JDBC sink connector consumes them and writes them into a table with the same name.

  • Let's create a message and push it into the jdbc_sink topic. The table is created with the schema passed in the message. The schema must be included in the message; otherwise the connector throws an exception, because we are using org.apache.kafka.connect.json.JsonConverter as the value converter.

    { "schema": { "type": "struct", "fields": [ { "field": "text", "type": "string", "optional": false }, { "field": "sent_at", "type": "int64", "name": "org.apache.kafka.connect.data.Timestamp", "optional": false } ] }, "payload": { "text": "Hello", "sent_at": 1560507792000 } }

  • If auto.create is set to true, the connector creates the destination table automatically if it does not already exist in the database. A rough sketch of the resulting table follows.
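
    Based on the message schema above, the auto-created table would look roughly like this; the exact column types and identifier quoting are decided by the connector and may differ:

    CREATE TABLE "jdbc_sink" (
        "text" TEXT NOT NULL,
        "sent_at" TIMESTAMP NOT NULL
    );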

  • Run the following psql command to check that the records were added to the PostgreSQL database (postgresql_service_uri is available on the Overview tab of your PostgreSQL service):

    $ psql postgresql_service_uri
    defaultdb=> SELECT * FROM public.jdbc_sink;
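
    If everything is wired up correctly, the query returns the record produced earlier, along these lines:

     text  |       sent_at
    -------+---------------------
     Hello | 2019-06-14 10:23:12
    (1 row)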
