This help article shows how to set up the Elasticsearch sink connector, a Kafka Connect connector, for Aiven Kafka together with Schema Registry. The article is divided into the following parts:

  • Creating an Aiven Kafka service
  • Creating an Aiven Elasticsearch service
  • Writing a Kafka producer that uses Schema Registry for schema evolution
  • Configuring Elasticsearch sink connector with Schema Registry integration to consume and store the messages in Elasticsearch

Creating an Aiven Kafka service is documented in the “Getting started with Aiven Kafka” help article. After creating the Kafka service, please create a topic called “topic1” from the Aiven console.
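
As an alternative to the console, the topic can also be created with the Aiven command line client. The sketch below assumes the client is installed; "kafka-exercise1" stands in for your own Kafka service name, and the partition and replication counts are illustrative.

avn service topic-create kafka-exercise1 topic1 --partitions 3 --replication 2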

Creating an Aiven Elasticsearch service is documented in the “Getting started with Aiven Elasticsearch” help article.

Please refer to the help article on writing a Kafka producer that uses Schema Registry for schema evolution. The same producer will be used in this help article to produce the messages to be consumed by the Elasticsearch sink connector.
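
If you only need a few test messages on the topic, the kafka-avro-console-producer tool from the Confluent Platform can stand in for that producer. The sketch below makes several assumptions: the broker port (17295), the SSL properties file (client-ssl.properties) and the Avro schema are placeholders, not values taken from this article.

kafka-avro-console-producer \
  --broker-list kafka-20-biz4-a-exercise1.aivencloud.com:17295 \
  --producer.config client-ssl.properties \
  --topic topic1 \
  --property schema.registry.url=https://kafka-20-biz4-a-exercise1.aivencloud.com:17298 \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=avnadmin:schema-reg-password \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"message","type":"string"}]}'

Each JSON line typed after the command starts, for example {"message": "hello"}, is serialized with the given schema and sent to "topic1".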

Configuring Elasticsearch sink connector with Schema Registry

The following information will be required in the configuration process (a command line way to look up these values is sketched after the list):

  • Kafka service URL from the Kafka service page
  • Kafka connect URL from the Kafka service page
  • Schema Registry URL (the URL without the embedded username and password), plus the Schema Registry username and password, from the Kafka service page
  • Elasticsearch host, port, username and password from the Elasticsearch service page
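
If you have the Aiven command line client installed, one way to look up these values is the "avn service get" command (a sketch; the service names "kafka-exercise1" and "es-exercise1" are placeholders for your own service names):

avn service get kafka-exercise1 --json
avn service get es-exercise1 --json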

Configuring Elasticsearch sink connector

The configuration that will be used for the Elasticsearch sink connector is shown below.
(Note: please substitute appropriate values for the Kafka service URL, Schema Registry URL, Kafka connect URL, the avnadmin passwords for Schema Registry and Kafka connect, and the location of the key and truststore files on your computer.)

curl -X POST https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
-H "Content-Type: application/json" \
-d @- << EOF
{
  "name": "test_es_sink-10",
  "config":
  {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "topic1",
    "key.ignore": "true",

    "connection.url": "https://es-test-exercise1.aivencloud.com:17293",
    "connection.username": "avnadmin",
    "connection.password": "elasticsearch-password",

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://kafka-20-biz4-a-exercise1.aivencloud.com:17298",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "avnadmin:schema-reg-password",

    "type.name": "kafka-connect",
    "name": "test_es_sink-10"
  }
}
EOF

The above command is sent to the Kafka connect component of the Kafka service to configure the Elasticsearch sink connector. It assumes that Kafka connect is reachable at “kafka-20-biz4-a-exercise1.aivencloud.com:17293”. The Elasticsearch sink connector receives all messages sent to the topic “topic1”. The “connection.*” parameters specify the coordinates of the Elasticsearch service that is the recipient of the messages. The “value.converter.*” parameters give the coordinates and credentials of the Kafka Schema Registry, which is used to decipher the schema of the messages. In this example, only the value part of each key-value message requires explicit decoding based on a predefined schema; the default String converter is used to decode the keys.

Checking the status of Kafka connect

Some useful commands to check the status of Kafka connect for the Elasticsearch sink connector are shown below.
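
These use the standard Kafka connect REST API, assuming the same Kafka connect URL and avnadmin password as in the configuration step above:

# List the connectors configured on this Kafka connect service
curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors

# Show the status of the Elasticsearch sink connector and its tasks
curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/test_es_sink-10/status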

Checking Elasticsearch for stored messages

The following commands examine Elasticsearch for messages and indices stored in it.
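
These assume the Elasticsearch coordinates and avnadmin password used in the connector configuration; by default the connector writes to an index named after the topic:

# List the indices; an index called "topic1" should have been created by the connector
curl "https://avnadmin:elasticsearch-password@es-test-exercise1.aivencloud.com:17293/_cat/indices?v"

# Retrieve the messages stored in the "topic1" index
curl "https://avnadmin:elasticsearch-password@es-test-exercise1.aivencloud.com:17293/topic1/_search?pretty"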

We hope this help article has shown how to configure the Elasticsearch sink connector with Aiven Kafka, including Kafka connect and Schema Registry, to send messages to Elasticsearch where they can be stored, indexed, and searched.
