Aiven provides Kafka Connect as a managed service in combination with the Aiven for Apache Kafka managed service. However, there are circumstances where you may want to roll your own Connect cluster. This help article shows the steps necessary to integrate a self-managed Kafka Connect cluster with Aiven for Kafka (and Schema Registry). As an example, we create a JDBC sink connector that writes to a PostgreSQL database.

As a prerequisite, collect the following details for the Aiven Kafka and PostgreSQL services from the Aiven console:

KAFKA_HOST
KAFKA_PORT
SCHEMA_REGISTRY_PORT
SCHEMA_REGISTRY_PW
KAFKA_CONNECT_SERVICE_URI
PG_SERVICE_URI
PG_HOST
PG_PORT
PG_PW

Set up the truststore and keystore:

  1. Download the Aiven project CA and client keys from the console
  2. Generate truststore and keystore
# create a PKCS12 keystore from the client key and certificate
openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key

# import the project CA into a JKS truststore
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
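
You can verify both stores before wiring them into Kafka Connect; keytool prompts for the passwords you set during creation:

# list the entries in the keystore and the truststore
keytool -list -keystore client.keystore.p12 -storetype PKCS12
keytool -list -keystore client.truststore.jks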

Set up the Kafka service:

  1. Make sure that the advanced setting "kafka.auto_create_topics_enable" is enabled. This can be set from the Overview tab for the Kafka service in the Aiven console.
  2. Enable Schema Registry from the Overview tab for the Kafka service.
  3. Create a topic named jdbc_sink (see the CLI sketch below).
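
If you prefer the command line, the topic can also be created with the Aiven CLI (avn) instead of the console; KAFKA_SERVICE_NAME below is a placeholder for your Kafka service's name:

# create the sink topic with 3 partitions and replication factor 3
avn service topic-create KAFKA_SERVICE_NAME jdbc_sink --partitions 3 --replication 3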

Download the Kafka Connect binaries:

  1. Download the Confluent Platform. (At the time of this writing, CP 5.3.1 was available.)
  2. Download the Kafka Connect JDBC connector from Confluent Hub (https://www.confluent.io/hub/), or use the confluent-hub client as sketched below.
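
If your Confluent Platform download bundles the confluent-hub client, you can install the connector with it instead of fetching the zip manually; the version tag below is illustrative:

# install the JDBC connector into the plugin directory used later in this article
./bin/confluent-hub install confluentinc/kafka-connect-jdbc:5.3.1 --component-dir ./share/kafka/plugins --no-prompt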

Prepare the Kafka Connect software on the VM:

Log in to your VM.

cd $HOME

# extract confluent platform
tar xvf confluent-VERSION.tar.gz

# extract the Kafka Connect JDBC connector into the plugin directory
cd confluent-VERSION
mkdir -p share/kafka/plugins
cd share/kafka/plugins
unzip confluentinc-kafka-connect-jdbc-VERSION.zip


Create a properties file, my-connect-distributed.properties, for Kafka Connect.

cd $HOME
cd confluent-VERSION
vi ./my-connect-distributed.properties
#
plugin.path=/path/confluent-VERSION/share/kafka/plugins/confluentinc-kafka-connect-jdbc-VERSION

bootstrap.servers=KAFKA_HOST:KAFKA_PORT

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Connect clusters create three topics to manage offsets, configs, and status
# information. Note that these contribute towards the total partition limit quota.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

ssl.endpoint.identification.algorithm=https
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=SSL
ssl.protocol=TLS
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
ssl.keystore.type=PKCS12

consumer.ssl.endpoint.identification.algorithm=https
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.security.protocol=SSL
consumer.ssl.protocol=TLS
consumer.ssl.truststore.location=/path/client.truststore.jks
consumer.ssl.truststore.password=secret
consumer.ssl.keystore.location=/path/client.keystore.p12
consumer.ssl.keystore.password=secret
consumer.ssl.key.password=secret
consumer.ssl.keystore.type=PKCS12

producer.ssl.endpoint.identification.algorithm=https
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.security.protocol=SSL
producer.ssl.protocol=TLS
producer.ssl.truststore.location=/path/client.truststore.jks
producer.ssl.truststore.password=secret
producer.ssl.keystore.location=/path/client.keystore.p12
producer.ssl.keystore.password=secret
producer.ssl.key.password=secret
producer.ssl.keystore.type=PKCS12
#


Import the Aiven project CA into the JVM's trust store:

  1. Download the Aiven project CA (ca.pem) from the console and transfer it to the VM.

Execute the following steps on each VM participating in the Connect cluster:

# Import the Aiven project CA into the JVM's trust store
sudo su
cd /tmp
openssl x509 -in /path/ca.pem -inform pem -out ca.der -outform der
keytool -v -printcert -file ca.der
#
cp $JAVA_HOME/jre/lib/security/cacerts $JAVA_HOME/jre/lib/security/cacerts.orig
#
keytool -importcert -alias aiven-ca -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -file ca.der
#
keytool -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -list | grep aiven-ca
#
exit
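
The cacerts path above applies to Java 8. On Java 9 and later there is no jre directory, so point keytool at $JAVA_HOME/lib/security/cacerts instead:

# Java 9+ layout: cacerts lives directly under $JAVA_HOME/lib/security
keytool -importcert -alias aiven-ca -keystore $JAVA_HOME/lib/security/cacerts -storepass changeit -file ca.der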


Start the Connect cluster:

cd $HOME
cd confluent-VERSION
./bin/connect-distributed ./my-connect-distributed.properties
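
Once the worker is up, you can confirm that its REST API responds (it listens on port 8083 by default) and that the JDBC plugin was picked up from plugin.path:

# worker version and commit info
curl -s localhost:8083/ | jq .

# installed connector plugins; the JDBC sink should appear here
curl -s localhost:8083/connector-plugins | jq .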

Create the JDBC sink connector configuration (JSON) in a file named jdbc-sink-pg.json:

{
  "name": "jdbc-sink-pg",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://PG_HOST:PG_PORT/defaultdb?user=avnadmin&password=PG_PW&ssl=true",
    "tasks.max": "1",
    "topics": "jdbc_sink",
    "auto.create": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://KAFKA_HOST:SCHEMA_REGISTRY_PORT",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "avnadmin:SCHEMA_REGISTRY_PW"
  }
}
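
Because auto.create is true, the connector creates the target table (named after the topic, jdbc_sink) from the record schema on first write. If you want to sanity-check the configuration before submitting it, Kafka Connect exposes a validation endpoint. It expects the bare config map rather than the name/config wrapper above, and the plugin name in the URL (here the simple class name, JdbcSinkConnector) must match connector.class:

# validate the connector config against the installed plugin (assumes jq is available)
jq .config jdbc-sink-pg.json | curl -s -H "Content-Type: application/json" \
  -X PUT -d @- localhost:8083/connector-plugins/JdbcSinkConnector/config/validate | jq .error_count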

Create the JDBC sink connector instance:

curl -s -H "Content-Type: application/json" -X POST -d @jdbc-sink-pg.json http://localhost:8083/connectors/ | jq .

Check the status of the JDBC sink connector instance:

# check the status
curl -s localhost:8083/connectors/jdbc-sink-pg/status | jq .

# check running tasks
curl -s localhost:8083/connectors/jdbc-sink-pg/tasks | jq .
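
The same REST API manages the connector's lifecycle, which is handy while iterating on the configuration:

# restart the connector (e.g. after fixing a failed task)
curl -s -X POST localhost:8083/connectors/jdbc-sink-pg/restart

# pause and resume delivery
curl -s -X PUT localhost:8083/connectors/jdbc-sink-pg/pause
curl -s -X PUT localhost:8083/connectors/jdbc-sink-pg/resume

# delete the connector
curl -s -X DELETE localhost:8083/connectors/jdbc-sink-pg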

Publish data to the jdbc_sink topic using kafka-avro-console-producer.

Create a producer configuration file, console-producer.properties:

security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret


cd $HOME
cd confluent-VERSION

./bin/kafka-avro-console-producer \
  --broker-list KAFKA_HOST:KAFKA_PORT \
  --topic jdbc_sink \
  --producer.config ./console-producer.properties \
  --property schema.registry.url=https://KAFKA_HOST:SCHEMA_REGISTRY_PORT \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=avnadmin:SCHEMA_REGISTRY_PW \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

Enter records matching the schema, one JSON object per line:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Log in to the PostgreSQL database and check for the data:

psql PG_SERVICE_URI

psql> select * from jdbc_sink;
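
Since the table was generated by the connector via auto.create, you can also inspect the schema it derived from the Avro record:

psql> \d jdbc_sink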
