Debezium is a Kafka Connect Connector that allows you to stream the changes happening in your database to Kafka in real-time.

Once you have the data in Kafka it allows you to transform and process the data at will. Kafka is a natural solution for this sort of streaming data problem.

Setting up Debezium

First of all you need to have an Aiven MySQL 8 service running alongside an Aiven Kafka service that is using a Business-4 plan or above so it can use Kafka Connect.

Once you have these services running you need to decide which tables do you want to replicate into Kafka. For this example we're going to just replicate all the tables in the database defaultdb .

Setting up the topics 

There are two ways to set up the needed Kafka topics. 

The simpler way is to set the service to create topics automatically by running:

avn service update myservice -c "kafka.auto_create_topics_enable=true" 

The downside of this approach is that the replication factor, and especially the cleanup policy will use the defaults which are not optimal.  Debezium can greatly benefit from from using the COMPACT policy over the DELETE cleanup policy as it will then make sure the topic contains only the latest version of a row.

The other way of setting the topics up is to create them manually beforehand.

Connecting to Kafka for history topic management

The Debezium MySQL connector creates its own Kafka connection for keeping track of changes in the database schema. You usually want to make this connection to the Aiven Kafka running on the same nodes but these settings are not automatically populated. Due to the way how Debezium MySQL has been implemented the settings are also not explicitly exposed in the UI but they can still be manually defined in the JSON in Aiven web console or via curl when manually creating the connector. The example below shows all the relevant database.history.* settings that need to be defined and their correct values. The keystore and truststore file paths must be entered exactly as shown in the example; files with these names have been pregenerated on the nodes and there's no need to explicitly upload those.

Setting up the Debezium Connector
To set up the actual connector we need to call into the Kafka service's Kafka Connect REST API (i.e., the Kafka Connect URL from the Aiven Console) wth a call like this:

$ curl -H "Content-type:application/json" -X POST https://avnadmin:je20karab3l1y4jj@kafka-demoproject.aivencloud.net:25238/connectors -d '{
     "name": "test_connector",
     "config": {
       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
       "database.hostname": "mysql-a-valtha-test.avns.net",
       "database.port": "25238",
       "database.user": "avnadmin",
       "database.password": "j523nvnzt2299enk",
       "database.dbname": "defaultdb",
       "database.server.name": "debezium-mysqltest",
       "database.ssl.mode": "required",
       "database.history.kafka.topic": "history",
       "database.history.kafka.bootstrap.servers": "kafka-demoproject.aivencloud.net:25240",
       "database.history.producer.security.protocol": "SSL",
       "database.history.producer.ssl.keystore.type": "PKCS12",
       "database.history.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
       "database.history.producer.ssl.keystore.password": "password",
       "database.history.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
       "database.history.producer.ssl.truststore.password": "password",
       "database.history.producer.ssl.key.password": "password",
       "database.history.consumer.security.protocol": "SSL",
       "database.history.consumer.ssl.keystore.type": "PKCS12",
       "database.history.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
       "database.history.consumer.ssl.keystore.password": "password",
       "database.history.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
       "database.history.consumer.ssl.truststore.password": "password",
       "database.history.consumer.ssl.key.password": "password"
     }
    }'
Did this answer your question?