Debezium is a Kafka Connect connector that streams the changes happening in your database to Kafka in real time.

Once the data is in Kafka, you can transform and process it at will. Kafka is a natural fit for this sort of streaming data problem.

Setting up Debezium

First of all, you need an Aiven PostgreSQL 10 service running alongside an Aiven Kafka service on a Business-4 plan or above, so that Kafka Connect is available.

Once these services are running, you need to decide which tables you want to replicate into Kafka. For this example, we're going to replicate a single table called source_table.
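For illustration, the examples below assume a minimal table along these lines. The column definitions are hypothetical and not from the original setup; any table works, but a primary key is recommended because it gives Debezium a message key, which log compaction (discussed below) relies on:

```sql
-- Hypothetical example table; adjust the columns to your own schema.
-- The primary key becomes the Kafka message key for compaction.
CREATE TABLE source_table (
    id serial PRIMARY KEY,
    name text NOT NULL,
    updated_at timestamptz NOT NULL DEFAULT now()
);
```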

Setting up the topics 

There are two ways to set up the needed Kafka topics. 

The simpler way is to set the service to create topics automatically by running:

avn service update myservice -c "kafka.auto_create_topics_enable=true"

The downside of this approach is that auto-created topics use the default replication factor and, more importantly, the default cleanup policy, which are not optimal here. Debezium benefits greatly from the compact cleanup policy over the default delete policy, because compaction ensures the topic retains only the latest version of each row.

The other way is to create the topics manually beforehand. Debezium names topics following the pattern servername.schema.table_name; for example, debezium-pg.public.source_table describes the table source_table in the schema public on the server named debezium-pg. If you decide to pre-create the topics, we recommend using compact as their cleanup policy.
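With the Aiven CLI, pre-creating such a topic with compaction enabled could look like the following sketch. The service name and the partition and replication counts are placeholder examples; check `avn service topic-create --help` for the options your client version supports:

```shell
# Pre-create the Debezium topic with log compaction enabled.
# "myservice" and the partition/replication values are examples.
avn service topic-create myservice debezium-pg.public.source_table \
    --partitions 3 \
    --replication 3 \
    --cleanup-policy compact
```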

Setting up the Debezium Connector
To set up the actual connector, we call the Kafka service's Kafka Connect REST API (the Kafka Connect URL shown in the Aiven Console) with a request like this:

$ curl -H "Content-type:application/json" -X POST -d '{
 "name": "test_connector",
 "config": {
   "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
   "database.hostname": "",
   "database.port": "22737",
   "database.user": "avnadmin",
   "database.password": "nqj26a2lni8pi2ax",
   "database.dbname": "defaultdb",
   "database.server.name": "debezium-pg",
   "table.whitelist": "public.source_table",
   "plugin.name": "wal2json",
   "database.sslmode": "require"
 }
}' https://<kafka-connect-uri>/connectors

Note 1: Debezium requires the schema name in table.whitelist. That is, instead of just table_name you must always use schema_name.table_name, otherwise the whitelist will not match any tables and nothing will happen.

Note 2: Debezium only advances the PostgreSQL replication slot's LSN position when changes take place in the database it is connected to. PostgreSQL cannot delete old WAL segments while any replication slot has not acknowledged receiving them. So if your system is completely idle (in which case Aiven PostgreSQL still generates 16 MiB of WAL every 5 minutes), or changes only occur in databases Debezium is not connected to, PostgreSQL will not be able to clean up the WAL and the service will eventually run out of disk space. It is therefore essential to ensure that any database you connect Debezium to is updated frequently enough.
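If a connected database can sit idle for long periods, one simple workaround (a sketch, not part of the original setup) is a small heartbeat table that a scheduled job such as cron updates periodically, so the replication slot keeps advancing:

```sql
-- Hypothetical heartbeat table, written to periodically (e.g. via cron)
-- so the replication slot keeps advancing on an otherwise idle database.
CREATE TABLE debezium_heartbeat (id int PRIMARY KEY, ts timestamptz);

-- Run this on a schedule; the upsert keeps the table at a single row.
INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now())
    ON CONFLICT (id) DO UPDATE SET ts = now();
```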

Note 3: If the connector task fails with the error Caused by: org.postgresql.util.PSQLException: ERROR: must be superuser to create FOR ALL TABLES publication when using the pgoutput plugin (instead of wal2json as in the example above), it is because Debezium tries to create a publication and fails, since avnadmin is not a superuser. You have to create the publication on the target database before configuring the connector:

Install the aiven-extras extension:

CREATE EXTENSION aiven_extras CASCADE;
Create a publication for all tables:

SELECT * FROM aiven_extras.pg_create_publication_for_all_tables('dbz_publication', 'INSERT,UPDATE,DELETE');

Thank you to Alex Woods for creating the gist from which this problem and the solution were taken.

Then, restart the connector task and the connector should begin watching for changes to the target database.
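The restart can be issued through the same Kafka Connect REST API. Assuming the connector is named test_connector and has a single task (task 0), the call could look like this, with the same Kafka Connect URL as above:

```shell
# Restart task 0 of the connector via the Kafka Connect REST API.
curl -X POST https://<kafka-connect-uri>/connectors/test_connector/tasks/0/restart
```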


WAL2JSON limitations

Note that the wal2json logical decoding plugin has limitations in the datatypes it can support.

Currently (as of 12.4.2018) it supports the following PostgreSQL datatypes explicitly:

Besides these, it automatically converts all other datatypes into strings based on their textual representation. This means you need to be wary of changed meaning if you use other PostgreSQL datatypes.
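As a hypothetical illustration of what that conversion means for consumers: a value that PostgreSQL treats as structured data arrives in Kafka as its text form, which downstream code then has to parse itself.

```sql
-- Hypothetical example: an interval value such as '1 day 02:30:00'
-- reaches Kafka as that literal string, not as a structured duration,
-- so consumers must parse the textual representation themselves.
CREATE TABLE intervals_example (
    id serial PRIMARY KEY,
    wait interval
);
INSERT INTO intervals_example (wait) VALUES ('1 day 2 hours 30 minutes');
```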
