This article walks through the steps required to set up a Cassandra sink connector for Kafka so that it consumes data from a Kafka topic and stores it in Cassandra. The example also uses a schema registry to produce and consume data that adheres to Avro schemas.

Note: As of version 3.0, Aiven for Apache Kafka no longer supports Confluent Schema Registry. For more information, see this article that describes the replacement, Karapace.

Before the connector is set up, a number of details about both the Kafka service and the Cassandra service are required.

The following Cassandra service details are required (from the Aiven console):

- Cassandra host and port
- Cassandra username and password
- CA certificate (ca.pem)

The following Kafka service details are required (from the Aiven console):

- Kafka host and port
- Kafka Connect service URI
- Schema registry port, username, and password
- Access key (service.key), access certificate (service.cert), and CA certificate (ca.pem)

Steps to set up the Cassandra sink connector with Aiven for Apache Kafka

Kafka service

1. Create a Kafka service (minimum Business-8 plan) in the cloud and region of your choice
2. Enable the Kafka Connect and schema registry sub-services for the Kafka service
3. Collect the required Kafka and schema registry details
4. Create a topic, orders-topic, in Kafka

Cassandra Service

1. Create Cassandra service in the cloud and region of your choice
2. Use a Cassandra client of your choice to create the keyspace and table:

CREATE KEYSPACE demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3};
USE demo;
CREATE TABLE orders (id int, created varchar, product varchar, qty int, price float, PRIMARY KEY (id, created)) WITH CLUSTERING ORDER BY (created ASC);

3. Collect the required Cassandra service details
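
If you prefer to keep the schema definitions in code, the two CQL statements above can be generated programmatically. This is a minimal sketch using only string construction; the helper names are illustrative, and executing the statements still requires a Cassandra client such as the cassandra-driver package (not covered here):

```python
def create_keyspace_cql(keyspace: str, replication_factor: int = 3) -> str:
    # Matches the CREATE KEYSPACE statement used in this article
    return (
        f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
        f"{{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}};"
    )


def create_table_cql(keyspace: str, table: str, columns: dict,
                     partition_key: str, clustering_key: str) -> str:
    # Builds a CREATE TABLE with a compound primary key:
    # partition key first, clustering column second
    cols = ", ".join(f"{name} {ctype}" for name, ctype in columns.items())
    return (
        f"CREATE TABLE {keyspace}.{table} ({cols}, "
        f"PRIMARY KEY ({partition_key}, {clustering_key})) "
        f"WITH CLUSTERING ORDER BY ({clustering_key} ASC);"
    )


orders_columns = {
    "id": "int", "created": "varchar", "product": "varchar",
    "qty": "int", "price": "float",
}
print(create_keyspace_cql("demo"))
print(create_table_cql("demo", "orders", orders_columns, "id", "created"))
```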

Setting up the Cassandra sink connector for Kafka

Use the following command from the Unix command line to set up the connector.

curl -X POST kafka_connect_service_uri/connectors \
-H "Content-Type: application/json" \
-d @- << EOF
{
  "name": "test_cass_sink-1",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "topics": "orders-topic",
    "connect.cassandra.contact.points": "cassandra_host",
    "connect.cassandra.port": "cassandra_port",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.username": "cassandra_user",
    "connect.cassandra.password": "cassandra_password",
    "connect.cassandra.kcql": "INSERT INTO orders SELECT * FROM orders-topic",
    "connect.cassandra.ssl.enabled": "true",
    "connect.cassandra.trust.store.path": "/run/aiven/keys/public.truststore.jks",
    "connect.cassandra.trust.store.password": "password"
  }
}
EOF

Verify in the Kafka Connect tab for the Kafka service that the connector is in the "Running" state and that at least one task ("1/1") is running. If not, check both the Kafka Connect tab and the Logs tab for the Kafka service for exceptions.
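
The same health check can be scripted against the Kafka Connect REST API: GET /connectors/<name>/status returns a JSON document with the connector state and one entry per task. A sketch of parsing that response (the sample document below is illustrative):

```python
import json


def connector_is_healthy(status: dict) -> bool:
    """True when the connector and every task report RUNNING,
    mirroring the "Running" / "1/1" check in the console."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


# Example response shape from GET /connectors/test_cass_sink-1/status
sample = json.loads("""
{
  "name": "test_cass_sink-1",
  "connector": {"state": "RUNNING", "worker_id": "worker-1"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "worker-1"}]
}
""")
print(connector_is_healthy(sample))  # a FAILED task would make this False
```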

Once the connect tasks are running, publish messages to the orders-topic topic using kafka-avro-console-producer (this can be obtained from Confluent). The producer needs a truststore and keystore for TLS authentication.
The truststore and keystore can be created as follows (this is also described in the getting started guide for Kafka):

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

./kafka-avro-console-producer --broker-list kafka_host:kafka_port --topic orders-topic \
  --producer.config /path/ \
  --property schema.registry.url=https://kafka_host:schema_reg_port \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=schema_reg_user:schema_reg_password \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5, "qty":100}

Check Cassandra database for the inserted records.

avnadmin@cqlsh:demo> select * from orders;

 id | created             | price | product                | qty
----+---------------------+-------+------------------------+-----
  1 | 2016-05-06 13:53:00 |  94.2 | OP-DAX-P-20150201-95.7 | 100
  2 | 2016-05-06 13:54:00 |  99.5 |  OP-DAX-C-20150201-100 | 100

(2 rows)
