This help article illustrates how to set up and use the Debezium connector for Kafka Connect to listen for changes in a PostgreSQL database and write those changes to a topic in Aiven Kafka. In this scenario, Debezium acts as a source connector.

First off, some information regarding the RDS PostgreSQL service and the destination Aiven Kafka service needs to be collected.

The following RDS PostgreSQL service details are required (from the AWS console):

RDS_PG_SERVICE_URL
RDS_PG_HOST
RDS_PG_PORT
RDS_PG_USER
RDS_PG_PW
RDS_PG_DEFAULT_DB

The following Aiven Kafka service details are required (from the Aiven console):

KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI

Steps to prepare Aiven Kafka service for Debezium

In order for Debezium to be able to write to an Aiven Kafka topic, the Aiven Kafka service needs to be set up in the following manner via the Aiven console:

  1. Enable "kafka.auto_create_topics_enable". This parameter is available in the "Advanced configuration" area.
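Alternatively, the same parameter can be set from the command line with the Aiven client (a sketch; demo-kafka is a placeholder for your own Kafka service name):

```shell
# Enable automatic topic creation on the Aiven Kafka service
# (demo-kafka is a placeholder service name)
avn service update demo-kafka -c kafka.auto_create_topics_enable=true
```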

Steps to prepare the RDS PostgreSQL service for Debezium

In order for Debezium to be able to listen for changes in the RDS PostgreSQL database, the RDS PostgreSQL service needs to be set up in the following manner via the AWS console:

  1. Create a new parameter group
  2. Assign a value of "1" to the parameter "rds.logical_replication"
  3. Modify the running instance to apply the newly created parameter group
  4. Reboot the RDS instance
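The steps above can also be sketched with the AWS CLI (the group name debezium-pg-params, the parameter group family, and the instance identifier my-rds-instance are placeholders; adjust them to your setup):

```shell
# 1. Create a new parameter group (family must match your PostgreSQL major version)
aws rds create-db-parameter-group \
  --db-parameter-group-name debezium-pg-params \
  --db-parameter-group-family postgres10 \
  --description "Logical replication for Debezium"

# 2. Set rds.logical_replication to 1 (static parameter, applied at reboot)
aws rds modify-db-parameter-group \
  --db-parameter-group-name debezium-pg-params \
  --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"

# 3. Attach the parameter group to the running instance
aws rds modify-db-instance \
  --db-instance-identifier my-rds-instance \
  --db-parameter-group-name debezium-pg-params

# 4. Reboot so the parameter change takes effect
aws rds reboot-db-instance --db-instance-identifier my-rds-instance
```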

Once the service is back up, verify that logical replication is enabled. Then create the schema.

> psql postgres://RDS_PG_USER:RDS_PG_PW@RDS_PG_HOST:RDS_PG_PORT/RDS_PG_DEFAULT_DB?sslmode=require

defaultdb=> show wal_level;
 wal_level
-----------
 logical
(1 row)

defaultdb=> show max_replication_slots;
 max_replication_slots
-----------------------
 10
(1 row)

defaultdb=> create table public.customers (id int primary key, name varchar(64));

Modify the security group of the RDS instance to accept connections from the Aiven Kafka brokers (all N brokers' IP addresses).
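As a sketch with the AWS CLI (the security group ID and the broker IP below are placeholders; repeat the command once per broker address):

```shell
# Allow one Aiven Kafka broker IP to reach the PostgreSQL port
# (sg-0123456789abcdef0 and 192.0.2.10 are placeholders)
aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp \
  --port 5432 \
  --cidr 192.0.2.10/32
```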

Setting up the Debezium (PostgreSQL) source connector for Kafka
Set up the Debezium connector by issuing a curl command as follows (substituting the appropriate values for the variables):

> curl -X POST KAFKA_CONNECT_SERVICE_URI/connectors \
-H "Content-type:application/json" \
-d @- << EOF
{
  "name": "demo-rdspg-source",
  "config": {
   "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
   "database.hostname": "RDS_PG_HOST",
   "database.port": "RDS_PG_PORT",
   "database.user": "RDS_PG_USER",
   "database.password": "RDS_PG_PW",
   "database.dbname": "RDS_PG_DEFAULT_DB",
   "database.server.name": "debezium_rdspg",
   "table.whitelist": "public.customers",
   "plugin.name": "wal2json"
  }
}
EOF
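Once registered, you can verify that the connector is running via the Kafka Connect REST API:

```shell
# Check the connector status; a healthy connector reports state RUNNING
curl KAFKA_CONNECT_SERVICE_URI/connectors/demo-rdspg-source/status
```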

Insert some records into the RDS PostgreSQL database table

defaultdb=> insert into public.customers(id,name) values(1,'foo');
INSERT 0 1
defaultdb=> insert into public.customers(id,name) values(2,'bar');
INSERT 0 1

Check for data in Kafka topic
Use kafka-console-consumer (a tool distributed with Apache Kafka) to consume messages from the Kafka topic.

> ./kafka-console-consumer --bootstrap-server KAFKA_HOST:KAFKA_PORT --topic debezium_rdspg.public.customers --consumer.config /path/console-consumer.properties --from-beginning

{"before":null,"after":{"id":1,"name":"foo"},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"debezium_rdspg","db":"defaultdb","ts_usec":1553713060036055000,"txId":1521,"lsn":603980272,"schema":"public","table":"customers","snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":1553713060236}
{"before":null,"after":{"id":2,"name":"bar"},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"debezium_rdspg","db":"defaultdb","ts_usec":1553713088621512000,"txId":1525,"lsn":603980744,"schema":"public","table":"customers","snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":1553713088646}
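Each line the consumer prints is a Debezium change event in JSON: "op" encodes the operation ("c" for create/insert), "before" is null for an insert, and "after" holds the new row. A quick way to pick the row out of an event on the command line, assuming jq is installed (the event below is abbreviated from the output above):

```shell
# Extract the newly inserted row from an abbreviated change event
echo '{"before":null,"after":{"id":1,"name":"foo"},"op":"c"}' | jq -c '.after'
# → {"id":1,"name":"foo"}
```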

Here, console-consumer.properties would have the following properties:

security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1

The truststore and keystore can be created as follows (also described in the getting started guide for Kafka):

> openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
> keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
