Debezium is a Kafka Connect Connector that allows you to stream the changes happening in your database to Kafka in real-time.

Once you have the data in Kafka it allows you to transform and process the data at will. Kafka is a natural solution for this sort of streaming data problem.

Setting up Debezium

First of all you need to have an Aiven PostgreSQL 10 service running alongside an Aiven Kafka service that is using a Business-4 plan or above so it can use Kafka Connect.

Once you have these services running you need to decide which tables do you want to replicate into Kafka. For this example we're going to just replicate one table that's called the source_table. 

Setting up the topics 

There are two ways to set up the needed Kafka topics. 

The simpler way is to set the service to create topics automatically by running:

avn service update myservice -c "kafka.auto_create_topics_enable=true"

The downside of this approach is that the replication factor, and especially the cleanup policy will use the defaults which are not optimal.  Debezium can greatly benefit from from using the COMPACT policy over the DELETE cleanup policy as it will then make sure the topic contains only the latest version of a row.

The other way of setting the topics up is to create them manually beforehand. The naming pattern that Debezium uses for topics is servername.schema.table_name for example debezium-pg.public.source_table describes the table source_table in the schema public on the server with the name debezium-pg. If you decide to precreate them we recommend using COMPACT as the cleanup policy for them.

Setting up the Debezium Connector
 
To set up the actual connector we need to call into the Kafka service's Kafka Connect REST API wth a call like this:

$ curl -H "Content-type:application/json" -X POST https://avnadmin:jsw06fghona27r71@debezium-kafka-demoproject.aivencloud.com:22737/connectors -d '{
 "name": "test_connector",
 "config": {
   "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
   "database.hostname": "debezium-pg-demoproject.aivencloud.com",
   "database.port": "22737",
   "database.user": "avnadmin",
   "database.password": "nqj26a2lni8pi2ax",
   "database.dbname": "defaultdb",
   "database.server.name": "debezium-pg",
   "table.whitelist": "public.source_table",
   "plugin.name": "wal2json",
   "database.sslmode": "require"
 }
}'


WAL2JSON limitations

Note that the wal2json logical decoding plugin has limitations in the datatypes that it can support. 

Currently (as of 12.4.2018) it supports the following PostgreSQL datatypes explicitly:
INT2OID, INT4OID, INT8OID, OIDOID, FLOAT4OID, FLOAT8OID, NUMERICOID, BOOLOID, BYTEAOID (the contents are shown in hex format). 

Besides these it automatically turns all other datatypes into strings based on their textual representation. Basically this means that you need to be wary of changed meaning if you're using other PostgreSQL datatypes. 

Did this answer your question?