0 Scenario

You have a Kafka cluster that you are either self-managing or running on a managed service you are unhappy with, and you want to migrate it to the managed Kafka service by Aiven. If that is generally what you are looking to do, then continue reading.

0.1 Source Kafka cluster

We are going to assume that you want to migrate the data from the source Kafka cluster to the destination Kafka cluster by Aiven. We are also going to assume that the source Kafka cluster uses Avro schemas with Schema Registry and the Amazon S3 sink Kafka connector, so that all events produced to Kafka eventually end up in S3 as their final resting place for long-term storage. Finally, we assume that producers and consumers talk to the source Kafka cluster over TLS-encrypted channels, and use secure HTTPS-based basic authentication when interfacing with Schema Registry and Kafka Connect.

0.2 Destination Kafka cluster

The destination is an Aiven for Kafka cluster that uses Avro schemas with Schema Registry (secured with basic authentication over HTTPS) and the Amazon S3 sink Kafka connector, with producers and consumers communicating over TLS-encrypted channels.

0.3 Kafka MirrorMaker

Kafka MirrorMaker (v1.0) will be used to asynchronously replicate topic data from the source to the destination Kafka cluster. MirrorMaker communicates with both the source and the destination Kafka clusters over TLS-encrypted channels.

1 Setting up the source Kafka cluster

We are going to assume that Schema Registry and Kafka Connect with Confluent's Amazon S3 sink connector are available and set up in the source cluster. Note: not all of these components are necessary, but they are shown for completeness since they are used in the majority of deployment scenarios. You will also need the Confluent Platform, which you can download for free from the Confluent website. At the time of this writing, CP 5.3.1 was generally available. Note that for the purposes of this guide we will be using a source Kafka cluster that is itself deployed on Aiven.

For the purposes of this demonstration, we will use a topic called topic2 with a replication factor of two (2), three (3) partitions, a retention period of 7 days, and a cleanup policy of delete.
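
The topic can be created from the Aiven console; if you prefer the command line, a minimal sketch using the stock kafka-topics admin tool could look like the following (admin.properties here is a hypothetical client properties file containing the same SSL settings as the consumer properties shown in the next subsection):

> /path/confluent-5.3.1/bin/kafka-topics --create --bootstrap-server source_kafka.aivencloud.com:kafka_port --command-config /path/mirrormaker/source/kafka-source/admin.properties --topic topic2 --partitions 3 --replication-factor 2 --config retention.ms=604800000 --config cleanup.policy=delete

NOTE: retention.ms=604800000 corresponds to a 7-day retention period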

1.1 Setting up consumer

We will use kafka-avro-console-consumer to subscribe to events from the topic2 topic with the following command line:

> /path/confluent-5.3.1/bin/kafka-avro-console-consumer --bootstrap-server source_kafka.aivencloud.com:kafka_port --topic topic2  --consumer.config /path/mirrormaker/source/kafka-source/avro-consumer.properties --property schema.registry.url=https://source_kafka.aivencloud.com:sr_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:sr_pw --from-beginning

NOTE: “source_kafka”, kafka_port, sr_port, sr_pw, etc. can be obtained from the Aiven console

Consumer configuration properties

> cat source/kafka-source/avro-consumer.properties 
bootstrap.servers=source_kafka.aivencloud.com:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
client.id=client1
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
enable.auto.commit=true

NOTE: “source_kafka” can be obtained from the Aiven console. Please refer to the getting started guide for instructions on how to generate the Java truststore and keystore.
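
If you have not created these yet, a minimal sketch using the service.key, service.cert and ca.pem files downloaded from the Aiven console (you will be prompted for the keystore and truststore passwords) is:

> openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
> keytool -import -file ca.pem -alias CA -keystore client.truststore.jks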

1.2 Setting up S3 sink connector

We will assume that the S3 connector (or any other connector you wish to use) is available in your Kafka Connect setup. The following configuration will be used to set up the S3 sink connector.

> cat source/c_avro_s3_sink.json 
{
  "name": "c_avro_s3_sink",
  "config":
  {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://source_kafka.aivencloud.com:sr_port",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "avnadmin:sr_pw",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "topics": "topic2",
    "topics.dir": "json",
    "s3.region": "your-aws-region-of-choice",
    "s3.bucket.name": "your-S3-bucket-name",
    "flush.size": "1",
    "s3.credentials.provider.access_key_id": "XXX",
    "s3.credentials.provider.secret_access_key": "XXX"
  }
}

NOTE: “source_kafka”, kafka_port, sr_port, sr_pw, etc. can be obtained from the Aiven console
NOTE: please substitute your own values for the AWS credentials, AWS region, and S3 bucket

Create the S3 connector instance with associated tasks using the following command:

> curl -s -H "Content-Type: application/json" -X POST -d @./source/c_avro_s3_sink.json KAFKA_CONNECT_URI/connectors/

NOTE: KAFKA_CONNECT_URI can be obtained from the Aiven console
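
To verify that the connector and its task are running, you can query the Kafka Connect REST API status endpoint for the connector created above:

> curl -s KAFKA_CONNECT_URI/connectors/c_avro_s3_sink/status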

1.3 Setting up producer

Now it is time to start publishing messages that conform to the Avro schema to the topic. Use the following command to start publishing messages.

> /path/confluent-5.3.1/bin/kafka-avro-console-producer --broker-list source_kafka.aivencloud.com:kafka_port --topic topic2 --producer.config /path/mirrormaker/source/kafka-source/avro-producer.properties --property schema.registry.url=https://source_kafka.aivencloud.com:sr_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:sr_pw --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
{"id": 1, "product": "foo", "quantity": 100, "price": 50}
{"id": 2, "product": "foo", "quantity": 100, "price": 50}
{"id": 3, "product": "foo", "quantity": 100, "price": 50}


Producer properties

> cat source/kafka-source/avro-producer.properties 
bootstrap.servers=source_kafka.aivencloud.com:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

Check the consumer for an output similar to the following (Note: the ordering can be different):

{"id":3,"product":"foo","quantity":100,"price":50.0}
{"id":1,"product":"foo","quantity":100,"price":50.0}
{"id":2,"product":"foo","quantity":100,"price":50.0}

2 Setting up the destination Kafka cluster

For the purposes of this demonstration, we will again use a topic called topic2 with a replication factor of two (2), three (3) partitions, a retention period of 7 days, and a cleanup policy of delete. We will also enable Schema Registry and Kafka Connect on the destination cluster. It is very important that the replication factor and partition count are kept identical to those of the source cluster.
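
To confirm that the partition count and replication factor really do match, you can describe the topic on both clusters (a sketch; admin.properties again stands for a hypothetical client properties file containing the SSL settings for the respective cluster):

> /path/confluent-5.3.1/bin/kafka-topics --describe --topic topic2 --bootstrap-server source_kafka.aivencloud.com:kafka_port --command-config /path/mirrormaker/source/kafka-source/admin.properties
> /path/confluent-5.3.1/bin/kafka-topics --describe --topic topic2 --bootstrap-server destination_kafka.aivencloud.com:kafka_port --command-config /path/mirrormaker/destination/kafka-dest/admin.properties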

3 Setting up and running MirrorMaker

We will assume you have downloaded the Apache Kafka distribution from https://kafka.apache.org/. We will use the following MirrorMaker producer and consumer configuration properties.
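
For example, to fetch and unpack the release used in the commands below (any recent version should work; older releases are available from the Apache archive):

> wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
> tar -xzf kafka_2.12-2.3.1.tgz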

MirrorMaker consumer properties

> cat source/mm_consumer.properties 
bootstrap.servers=source_kafka.aivencloud.com:kafka_port
exclude.internal.topics=true
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
#partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
auto.offset.reset=earliest
#
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret

MirrorMaker producer properties

> cat destination/mm_producer.properties 
bootstrap.servers=destination_kafka.aivencloud.com:kafka_port
acks=all
batch.size=1
#partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
client.id=mirror_maker_producer
max.in.flight.requests.per.connection=1
retries=1000
#
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret

Setting Java VM environment variables for MirrorMaker

The following environment setup can be tuned depending on your needs.

> export KAFKA_HEAP_OPTS='-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'

Running MirrorMaker

> /path/kafka_2.12-2.3.1/bin/kafka-mirror-maker.sh --consumer.config /path/mirrormaker/source/mm_consumer.properties --producer.config /path/mirrormaker/destination/mm_producer.properties --num.streams 1 --whitelist=".*"

NOTE: num.streams and the topic whitelist can be adjusted depending on your specific needs

Produce new messages to the topic, using the same producer command as in section 1.3:

{"id": 4, "product": "foo", "quantity": 100, "price": 50}
{"id": 5, "product": "foo", "quantity": 100, "price": 50}
{"id": 6, "product": "foo", "quantity": 100, "price": 50}

Check the consumer for an output similar to the following (Note: the ordering can be different):

{"id":6,"product":"foo","quantity":100,"price":50.0}
{"id":4,"product":"foo","quantity":100,"price":50.0}
{"id":5,"product":"foo","quantity":100,"price":50.0}

4 Getting ready to switch to new Kafka cluster

Now that MirrorMaker has been set up and is replicating topic data to the new Kafka cluster, it is time to prepare to move the consumers (including the S3 sink connector) and the producers to the new cluster. Before doing so, make sure that:

  • MirrorMaker has caught up with all the data in the topics of the source cluster (see below for the command to check MirrorMaker lag). If it has not, quiesce the producers to give all of the consumers - the MirrorMaker consumer, your application consumers, and the S3 sink connector (which is also a consumer) - a chance to catch up with the data in the topics.
  • The producers have been quiesced long enough for MirrorMaker and the consumers to consume all the messages in the topics.

Checking MirrorMaker lag

> /path/kafka_2.12-2.3.1/bin/kafka-consumer-groups.sh --bootstrap-server source_kafka.aivencloud.com:kafka_port --command-config /path/mirrormaker/source/kafka-source/consumer.properties --group mirror_maker_group --describe 

NOTE: replace mirror_maker_group with the appropriate MirrorMaker consumer group (the group.id set in mm_consumer.properties, mirror_maker_consumer in this example)

Where consumer.properties is:

> cat /path/mirrormaker/source/kafka-source/consumer.properties
bootstrap.servers=source_kafka.aivencloud.com:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=mm_chk
client.id=mm_chk_client1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
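
Before switching over, the LAG column in the describe output should be zero for every partition of every topic. As a rough convenience, you can sum the lag across partitions (a sketch assuming the standard column layout of kafka-consumer-groups.sh --describe, where LAG is the sixth column):

> /path/kafka_2.12-2.3.1/bin/kafka-consumer-groups.sh --bootstrap-server source_kafka.aivencloud.com:kafka_port --command-config /path/mirrormaker/source/kafka-source/consumer.properties --group mirror_maker_consumer --describe | awk 'NR > 1 { sum += $6 } END { print "total lag:", sum }'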

4.1 Switching over consumer to new cluster

Switch over consumer

> /path/confluent-5.3.1/bin/kafka-avro-console-consumer --bootstrap-server destination_kafka.aivencloud.com:kafka_port --topic topic2 --consumer.config /path/mirrormaker/destination/kafka-dest/avro-consumer.properties --property schema.registry.url=https://destination_kafka.aivencloud.com:sr_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:sr_pw

Where the Avro consumer properties are:

> cat destination/kafka-dest/avro-consumer.properties 
bootstrap.servers=destination_kafka.aivencloud.com:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
client.id=client1
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

4.2 Switching S3 connector to new cluster

Switch over S3 sink connector

> cat destination/c_avro_s3_sink.json 
{
  "name": "c_avro_s3_sink",
  "config":
  {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://destination_kafka.aivencloud.com:sr_port",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "avnadmin:sr_pw",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "topics": "topic2",
    "topics.dir": "json",
    "s3.region": "your-aws-region-of-choice",
    "s3.bucket.name": "your-S3-bucket-name",
    "flush.size": "1",
    "s3.credentials.provider.access_key_id": "XXX",
    "s3.credentials.provider.secret_access_key": "XXX"
  }
}

Create the S3 connector instance with associated tasks using the following command:

> curl -s -H "Content-Type: application/json" -X POST -d @./destination/c_avro_s3_sink.json KAFKA_CONNECT_URI/connectors/

NOTE: KAFKA_CONNECT_URI can be obtained from the Aiven console
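
Once the connector on the destination cluster is up and running, remember to remove (or pause) the old connector instance on the source cluster so that two connectors do not keep writing to the same S3 bucket. A sketch using the Kafka Connect REST API, where SOURCE_KAFKA_CONNECT_URI stands for the Kafka Connect URI of the source cluster:

> curl -s -X DELETE SOURCE_KAFKA_CONNECT_URI/connectors/c_avro_s3_sink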

4.3 Switching producer to new cluster

Switch producers to new cluster

> /path/confluent-5.3.1/bin/kafka-avro-console-producer --broker-list destination_kafka.aivencloud.com:kafka_port --topic topic2  --producer.config /path/mirrormaker/destination/kafka-dest/avro-producer.properties --property schema.registry.url=https://destination_kafka.aivencloud.com:sr_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:sr_pw --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
{"id": 201, "product": "foo", "quantity": 100, "price": 50}
{"id": 202, "product": "foo", "quantity": 100, "price": 50}
{"id": 203, "product": "foo", "quantity": 100, "price": 50}

Producer properties

> cat destination/kafka-dest/avro-producer.properties 
bootstrap.servers=destination_kafka.aivencloud.com:kafka_port
security.protocol=SSL
ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

Check the consumer for an output similar to the following (Note: the ordering can be different):

{"id":201,"product":"foo","quantity":100,"price":50.0}
{"id":202,"product":"foo","quantity":100,"price":50.0}
{"id":203,"product":"foo","quantity":100,"price":50.0}

4.4 Check S3 bucket for messages

Compare an individual message in the S3 bucket from before the migration with one written after the migration; they should look the same.
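
To list the objects the connector has written, you can use the AWS CLI (the json/ prefix comes from the topics.dir setting in the connector configuration):

> aws s3 ls s3://your-S3-bucket-name/json/topic2/ --recursive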
