The JDBC source connector is useful for pushing data from a relational database such as MySQL to Kafka. Once in Kafka, the data can be consumed, transformed, and re-consumed any number of times in interesting ways. This help article illustrates the steps to set up a JDBC source connector with a MySQL database.

JDBC source connector setup with Aiven for MySQL

Here we illustrate the steps required to set up a JDBC source connector with an Aiven for MySQL service. Before getting started with the steps, the following information regarding the Kafka and MySQL services needs to be collected.

The following Aiven for Kafka service details are required (from the Aiven console):

KAFKA_HOST
KAFKA_PORT
KAFKA_CONNECT_SERVICE_URI

The following MySQL service details are required (from the Aiven console):

MYSQL_SERVICE_URI
MYSQL_HOST
MYSQL_PORT
MYSQL_USER
MYSQL_PW
MYSQL_DEFAULT_DB

The JDBC URL would typically look like the following:
JDBC_URL=jdbc:mysql://MYSQL_HOST:MYSQL_PORT/MYSQL_DEFAULT_DB?verifyServerCertificate=false&useSSL=true&requireSSL=true
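
For example, with hypothetical host, port, and database name substituted in (these are illustrative placeholders, not real service values):

JDBC_URL=jdbc:mysql://mysql-demo.aivencloud.com:12691/defaultdb?verifyServerCertificate=false&useSSL=true&requireSSL=true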

Create the schema in MySQL

Log into MySQL using mysqlsh and create the schema as follows (and insert a row of data):

> mysqlsh MYSQL_SERVICE_URI
JS> \sql
SQL> create table accounts (id int primary key not null, name varchar(32));
SQL> describe accounts;
SQL> insert into accounts(id,name) values(1,'foo1');
SQL> select * from accounts;

Create the connector configuration file

Create a JSON file, jdbc-source-mysql-increment.json, with the following contents:

{
   "name":"jdbc_source_mysql_increment",
   "config":{
      "tasks.max":"1",
      "poll.interval.ms":5000,
    "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url":"JDBC_URL",
      "connection.user":"MYSQL_USER",
      "connection.password":"MYSQL_PW",
      "key.converter.schemas.enable":"false",
      "value.converter.schemas.enable":"false",
      "topic.prefix":"jdbc_source_mysql_increment.",
      "table.whitelist":"defaultdb.accounts",
      "mode":"incrementing",
      "incrementing.column.name":"id",
      "timestamp.delay.interval.ms":"1000",
      "batch.max.rows":"1",
      "validate.non.null":false
   }
}
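
Optionally, the configuration can be validated against the connector plugin before the connector is created, using the Kafka Connect REST API's validate endpoint. This sketch assumes jq is available to extract the config object from the file:

> jq .config jdbc-source-mysql-increment.json | curl -s -H "Content-Type: application/json" -X PUT -d @- KAFKA_CONNECT_SERVICE_URI/connector-plugins/io.confluent.connect.jdbc.JdbcSourceConnector/config/validate

A response with "error_count": 0 indicates the configuration is valid.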

Create the JDBC source connector

> curl -s -H "Content-Type: application/json" -X POST -d @jdbc-source-mysql-increment.json KAFKA_CONNECT_SERVICE_URI/connectors/
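
After a few seconds, verify that the connector and its task are running by querying the Kafka Connect REST API:

> curl -s KAFKA_CONNECT_SERVICE_URI/connectors/jdbc_source_mysql_increment/status

The response should report "state":"RUNNING" for both the connector and its task.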

Run the console consumer in a separate terminal

The console consumer will read from the topic to which the JDBC source connector publishes data and echo it to the console/terminal.

Create the following console-consumer.properties file:

security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
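
The client.keystore.p12 and client.truststore.jks referenced above can be created from the service's SSL files (ca.pem, service.cert, and service.key, downloadable from the Aiven console), for example:

> openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
> keytool -import -file ca.pem -alias CA -keystore client.truststore.jks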

We are using the console consumer that ships with the open source distribution of Apache Kafka from the Apache Software Foundation.

/path/kafka-console-consumer.sh --bootstrap-server KAFKA_HOST:KAFKA_PORT --topic jdbc_source_mysql_increment.accounts --consumer.config ./console-consumer.properties --from-beginning

Insert additional records into the accounts table (as shown below from the mysqlsh SQL session) and watch those records get echoed in the terminal running the console consumer (above).

SQL> insert into accounts (id,name) values(2,'foo2');
SQL> insert into accounts (id,name) values(3,'foo3');
SQL> insert into accounts (id,name) values(4,'foo4');
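
Assuming the connector's value converter is the JSON converter with schemas disabled (as configured above), the records echoed by the console consumer would look roughly like:

{"id":2,"name":"foo2"}
{"id":3,"name":"foo3"}
{"id":4,"name":"foo4"}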