This article walks through the steps required to set up a JDBC sink connector for Kafka, have it consume data from a Kafka topic, and store the data in a relational database such as MySQL or PostgreSQL. The example also uses the Kafka Schema Registry to produce and consume data adhering to Avro schemas.

Before the connector is set up, a number of details regarding both the Kafka service and your RDBMS service are required. This help article assumes the use of an Aiven for PostgreSQL service as the destination of the JDBC sink.

Steps to set up the JDBC sink connector with Aiven for Kafka

Setting up Kafka service

1. Create a Kafka service (minimum Business-4 plan) in the cloud and region of your choice

2. Enable the Kafka Connect and Schema Registry sub-services for the Kafka service

3. Collect the required Kafka, Kafka Connect and Schema Registry details

4. Create the topic jdbc_sink in Kafka (a CLI sketch of these steps follows this list)
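
If you prefer scripting the setup, a minimal sketch using the Aiven CLI (avn) is shown below. The service name demo-kafka, cloud, plan, partition and replication values are placeholder assumptions, and flag names may vary between CLI versions.

# Create the Kafka service (name, cloud and plan are placeholders)
avn service create demo-kafka --service-type kafka --cloud google-europe-west1 --plan business-4

# Enable the Kafka Connect and Schema Registry sub-services
avn service update demo-kafka -c kafka_connect=true -c schema_registry=true

# Create the topic consumed by the JDBC sink connector
avn service topic-create demo-kafka jdbc_sink --partitions 3 --replication 2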

The following Kafka service details are required (from the Aiven console):

kafka_service_uri (e.g. "mykafka-myproj.aivencloud.com:17295")

schema_registry_service_uri (e.g. "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298")

sr_host (e.g. "mykafka-myproj.aivencloud.com")

sr_port (e.g. "17298")

sr_password (e.g. "xxxyyy")

kafka_connect_service_uri (e.g. "https://avnadmin:YYY@kafka-customer-demo-exercise1.aivencloud.com:17293")
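
For convenience, later commands in this article reference some of these values as shell variables; they can be exported as follows (placeholder values shown, substitute your own):

export kafka_service_uri=mykafka-myproj.aivencloud.com:17295
export sr_host=mykafka-myproj.aivencloud.com
export sr_port=17298
export sr_password=xxxyyy
export kafka_connect_service_uri=https://avnadmin:YYY@mykafka-myproj.aivencloud.com:17293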

Setting up PostgreSQL Service

Create an Aiven for PostgreSQL service.

The following PostgreSQL service details are required (from the Aiven console):

pg_host (e.g. "mypg-myproj.aivencloud.com")
pg_port (e.g. "17293")
pg_pw (e.g. "xxxyyy")

pg_service_uri (e.g. "postgres://avnadmin:pg_pw@pg_host:pg_port/defaultdb?sslmode=require")


Using the above, construct the JDBC driver connect string as follows:

jdbc_conn_string = "jdbc:postgresql://pg_host:pg_port/defaultdb?user=avnadmin&password=pg_pw&ssl=true"
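
As a sketch, assuming the PostgreSQL details above are exported as shell variables (placeholder values shown), the connection string used later can be built like this:

export pg_host=mypg-myproj.aivencloud.com
export pg_port=17293
export pg_pw=xxxyyy
export pg_service_uri="postgres://avnadmin:${pg_pw}@${pg_host}:${pg_port}/defaultdb?sslmode=require"
export jdbc_conn_string="jdbc:postgresql://${pg_host}:${pg_port}/defaultdb?user=avnadmin&password=${pg_pw}&ssl=true"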

Publish some messages to the Kafka topic

Publish messages to the jdbc_sink topic using kafka-avro-console-producer (available as part of the Confluent Platform).

./kafka-avro-console-producer --broker-list $kafka_service_uri --topic jdbc_sink --producer.config /path/avro-producer.properties --property schema.registry.url=https://$sr_host:$sr_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:$sr_password --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Sample avro-producer.properties file contents:

bootstrap.servers=kafka_service_uri

security.protocol=SSL

ssl.truststore.location=/path/client.truststore.jks

ssl.truststore.password=secret

ssl.keystore.type=PKCS12

ssl.keystore.location=/path/client.keystore.p12

ssl.keystore.password=secret

ssl.key.password=secret

key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

The truststore and keystore can be created as follows (this is also described in the getting started guide for Kafka):

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key

keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
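
To confirm the stores were created correctly, their contents can be listed with keytool (it prompts for the passwords chosen above):

keytool -list -keystore client.truststore.jks
keytool -list -keystore client.keystore.p12 -storetype PKCS12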

JDBC sink connector configuration

{
    "name": "jdbc-sink-conn",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc_conn_string",
        "tasks.max": "1",
        "topics": "jdbc_sink",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://sr_host:sr_port",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "avnadmin:sr_password"
    }
}

Save the above configuration, with the placeholders replaced by your actual values, to the file jdbc_sink.json.
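
Optionally, the file can be checked for valid JSON before posting it, for example with jq (assuming jq is installed):

jq . jdbc_sink.json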


Setting up the JDBC sink connector for Kafka

Use the following command from the Unix command line to create the connector:

$ curl -s -H "Content-Type: application/json" -X POST -d @jdbc_sink.json $kafka_connect_service_uri/connectors

In the Kafka Connect tab for the Kafka service, verify that the connector is in the "Running" state and that at least one task ("1/1") is running. If not, check both the Kafka Connect tab and the Logs tab of the Kafka service for exceptions.
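
The same check can also be done from the command line through the Kafka Connect REST API status endpoint for the connector:

curl -s $kafka_connect_service_uri/connectors/jdbc-sink-conn/status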

Check the PostgreSQL database for the inserted records.

$ psql $pg_service_uri

defaultdb=> SELECT * FROM public.jdbc_sink;
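
Because auto.create is enabled, the connector creates the table from the Avro value schema, so the record published earlier should be returned. Column types are chosen by the connector and the exact layout may differ slightly, but the output should look roughly like this:

 id  | product | quantity | price
-----+---------+----------+-------
 999 | foo     |      100 |    50
(1 row)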
