This article shows you an example of how to migrate your Apache Kafka data from a self-managed instance or other managed service to Aiven for Apache Kafka.

The scenario for this example:

  • The source Kafka cluster and destination Aiven for Apache Kafka cluster use Avro schemas stored in the Schema registry (this example uses a source Kafka cluster deployed on Aiven)

  • An Amazon S3 sink Kafka connector

  • Producers and consumers use TLS encryption, with HTTP basic authentication over HTTPS for interfacing with the Schema registry and Kafka Connect

  • Kafka MirrorMaker (v1.0) is used to asynchronously replicate topic data from source to destination with TLS encryption (MirrorMaker is included in the Apache Kafka distribution available on https://kafka.apache.org/)

  • Confluent Platform, which you can download for free from the Confluent website (at the time of writing, version 5.3.1 was generally available)

  • A topic called topic2 with a topic replication factor of 2, 3 partitions, retention of 7 days, and cleanup policy set to delete (a sample creation command follows this list)

  • Java keystore and truststore are set up (see our article on configuring Java SSL for details)
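
If you need to create the topic on an Aiven-hosted source cluster, a minimal sketch using the Aiven CLI might look like the following (the service name kafka-source-svc is a placeholder; retention and cleanup policy can also be set in the Aiven web console):

  > avn service topic-create kafka-source-svc topic2 --partitions 3 --replication 2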

Migration steps

Note: You can find the properties for kafka_host, kafka_destination, kafka_port, schema_registry_port, schema_registry_password, and KAFKA_CONNECT_URI that are used in this example in the Aiven web console.
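
  You can also retrieve these values on the command line with the Aiven CLI (a minimal sketch; the service name kafka-source-svc is a placeholder):

  > avn service get kafka-source-svc --json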

  1. Set up the source Kafka cluster.

    1. Use the kafka-avro-console-consumer command to subscribe to events from topic2:

      > /path/confluent-5.3.1/bin/kafka-avro-console-consumer --bootstrap-server kafka_host.aivencloud.com:kafka_port --topic topic2  --consumer.config /path/mirrormaker/source/kafka-source/avro-consumer.properties --property schema.registry.url=https://kafka_host.aivencloud.com:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password --from-beginning

    2. Set the consumer configuration properties:

      > cat source/kafka-source/avro-consumer.properties 
      bootstrap.servers=kafka_host.aivencloud.com:kafka_port
      security.protocol=SSL
      ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
      ssl.truststore.password=secret
      ssl.keystore.type=PKCS12
      ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
      ssl.keystore.password=secret
      ssl.key.password=secret
      group.id=grp1
      client.id=client1
      key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
      value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
      enable.auto.commit=true

    3. Set up the S3 sink connector:

      > cat source/c_avro_s3_sink.json 
      {
        "name": "c_avro_s3_sink",
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "https://kafka_host.aivencloud.com:schema_registry_port",
          "value.converter.basic.auth.credentials.source": "USER_INFO",
          "value.converter.basic.auth.user.info": "avnadmin:schema_registry_password",
          "value.converter.schemas.enable": "false",
          "tasks.max": "1",
          "topics": "topic2",
          "topics.dir": "json",
          "s3.region": "your-aws-region-of-choice",
          "s3.bucket.name": "your-S3-bucket-name",
          "flush.size": "1",
          "s3.credentials.provider.access_key_id": "XXX",
          "s3.credentials.provider.secret_access_key": "XXX"
        }
      }

      Substitute the values for AWS credentials, AWS region, and S3 bucket.

    4. Create the S3 connector instance and associated tasks:

      > curl -s -H "Content-Type: application/json" -X POST -d @./source/c_avro_s3_sink.json KAFKA_CONNECT_URI/connectors/
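
      To verify that the connector and its task are running, you can query the Kafka Connect REST API (the status endpoint below is part of the standard Kafka Connect REST interface):

      > curl -s KAFKA_CONNECT_URI/connectors/c_avro_s3_sink/status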

    5. Set up the producer.
      This starts publishing messages that conform to the Avro schema to the topic.

      > /path/confluent-5.3.1/bin/kafka-avro-console-producer --broker-list kafka_host.aivencloud.com:kafka_port --topic topic2  --producer.config /path/mirrormaker/source/kafka-source/avro-producer.properties --property schema.registry.url=https://kafka_host.aivencloud.com:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
      {"id": 1, "product": "foo", "quantity": 100, "price": 50}
      {"id": 2, "product": "foo", "quantity": 100, "price": 50}
      {"id": 3, "product": "foo", "quantity": 100, "price": 50}

    6. Set the producer properties:

      > cat source/kafka-source/avro-producer.properties 
      bootstrap.servers=kafka_host.aivencloud.com:kafka_port
      security.protocol=SSL
      ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
      ssl.truststore.password=secret
      ssl.keystore.type=PKCS12
      ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
      ssl.keystore.password=secret
      ssl.key.password=secret
      key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
      value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

    7. Check that the consumer output is similar to the following:

      {"id":3,"product":"foo","quantity":100,"price":50.0}
      {"id":1,"product":"foo","quantity":100,"price":50.0}
      {"id":2,"product":"foo","quantity":100,"price":50.0}

      Note: The order of the output items may vary.

  2. Set up and run MirrorMaker:

    1. Set the MirrorMaker consumer properties:

      > cat source/mm_consumer.properties 
      bootstrap.servers=kafka_host.aivencloud.com:kafka_port
      exclude.internal.topics=true
      client.id=mirror_maker_consumer
      group.id=mirror_maker_consumer
      #partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
      auto.offset.reset=earliest
      #
      security.protocol=SSL
      ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
      ssl.truststore.password=secret
      ssl.keystore.type=PKCS12
      ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
      ssl.keystore.password=secret
      ssl.key.password=secret

    2. Set the MirrorMaker producer properties:

      > cat destination/mm_producer.properties 
      bootstrap.servers=kafka_destination.aivencloud.com:kafka_port
      acks=all
      batch.size=1
      #partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
      client.id=mirror_maker_producer
      max.in.flight.requests.per.connection=1
      retries=1000
      #
      security.protocol=SSL
      ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
      ssl.truststore.password=secret
      ssl.keystore.type=PKCS12
      ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
      ssl.keystore.password=secret
      ssl.key.password=secret

    3. Set the Java VM environment variables for MirrorMaker:

      > export KAFKA_HEAP_OPTS='-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'

      Set these variables according to your needs.
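
      For example, a smaller test migration could use a reduced heap (a minimal sketch; adjust to your workload):

      > export KAFKA_HEAP_OPTS='-Xmx1g -Xms1g'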

    4. Run MirrorMaker:

      > /path/kafka_2.12-2.3.1/bin/kafka-mirror-maker.sh --consumer.config /path/mirrormaker/source/mm_consumer.properties --producer.config /path/mirrormaker/destination/mm_producer.properties --num.streams 1 --whitelist=".*"

      Set num.streams and the topic whitelist according to your needs.
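
      For example, to mirror only topic2 instead of all topics (the whitelist is a Java regular expression):

      > /path/kafka_2.12-2.3.1/bin/kafka-mirror-maker.sh --consumer.config /path/mirrormaker/source/mm_consumer.properties --producer.config /path/mirrormaker/destination/mm_producer.properties --num.streams 1 --whitelist="topic2"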

    5. Add new messages to the topic:

      {"id": 4, "product": "foo", "quantity": 100, "price": 50}
      {"id": 5, "product": "foo", "quantity": 100, "price": 50}
      {"id": 6, "product": "foo", "quantity": 100, "price": 50}

    6. Check that the consumer output is similar to the following:

      {"id":6,"product":"foo","quantity":100,"price":50.0}
      {"id":4,"product":"foo","quantity":100,"price":50.0}
      {"id":5,"product":"foo","quantity":100,"price":50.0}

      Note: The order of the output items may vary.

  3. Check the MirrorMaker status.
    Before switching the consumers, S3 sink connector, and producers to the new cluster, make sure that MirrorMaker is synchronized with all the data in the topics of the source cluster. If it is not, pause the producers to give MirrorMaker's consumers time to catch up with the data in the topics.

    Run the following command to check the MirrorMaker lag (replace mirror_maker_group with the appropriate MirrorMaker consumer group; in this example, the group.id is mirror_maker_consumer). A LAG of 0 for all partitions means that MirrorMaker has caught up:

    > /path/kafka_2.12-2.3.1/bin/kafka-consumer-groups.sh --bootstrap-server kafka_host.aivencloud.com:kafka_port --command-config /path/mirrormaker/source/kafka-source/consumer.properties --group mirror_maker_group --describe 

    Where consumer.properties is the following:

    > cat /path/mirrormaker/source/kafka-source/consumer.properties
    bootstrap.servers=kafka_host.aivencloud.com:kafka_port
    security.protocol=SSL
    ssl.truststore.location=/path/mirrormaker/source/kafka-source/client.truststore.jks
    ssl.truststore.password=secret
    ssl.keystore.type=PKCS12
    ssl.keystore.location=/path/mirrormaker/source/kafka-source/client.keystore.p12
    ssl.keystore.password=secret
    ssl.key.password=secret
    group.id=mm_chk
    client.id=mm_chk_client1
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  4. Switch the consumer over to the new cluster:

    > /path/confluent-5.3.1/bin/kafka-avro-console-consumer --bootstrap-server kafka_destination.aivencloud.com:kafka_port --topic topic2  --consumer.config /path/mirrormaker/destination/kafka-dest/avro-consumer.properties --property schema.registry.url=https://kafka_destination.aivencloud.com:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password 

    Where avro-consumer.properties is the following:

    > cat destination/kafka-dest/avro-consumer.properties 
    bootstrap.servers=kafka_destination.aivencloud.com:kafka_port
    security.protocol=SSL
    ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
    ssl.truststore.password=secret
    ssl.keystore.type=PKCS12
    ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
    ssl.keystore.password=secret
    ssl.key.password=secret
    group.id=grp1
    client.id=client1
    key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
    value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

  5. Switch the S3 connector over to the new cluster.

    1. Update the S3 sink connector configuration to point to the new cluster:

      > cat destination/c_avro_s3_sink.json 
      {
        "name": "c_avro_s3_sink",
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "https://kafka_destination.aivencloud.com:schema_registry_port",
          "value.converter.basic.auth.credentials.source": "USER_INFO",
          "value.converter.basic.auth.user.info": "avnadmin:schema_registry_password",
          "value.converter.schemas.enable": "false",
          "tasks.max": "1",
          "topics": "topic2",
          "topics.dir": "json",
          "s3.region": "your-aws-region-of-choice",
          "s3.bucket.name": "your-S3-bucket-name",
          "flush.size": "1",
          "s3.credentials.provider.access_key_id": "XXX",
          "s3.credentials.provider.secret_access_key": "XXX"
        }
      }

    2. Create the S3 connector instance and associated tasks on the destination cluster:

      > curl -s -H "Content-Type: application/json" -X POST -d @./destination/c_avro_s3_sink.json KAFKA_CONNECT_URI/connectors/
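
      As on the source cluster, you can verify the connector through the Kafka Connect REST API and, once the switchover is confirmed, remove the old connector from the source cluster's Kafka Connect (SOURCE_KAFKA_CONNECT_URI is a placeholder for the source cluster's Connect URI; both endpoints are part of the standard Kafka Connect REST interface):

      > curl -s KAFKA_CONNECT_URI/connectors/c_avro_s3_sink/status
      > curl -s -X DELETE SOURCE_KAFKA_CONNECT_URI/connectors/c_avro_s3_sink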

  6. Switch the producer over to the new cluster:

    > /path/confluent-5.3.1/bin/kafka-avro-console-producer --broker-list kafka_destination.aivencloud.com:kafka_port --topic topic2  --producer.config /path/mirrormaker/destination/kafka-dest/avro-producer.properties --property schema.registry.url=https://kafka_destination.aivencloud.com:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
    {"id": 201, "product": "foo", "quantity": 100, "price": 50}
    {"id": 202, "product": "foo", "quantity": 100, "price": 50}
    {"id": 203, "product": "foo", "quantity": 100, "price": 50}

    Where avro-producer.properties is the following:

    > cat destination/kafka-dest/avro-producer.properties 
    bootstrap.servers=kafka_destination.aivencloud.com:kafka_port
    security.protocol=SSL
    ssl.truststore.location=/path/mirrormaker/destination/kafka-dest/client.truststore.jks
    ssl.truststore.password=secret
    ssl.keystore.type=PKCS12
    ssl.keystore.location=/path/mirrormaker/destination/kafka-dest/client.keystore.p12
    ssl.keystore.password=secret
    ssl.key.password=secret
    key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

  7. Check that the consumer output is similar to the following:

    {"id":201,"product":"foo","quantity":100,"price":50.0}
    {"id":202,"product":"foo","quantity":100,"price":50.0}
    {"id":203,"product":"foo","quantity":100,"price":50.0}

    Note: The order of the output items may vary.

  8. Check the S3 bucket for messages.
    Compare an individual message written to the bucket before the migration with the same message written after the migration; the contents should be identical.
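
    To list the migrated objects, you can use the AWS CLI (a minimal sketch, assuming the bucket name from the connector configuration; with the default partitioner and JsonFormat, objects land under the topics.dir prefix, for example json/topic2/partition=0/):

    > aws s3 ls s3://your-S3-bucket-name/json/topic2/ --recursive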
