This help article will show how to setup Elasticsearch sink connector, a Kafka connector, for Aiven for Kafka along with Schema Registry. The article is divided into the following parts:
- Creating an Aiven for Kafka service
- Creating an Aiven Elasticsearch service
- Writing a Kafka producer that uses Schema Registry for schema evolution
- Configuring Elasticsearch sink connector with Schema Registry integration to consume and store the messages in Elasticsearch
Please refer to the help article on writing a Kafka producer that uses Schema Registry for schema evolution. The same producer will be used in this help article to produce the messages to be consumed by the Elasticsearch sink connector.
Configuring Elasticsearch sink connector with Schema Registry
The following information will be required in the configuration process:
- Kafka service URL from the Kafka service page
- Kafka connect URL from the Kafka service page
- Schema Registry URL (URL without the username and password), username and password from the Kafka service page
- Elasticsearch host, port, username and password from the Elasticsearch service page
Configuring Elasticsearch sink connector
The configuration that will be used for Elasticsearch sink connector is shown below.
(Note: Please substitute appropriate values for Kafka service URL, Schema registry URL, Kafka connect URL, avndmin's password (for schema registry, and Kafka connect) and key and truststore files' location on your computer).
$curl -X POST https://avnadmin:firstname.lastname@example.org:17293/connectors \
-H "Content-type:application/json" \
-d @- << EOF
The above command is sent to the Kafka connect component of Kafka to configure the Elasticsearch sink connector. It assumes that Kafka connect has coordinates of “kafka-20-biz4-a-exercise1.aivencloud.com:17293”. The Elasticsearch sink connector receives all messages sent to topic, “topic1”. The “connection.*” parameters specify coordinates of the Elasticsearch service which is the recipient of the messages. The “value.converter.*” parameters indicate coordinates and credentials of Kafka schema registry which is used to decipher the schema of the messages. In this example, only the value, out of the key-value pair of the messages, requires explicit decoding based on a predefined schema. The default String converter is used to decode the key of the messages.
Checking the status of Kafka connect
Some useful commands to check the status of Kafka connect for Elasticsearch search sink connector are shown below.
Checking Elasticsearch for stored messages
The following commands examine Elasticsearch for messages and indices stored in it.
We hope this help article has shown how to configure Elasticsearch sink connector with Aiven Kafka including Kafka connect and Schema registry, to send messages to Elasticsearch where it can be stored, indexed, and searched.
Frequently Asked Questions
How to make daily indexes?
By including the following snippet to a connector configuration, you can split the indexes further by the date the message was produced.