The JDBC source connector is useful for pushing data from a relational database such as PostgreSQL to Kafka, where it can be consumed and transformed any number of times in interesting ways. This help article illustrates the steps to set up the JDBC source connector with a PostgreSQL database.

JDBC source connector setup with Aiven for PostgreSQL

Here we illustrate the steps required to set up the JDBC source connector with an Aiven for PostgreSQL service. Before getting started, collect the following information about the Kafka and PostgreSQL services.

The following Aiven for Kafka service details are required (from the Aiven console):

KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI

The following Aiven for PostgreSQL service details are required (from the Aiven console):

PG_SERVICE_URI
PG_HOST
PG_PORT
PG_USER
PG_PW
PG_DEFAULT_DB

The JDBC URL typically looks like the following:
JDBC_URL=jdbc:postgresql://PG_HOST:PG_PORT/PG_DEFAULT_DB?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory
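
For example, with illustrative placeholder values filled in (your actual host, port and database name come from the Aiven console):

JDBC_URL=jdbc:postgresql://pg-demo.aivencloud.com:21691/defaultdb?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory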

Create the schema in PostgreSQL

Log into PostgreSQL using psql and create the schema as follows (and insert a row of data):

> psql PG_SERVICE_URI

defaultdb=> create table accounts (id int primary key not null, name varchar(32));

defaultdb=> \d accounts

defaultdb=> insert into accounts (id,name) values(1,'foo1');

defaultdb=> select * from accounts;
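
If everything worked, the final select should return the row that was just inserted:

 id | name
----+------
  1 | foo1
(1 row)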

Create the JDBC source connector configuration

Create a JSON file, jdbc-source-pg-increment.json, with the following contents:

{ 
   "name":"jdbc_source_pg_increment",
   "config":{
      "connector.class":"io.aiven.connect.jdbc.JdbcSourceConnector",
      "connection.url":"JDBC_URL",
      "connection.user":"PG_USER",
      "connection.password":"PG_PW",
      "table.whitelist":"public.accounts",
      "mode":"incrementing",
      "incrementing.column.name":"id",
      "topic.prefix":"jdbc_source_pg_increment.",
      "tasks.max":"1",
      "poll.interval.ms":"5000",
      "timestamp.delay.interval.ms":"1000",
      "batch.max.rows":"1",
      "validate.non.null":false
   }
}
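
A few notes on this configuration: "mode":"incrementing" makes the connector detect new rows using a strictly increasing column, named by incrementing.column.name (here the id primary key). The topic name is formed by appending the table name to topic.prefix, so rows from public.accounts are published to the topic jdbc_source_pg_increment.accounts, which is the topic the console consumer below subscribes to.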

Create the JDBC source connector

> curl -s -H "Content-Type: application/json" -X POST -d @jdbc-source-pg-increment.json KAFKA_CONNECT_SERVICE_URI/connectors/

NOTE: Make sure the connector is in the RUNNING state and Tasks shows [1/1].
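
You can also check the connector state directly through the Kafka Connect REST API's status endpoint (the connector name below matches the "name" field in the JSON file):

> curl -s KAFKA_CONNECT_SERVICE_URI/connectors/jdbc_source_pg_increment/status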

Run the console consumer in a separate terminal

The console consumer will read from the topic to which the JDBC source connector is publishing data and echo it to the console/terminal.

First, create the following console-consumer.properties file:

security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
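
The truststore and keystore referenced above can be created from the service's SSL files (service.key, service.cert and ca.pem, downloadable from the Aiven console). A minimal sketch using standard openssl and keytool commands; the file paths, alias names and the password "secret" are placeholders you should change:

> openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key -password pass:secret
> keytool -importcert -file ca.pem -alias CA -keystore client.truststore.jks -storepass secret -noprompt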

We are using the console consumer that ships with the open source distribution of Apache Kafka from the Apache Software Foundation.

/path/kafka-console-consumer.sh --bootstrap-server KAFKA_HOST:KAFKA_PORT --topic jdbc_source_pg_increment.accounts --consumer.config ./console-consumer.properties --from-beginning

Insert additional records into the accounts table (as shown below from psql) and watch those records get echoed in the terminal running the console consumer; sample output is shown after the inserts.

defaultdb=> insert into accounts (id,name) values(2,'foo2'); 
defaultdb=> insert into accounts (id,name) values(3,'foo3');
defaultdb=> insert into accounts (id,name) values(4,'foo4');
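
The exact record format depends on the key/value converters configured for the Kafka Connect service. With the commonly used JSON converter with schemas enabled, each echoed record looks roughly like this (shown for the second insert):

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"accounts"},"payload":{"id":2,"name":"foo2"}}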
