This article walks through the steps required to set up a Cassandra sink connector for Kafka so that it consumes data from a Kafka topic and stores it in Cassandra. The example also uses Kafka Schema Registry to produce and consume data adhering to Avro schemas.

Before the connector is set up, a number of details about both the Kafka service and the Cassandra service are required.

The following Cassandra service details are required (from the Aiven console):

cassandra_host
cassandra_port
cassandra_user
cassandra_pw

The following Kafka service details are required (from the Aiven console):

kafka_host
kafka_port
schema_reg_port
schema_reg_pw
kafka_connect_service_uri
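
These values are shown on the overview pages of the respective services in the Aiven console. As an alternative sketch, assuming the Aiven CLI (avn) is installed and authenticated, the same connection details can be listed from the command line (the service names below are examples):

# list all services in the current project
avn service list
# show full connection details for a given service
avn service get my-kafka --json
avn service get my-cassandra --json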

Steps to set up the Cassandra sink connector with Aiven for Kafka

Kafka service

1. Create a Kafka service (minimum Business-8 plan) in the cloud and region of your choice
2. Enable the Kafka Connect and Schema Registry sub-services for the Kafka service
3. Collect the Kafka and Schema Registry details that are required
4. Create the topic orders-topic in Kafka (see the example below)
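
The topic can be created from the Topics tab of the Kafka service in the Aiven console. As a sketch, assuming the Aiven CLI is available, it can also be created from the command line (my-kafka, the partition count and the replication factor are example values):

avn service topic-create my-kafka orders-topic --partitions 3 --replication 2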

Cassandra Service

1. Create a Cassandra service in the cloud and region of your choice
2. Use a Cassandra client of your choice to create the keyspace and table (a cqlsh connection example is shown after these steps)

CREATE KEYSPACE demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3};
USE demo;
CREATE TABLE orders (id int, created varchar, product varchar, qty int, price float, PRIMARY KEY (id, created)) WITH CLUSTERING ORDER BY (created ASC);

3. Collect the Cassandra service details that are required
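
One way to connect is with cqlsh. The command below is a sketch that assumes cqlsh is installed locally and that the CA certificate of the service has been downloaded from the Aiven console as ca.pem; substitute the Cassandra service details collected above:

# connect to the Aiven for Cassandra service over SSL
SSL_CERTFILE=/path/ca.pem cqlsh --ssl -u cassandra_user -p cassandra_pw cassandra_host cassandra_port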


Setting up the Cassandra sink connector for Kafka

Use the following command from the Unix command line to set up the connector.

curl -X POST kafka_connect_service_uri/connectors \
-H "Content-type:application/json" \
-d @- << EOF
{
  "name": "test_cass_sink-1",
  "config":
  {
    "connector.class":"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max":"1",
    "topics":"orders-topic",
    "key.converter":"io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"https://kafka_host:schema_reg_port",
    "key.converter.basic.auth.credentials.source":"USER_INFO",
    "key.converter.basic.auth.user.info":"avnadmin:schema_reg_pw",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"https://kafka_host:schema_reg_port",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"avnadmin:schema_reg_pw",
    "connect.cassandra.contact.points":"cassandra_host",
    "connect.cassandra.port": "cassandra_port",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.username":"cassandra_user",
    "connect.cassandra.password":"cassandra_pw",
    "connect.cassandra.kcql": "INSERT INTO orders SELECT * from orders-topic",
    "connect.cassandra.ssl.enabled": "true",
    "connect.cassandra.trust.store.path": "/run/aiven/keys/public.truststore.jks",
    "connect.cassandra.trust.store.password": "password",
    "type.name":"kafka-connect",
    "name":"test_cass_sink-1"
  }  
}
EOF

Verify that the connector is in the "Running" state and that at least one task ("1/1") is running in the Kafka Connect tab for the Kafka service. If not, check both the Kafka Connect tab and the Logs tab of the Kafka service for exceptions.
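
The connector status can also be queried directly from the Kafka Connect REST API, for example:

curl kafka_connect_service_uri/connectors/test_cass_sink-1/status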

Once the connector tasks are running, publish messages to the orders-topic topic using kafka-avro-console-producer (this can be obtained from Confluent). Sample avro-producer.properties file contents:

bootstrap.servers=kafka_host:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer


The truststore and keystore can be created as follows (this is also described in the getting started guide for Kafka):

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
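
The service.key, service.cert and ca.pem files used above can be downloaded from the Kafka service overview page in the Aiven console. As a sketch, assuming the Aiven CLI is available (exact flags may differ between CLI versions; my-kafka is an example service name), they can also be fetched with:

avn service user-creds-download my-kafka --username avnadmin -d /path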

./kafka-avro-console-producer --broker-list kafka_host:kafka_port --topic orders-topic  --producer.config /path/avro-producer.properties --property schema.registry.url=https://kafka_host:schema_reg_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_reg_pw --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5, "qty":100}


Check the Cassandra database for the inserted records:

avnadmin@cqlsh:demo> select * from orders;

 id | created             | price | product                | qty
----+---------------------+-------+------------------------+-----
  1 | 2016-05-06 13:53:00 |  94.2 | OP-DAX-P-20150201-95.7 | 100
  2 | 2016-05-06 13:54:00 |  99.5 |  OP-DAX-C-20150201-100 | 100

(2 rows)


