Schema Registry makes it possible to evolve a schema without having to rebuild existing Consumers and Producers. This help article walks through the steps required to:

  1. Create version 1 of the schema,
  2. Use Apache Avro to compile the schema, and
  3. Create a Consumer and a Producer that utilize Aiven Kafka and Schema Registry

The following information will be required for this example:

  • Kafka service URL from the Kafka service
  • Schema Registry URL (the URL without the username and password), plus the username and password, from the Kafka service
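Before writing any code, you can sanity-check the Schema Registry URL and credentials from the command line. A minimal sketch; the host, port, and password below are placeholders to substitute with your own service values:

```shell
# Placeholder host, port and password -- substitute your own service values.
# Lists the subjects (schemas) currently registered; a new service returns [].
curl -s -u avnadmin:schema-reg-password \
  "https://your-kafka-service:schema-reg-port/subjects"
```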

Create version 1 of schema

In this tutorial we explore a simple click record schema, ClickRecordV1.avsc, defined in JSON as follows:

{
  "type": "record",
  "name": "ClickRecord",
  "fields": [
    {"name": "session_id", "type": "string"},
    {"name": "browser", "type": ["string", "null"]},
    {"name": "campaign", "type": ["string", "null"]},
    {"name": "channel", "type": "string"},
    {"name": "referrer", "type": ["string", "null"], "default": "None"},
    {"name": "ip", "type": ["string", "null"]}
  ]
}
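The Avro serializer used later in this article registers the schema automatically on first use, but you can also register it yourself through the Schema Registry REST API. A hedged sketch, using the same placeholder host, port, and password as elsewhere in this article, and the default "topic1-value" subject name that applies when the record is the message value:

```shell
# Placeholder credentials and host -- substitute your own.
# Registers the schema under the subject "topic1-value".
curl -s -u avnadmin:schema-reg-password \
  -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"ClickRecord\",\"fields\":[{\"name\":\"session_id\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":[\"string\",\"null\"]},{\"name\":\"campaign\",\"type\":[\"string\",\"null\"]},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":[\"string\",\"null\"],\"default\":\"None\"},{\"name\":\"ip\",\"type\":[\"string\",\"null\"]}]}"}' \
  "https://your-kafka-service:schema-reg-port/subjects/topic1-value/versions"
```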

Use Apache Avro to compile the schema

Download Apache Avro avro-tools-1.8.2.jar and use the following command to compile the schema defined in the previous step into a Java class, ClickRecord.java.

java -jar avro-tools-1.8.2.jar compile schema ./ClickRecordV1.avsc .

(NOTE: the schema above has no namespace, so the generated ClickRecord.java has no package declaration. Add a "package kafka;" line to ClickRecord.java and move it to your source folder.)
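The note above can be scripted. A sketch that assumes the generated file is in the current directory, a standard Maven layout (src/main/java), and GNU sed:

```shell
# Prepend the package declaration to the generated class (GNU sed syntax).
sed -i '1i package kafka;' ClickRecord.java
# Move it into the kafka package directory of a Maven-style source tree.
mkdir -p src/main/java/kafka
mv ClickRecord.java src/main/java/kafka/
```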

Create Consumer & Producer that utilize Aiven Kafka & Schema Registry

Create Consumer.java and Producer.java classes as follows. (NOTE: generate the keystore and truststore per the getting started guide).

Producer.java class

package kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class Producer {

  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "your-kafka-service:kafka-port");
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
    props.put("ssl.truststore.password", "secret");
    props.put("ssl.keystore.type", "PKCS12");
    props.put("ssl.keystore.location", "/path/to/client.keystore.p12");
    props.put("ssl.keystore.password", "secret");
    props.put("ssl.key.password", "secret");
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "https://your-kafka-service:schema-reg-port");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

    final KafkaProducer<String, ClickRecord> producer = new KafkaProducer<>(props);
    try {
      ClickRecord cr = new ClickRecord();
      cr.setSessionId("abc1234");
      cr.setChannel("HomePage");
      producer.send(new ProducerRecord<String, ClickRecord>("topic1", cr));
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      producer.close();
    }
  }
}

Please substitute appropriate values for the Kafka service URL, Schema Registry URL, avnadmin's password (for the Schema Registry), and the key and truststore files' locations on your computer.
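Once the producer has sent its first record, the serializer will have auto-registered the schema. You can confirm this through the Schema Registry REST API (placeholder credentials and host, as above):

```shell
# Expect output like [1] once version 1 of the schema has been registered
# under the default subject name topic1-value.
curl -s -u avnadmin:schema-reg-password \
  "https://your-kafka-service:schema-reg-port/subjects/topic1-value/versions"
```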

Consumer.java class

package kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class Consumer {

  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "your-kafka-service:kafka-port");
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
    props.put("ssl.truststore.password", "secret");
    props.put("ssl.keystore.type", "PKCS12");
    props.put("ssl.keystore.location", "/path/to/client.keystore.p12");
    props.put("ssl.keystore.password", "secret");
    props.put("ssl.key.password", "secret");
    props.put("group.id", "grp1");
    props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // Deserialize into the generated ClickRecord class instead of GenericRecord
    props.put("specific.avro.reader", "true");
    props.put("schema.registry.url", "https://your-kafka-service:schema-reg-port");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

    final String topic = "topic1";
    final KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);

    try {
      consumer.subscribe(Arrays.asList(topic));
      while (true) {
        ConsumerRecords<String, ClickRecord> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, ClickRecord> record : records) {
          System.out.println(record);
        }
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}


Compile and run the Consumer and Producer classes in separate terminals. (NOTE: the avnadmin password can be obtained by clicking the Schema Registry URL in the Aiven web console.)

Here's a pom.xml snippet showing what's required to build Consumer.java, Producer.java, and ClickRecord.java.

...
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <avro.version>1.8.2</avro.version>
  </properties>

  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
...

...
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>5.0.0</version>
    </dependency>
...
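With the pom.xml above in place, one way to compile and run the two classes is via Maven and the exec-maven-plugin (the plugin invocation is an assumption, not part of the pom snippet above; run the consumer and producer in separate terminals):

```shell
# Compile all three classes.
mvn compile
# Terminal 1: start the consumer.
mvn exec:java -Dexec.mainClass=kafka.Consumer
# Terminal 2: run the producer.
mvn exec:java -Dexec.mainClass=kafka.Producer
```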
