This article is intended for developers, operations engineers, and solution architects. It describes how to use the infrastructure as code (IaC) included in the debezium-pg-kafka-connect-test GitHub project to deploy and configure the following:

  • Kafka service

  • Kafka Connect service

  • PostgreSQL service

For more information on configuring Debezium with Aiven for PostgreSQL, see the separate Debezium setup article.

Requirements:

Optional tools:

To deploy the infrastructure and processes:

  1. Update terraform/.auto.tfvars with the variable values that you want to use.

  2. Run the following Terraform wrapper script to deploy and configure the required resources:

    ./bin/deploy-terraform-infra.sh

  3. In one terminal, start the script that monitors replication slots:

    python bin/python_scripts/replication_slot_monitor.py --verbose --sleep 10

    Note: In the event of a database failover, the script continues to retry the connection:

    ERROR replication_slot_monitor: Failed connecting to the PG server. Retrying after 5 seconds...
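
    The core loop of such a monitor might look like the following minimal sketch. The psycopg2 driver and the DSN-based connection are assumptions; the query itself matches the manual SQL shown later in this article.

    import time

    import psycopg2  # assumed driver; the actual script may use another client

    def monitor_slots(dsn, sleep_seconds=10):
        # Poll pg_replication_slots until interrupted, reconnecting after failovers.
        while True:
            try:
                conn = psycopg2.connect(dsn)
                try:
                    with conn.cursor() as cur:
                        cur.execute("SELECT slot_name, active, restart_lsn FROM pg_replication_slots;")
                        for slot_name, active, restart_lsn in cur.fetchall():
                            print(f"{slot_name}: active={active} restart_lsn={restart_lsn}")
                finally:
                    conn.close()
            except psycopg2.OperationalError:
                # Matches the retry behavior described in the note above.
                print("Failed connecting to the PG server. Retrying after 5 seconds...")
                time.sleep(5)
                continue
            time.sleep(sleep_seconds)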

  4. Verify that the Debezium connector is capturing changes.

    There is a helper Python script that writes data to a test PostgreSQL table and verifies that Debezium captures the changes. To verify this, the script consumes records from the Kafka topic to which the connector writes the change data capture (CDC) events and then checks that those records match the IDs of the rows that were inserted into the source database table.

    The test table is named test; the name is hardcoded in bin/config-debezium-pg-kafka.sh and bin/python_scripts/debezium_pg_producer.py.

    The script uses separate Python threads to consume from the Kafka topic and to insert data into the test table.
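
    A minimal sketch of the consumer side, assuming the kafka-python client and the default Debezium JSON envelope (the topic name and broker address are placeholders):

    import json

    from kafka import KafkaConsumer  # assumed client library

    consumer = KafkaConsumer(
        "debezium.public.test",                      # hypothetical CDC topic name
        bootstrap_servers="kafka.example.com:9092",  # placeholder broker
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v) if v else None,
    )

    captured_ids = set()
    for message in consumer:
        if message.value is None:  # skip tombstone records
            continue
        # In the default Debezium envelope, the new row state is under payload.after.
        after = message.value.get("payload", {}).get("after")
        if after:
            captured_ids.add(after["id"])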

  5. In another terminal, start the PostgreSQL producer script:

    python bin/python_scripts/debezium_pg_producer.py --verbose --sleep 3

    Note: If the script cannot connect to the database during a failover, you will see the following log entry:

    ERROR debezium_pg_producer: Postgres data insert: Failed connecting to the PG server. Retrying after 5 seconds...

    The insert thread should resume after 10-15 seconds. However, the Kafka consumer thread might stop receiving new records if the Debezium connector does not resume, which appears in the log as the following message:

    INFO debezium_pg_producer: Kafka Consumer exited

    The PostgreSQL producer script accepts the following optional arguments (a minimal sketch of its insert loop follows this list):

    • --table TABLE: The table where test data is written. The default value is test.

    • --sleep SLEEP: The delay in seconds between inserts, which is used to control the flow of data. The default value is 1.

    • --iterations ITERATIONS: The number of inserts made before closing the application. The default value is 10000.

    • --verbose: Sets the logging level to DEBUG. The default logging level is WARN.
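
    The insert loop, with the failover retry described above, might look like this sketch. The single id column is an assumption about the test table's schema; the real script's logic may differ.

    import time

    import psycopg2  # assumed driver

    def produce(dsn, table="test", sleep_seconds=1, iterations=10000):
        # Insert one row per iteration, reconnecting through database failovers.
        conn = None
        inserted = 0
        while inserted < iterations:
            try:
                if conn is None or conn.closed:
                    conn = psycopg2.connect(dsn)
                with conn.cursor() as cur:
                    cur.execute(f"INSERT INTO {table} (id) VALUES (%s)", (inserted,))
                conn.commit()
                inserted += 1
                time.sleep(sleep_seconds)
            except psycopg2.OperationalError:
                # Keep retrying, as in the log message above.
                print("Postgres data insert: Failed connecting to the PG server. Retrying after 5 seconds...")
                conn = None
                time.sleep(5)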

  6. To end the script, press Ctrl + C once.
    This adds one of the following messages to the log, indicating whether Debezium captured all changes made after you started the script or missed some records:

    • Kafka Consumer: All inserts were captured by debezium

    • Kafka Consumer: Some inserts were not captured by debezium
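
    The final check reduces to a set comparison along these lines (a sketch; the actual script's bookkeeping may differ):

    def report_capture_result(inserted_ids, captured_ids):
        # inserted_ids: IDs written to PostgreSQL; captured_ids: IDs seen on the topic.
        if inserted_ids - captured_ids:
            print("Kafka Consumer: Some inserts were not captured by debezium")
        else:
            print("Kafka Consumer: All inserts were captured by debezium")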

Note: One issue observed during testing is that the replication slot can remain inactive for roughly 15-20 minutes, during which the connector captures no changes. The connector otherwise ran fine and only failed when a PostgreSQL database failover occurred, with one of the following error messages:

ERROR Producer failure org.postgresql.util.PSQLException: Database connection failed when writing to copy

ERROR Could not execute heartbeat action

After this, the connector's status showed as running and no more error messages were triggered, but the replication slot that it consumes remained inactive. The error appeared to occur randomly during failover events and could not be reliably reproduced.

Changing the logging level for the Debezium connector

  • To get a more detailed log output, change the logging level of the running Debezium connector to TRACE using the following command:

    ./bin/set_debezium_connector_logging_level.sh io.debezium.connector.postgresql trace

  • To get the current logging levels of all loggers in the Kafka Connect cluster, run the same script with no arguments:

    ./bin/set_debezium_connector_logging_level.sh
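
The script presumably wraps the Kafka Connect REST API's /admin/loggers endpoint; calling that endpoint directly from Python might look like the following sketch (the Connect URL is a placeholder, and authentication is omitted):

    import requests  # assumed HTTP client

    CONNECT_URL = "https://kafka-connect.example.com"  # placeholder endpoint

    # Set one logger's level, equivalent to the TRACE example above.
    response = requests.put(
        f"{CONNECT_URL}/admin/loggers/io.debezium.connector.postgresql",
        json={"level": "TRACE"},
    )
    response.raise_for_status()

    # List the current level of every logger in the Kafka Connect cluster.
    print(requests.get(f"{CONNECT_URL}/admin/loggers").json())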

Manual SQL queries to show the PostgreSQL replication slots and their status

In addition to the Python test scripts, you can run the following SQL queries to inspect the replication slots and their status:

  • List the replication slots; the active column shows t (true) for each slot that is currently in use:

    SELECT * FROM pg_replication_slots;

  • Show or monitor the amount of lag, in gigabytes, for each slot:

    SELECT redo_lsn, slot_name, restart_lsn, round((redo_lsn - restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind FROM pg_control_checkpoint(), pg_replication_slots;

Destroy all deployed Terraform resources

Run the following command to destroy all Terraform infrastructure that was deployed with the wrapper script:

./bin/DESTROY-terraform-infra.sh
