This article is intended for developers, operations engineers, and solution architects. It describes how to use the infrastructure as code (IaC) that is included in the debezium-pg-kafka-connect-test GitHub project to deploy and configure the following:
Kafka Connect service
For more information on configuring Debezium with Aiven for PostgreSQL, see the separate Debezium setup article.
Terraform version 0.14.x or newer installed
Set the avn_api_token variable in the /terraform/.secrets.tfvars file with your Aiven authentication token. You can rename the sample file (cd ./terraform && mv .secrets.tfvars.example .secrets.tfvars) and update it with your token.
To deploy the infrastructure and processes:
Update terraform/.auto.tfvars with the variable values that you want to use.
Run the following Terraform wrapper script to deploy and configure the required resources:
In one terminal, start the script that monitors replication slots:
python bin/python_scripts/replication_slot_monitor.py --verbose --sleep 10
Note: In the event of a database failover, the script continues to retry the connection:
ERROR replication_slot_monitor: Failed connecting to the PG server. Retrying after 5 seconds...
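The retry behavior described above can be sketched as a simple loop. This is an illustrative sketch, not the actual replication_slot_monitor.py implementation; the `connect` and `sleep` callables are hypothetical parameters introduced here to make the loop testable:

```python
import time

def connect_with_retry(connect, retry_delay=5, sleep=time.sleep):
    """Call `connect` until it succeeds, waiting between failed attempts.

    `connect` is any zero-argument callable that returns a connection or
    raises OSError on failure; `sleep` is injectable for testing.
    """
    while True:
        try:
            return connect()
        except OSError:
            # Mirrors the log line shown above.
            print("ERROR replication_slot_monitor: Failed connecting to the "
                  f"PG server. Retrying after {retry_delay} seconds...")
            sleep(retry_delay)
```

Because the loop never gives up, the monitor transparently survives failovers once the database accepts connections again.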
Verify that the Debezium connector is capturing changes.
A helper Python script inserts data into a test PostgreSQL table and verifies that the changes are captured by Debezium. To do this, the script consumes data from the Kafka topic to which the connector writes the change data capture (CDC) events and then checks that these records match the IDs of the records that were inserted into the source database table.
The test database table is called test and it is hardcoded in
Separate Python threads consume from the Kafka topic and insert data into the test table.
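The ID-matching step can be sketched as follows. This is an illustrative sketch, not the project's actual code, and it assumes the Debezium default event envelope, in which an insert carries the new row image under payload.after:

```python
def captured_ids(cdc_events):
    """Collect row IDs from deserialized Debezium CDC events.

    Each event is the JSON value of one Kafka record; inserts carry the
    new row under payload.after (Debezium's default envelope).
    """
    ids = set()
    for event in cdc_events:
        after = (event.get("payload") or {}).get("after")
        if after is not None:
            ids.add(after["id"])
    return ids

def missing_ids(inserted_ids, cdc_events):
    """Return the inserted IDs that never appeared in the consumed events."""
    return set(inserted_ids) - captured_ids(cdc_events)
```

An empty result from missing_ids means every insert was observed in the CDC stream.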
In another terminal, start the PostgreSQL producer script:
python bin/python_scripts/debezium_pg_producer.py --verbose --sleep 3
Note: If the script cannot connect to the database during a database failover, you will see the following log entry:
ERROR debezium_pg_producer: Postgres data insert: Failed connecting to the PG server. Retrying after 5 seconds...
The thread should resume after 10-15 seconds. However, the Kafka consumer thread might stop receiving new records if the Debezium connector does not resume. This appears in the log as the following message:
INFO debezium_pg_producer: Kafka Consumer exited
The PostgreSQL producer script accepts the following optional arguments:
--table TABLE: The table where test data is written. The default value is test.
--sleep SLEEP: The delay in seconds between inserts, which is used to control the flow of data. The default value is
--iterations ITERATIONS: The number of inserts made before closing the application. The default value is
--verbose: Sets the logging level to DEBUG. The default logging level is INFO.
To end the script, press Ctrl + C once.
This adds one of the following messages to the log to indicate whether Debezium successfully captured all changes that were made since you started the script or whether some records were missed:
Kafka Consumer: All inserts were captured by debezium
Kafka Consumer: Some inserts were not captured by debezium
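The final check reduces to a set comparison between inserted and captured IDs. A minimal sketch follows; the function name is hypothetical, but the returned strings match the log messages above:

```python
def capture_report(inserted_ids, captured_ids):
    """Summarize whether every inserted row ID was seen in the CDC stream."""
    if set(inserted_ids) <= set(captured_ids):
        return "Kafka Consumer: All inserts were captured by debezium"
    return "Kafka Consumer: Some inserts were not captured by debezium"
```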
Note: One issue that appeared during testing is that the replication slot can become inactive for roughly 15-20 minutes, during which the connector does not capture any changes. The connector itself appeared to be running fine, but it failed when a PostgreSQL database failover occurred, with the following error message:
ERROR Producer failure org.postgresql.util.PSQLException: Database connection failed when writing to copy or ERROR Could not execute heartbeat action
After this, the connector's status showed as running, and no more error messages were triggered, but the replication slot that it consumes from was still inactive. This error appeared to occur randomly during failover events and could not be reliably reproduced.
Changing the logging level for the Debezium connector
To get a more detailed log output, change the logging level of the running Debezium connector to TRACE using the following command:
./bin/set_debezium_connector_logging_level.sh io.debezium.connector.postgresql trace
To get the current logging levels of all loggers in the Kafka Connect cluster, run the same script with no arguments:
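Under the hood, a script like this typically talks to Kafka Connect's admin loggers REST endpoint (added in KIP-495). The following sketch builds such a request in Python; it is an assumption about how the wrapper script works, and connect_url is a hypothetical placeholder for your Kafka Connect REST address:

```python
import json
import urllib.request

def build_logger_request(connect_url, logger, level):
    """Build a PUT request for Kafka Connect's /admin/loggers endpoint."""
    body = json.dumps({"level": level.upper()}).encode()
    return urllib.request.Request(
        f"{connect_url}/admin/loggers/{logger}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )

# To apply it (network call, not executed here):
# urllib.request.urlopen(build_logger_request(
#     "http://localhost:8083", "io.debezium.connector.postgresql", "trace"))
# A GET on {connect_url}/admin/loggers lists the current levels of all loggers.
```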
Manual SQL queries to show the PostgreSQL replication slots and their status
In addition to the Python test scripts, here are some SQL examples:
Show the replication slots and their status. An active slot shows t (true) under the active column:
SELECT * FROM pg_replication_slots;
Show or monitor the amount of lag for the slots:
SELECT redo_lsn, slot_name, restart_lsn, round((redo_lsn - restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind FROM pg_control_checkpoint(), pg_replication_slots;
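The GB_behind arithmetic can also be reproduced client-side. This sketch is not part of the project; it assumes the textual pg_lsn format "hi/lo", where both halves are hexadecimal and hi counts 4 GB segments:

```python
def lsn_to_bytes(lsn):
    """Convert a pg_lsn string such as '0/16B3748' to an absolute byte offset."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) + int(lo, 16)

def gb_behind(redo_lsn, restart_lsn):
    """Slot lag in GB, mirroring round((redo_lsn - restart_lsn)/1024^3, 2)."""
    return round((lsn_to_bytes(redo_lsn) - lsn_to_bytes(restart_lsn)) / 1024 ** 3, 2)
```

This can be handy when reading LSNs out of logs rather than querying the database directly.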
Destroy all deployed Terraform resources
Run the following command to clean up or destroy all Terraform infrastructure that is deployed with the wrapper script: