The JDBC source connector is useful for pushing data from a relational database, such as PostgreSQL, to Apache Kafka. Once in Kafka, the data can be consumed and transformed any number of times in interesting ways. This help article illustrates the steps to set up the JDBC source connector with a PostgreSQL database.
JDBC source connector setup with Aiven for PostgreSQL
Here we illustrate the steps required to set up the JDBC source connector with an Aiven for PostgreSQL service. Before getting started, the following information about the Kafka and PostgreSQL services needs to be collected.
The following Aiven for Kafka service details are required (from the Aiven console):
KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI
The following PostgreSQL service details are required (from the Aiven console):
PG_SERVICE_URI
PG_HOST
PG_PORT
PG_USER
PG_PW
PG_DEFAULT_DB
The JDBC URL typically looks like the following:
JDBC_URL=jdbc:postgresql://PG_HOST:PG_PORT/PG_DEFAULT_DB?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory
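For example, with hypothetical values substituted (the host, port, and database name below are placeholders, not real service values):
> export JDBC_URL="jdbc:postgresql://pg-demo.aivencloud.com:12691/defaultdb?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory"  # hypothetical host/port/database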
Create the schema in PostgreSQL
Log into PostgreSQL using psql, create the schema as follows, and insert a row of data:
> psql PG_SERVICE_URI
defaultdb=> create table accounts (id int primary key not null, name varchar(32));
defaultdb=> \d accounts
defaultdb=> insert into accounts (id,name) values(1,'foo1');
defaultdb=> select * from accounts;
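The select should return the row just inserted, similar to:
 id | name
----+------
  1 | foo1
(1 row)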
Create the JDBC source connector configuration
Create a JSON file, jdbc-source-pg-increment.json, with the following contents:
{
  "name": "jdbc_source_pg_increment",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "JDBC_URL",
    "connection.user": "PG_USER",
    "connection.password": "PG_PW",
    "table.whitelist": "public.accounts",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc_source_pg_increment.",
    "tasks.max": "1",
    "poll.interval.ms": "5000",
    "timestamp.delay.interval.ms": "1000",
    "batch.max.rows": "1",
    "validate.non.null": false
  }
}
Create the JDBC source connector
> curl -s -H "Content-Type: application/json" -X POST -d @jdbc-source-pg-increment.json KAFKA_CONNECT_SERVICE_URI/connectors/
NOTE: Make sure the connector is in RUNNING state and Tasks shows [1/1].
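To check this from the command line, you can query the Kafka Connect REST API; the /connectors/<name>/status endpoint is part of the standard Connect REST interface:
> curl -s KAFKA_CONNECT_SERVICE_URI/connectors/jdbc_source_pg_increment/status
The response should report "state":"RUNNING" for both the connector and its single task.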
Run the console consumer in a separate terminal
The console consumer will read from the topic to which the JDBC source connector is publishing data and echo it to the console/terminal.
First, create the following console-consumer.properties file:
security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
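If you have not yet created the keystore and truststore, one common approach is sketched below, assuming you have downloaded the service's service.cert, service.key and ca.pem files from the Aiven console (the password "secret" matches the properties file above):
> openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key -passout pass:secret  # assumes downloaded service.key/service.cert
> keytool -importcert -file ca.pem -alias CA -keystore client.truststore.jks -storepass secret -noprompt  # assumes downloaded ca.pem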
We are using the console consumer that ships with the open source distribution of Apache Kafka from the Apache Software Foundation.
> /path/kafka-console-consumer.sh --bootstrap-server KAFKA_HOST:KAFKA_PORT --topic jdbc_source_pg_increment.accounts --consumer.config ./console-consumer.properties --from-beginning
Insert additional records into the accounts table (as shown below from psql) and see those records echoed in the terminal running the console consumer (above).
defaultdb=> insert into accounts (id,name) values(2,'foo2');
defaultdb=> insert into accounts (id,name) values(3,'foo3');
defaultdb=> insert into accounts (id,name) values(4,'foo4');
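Depending on the configured value converter, each record is echoed as JSON. With Kafka Connect's default JsonConverter (schemas enabled), the output resembles the following sketch (field details may differ in your setup):
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"accounts"},"payload":{"id":2,"name":"foo2"}}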