Aiven services are managed from the Aiven web console. First, log in to the console with your email address and password. You are automatically taken to the "Services" view, which shows all the services of the currently selected project.

Projects let you organize groups of services under different topics, and each project can, for example, have its own billing settings. An empty project is created for you automatically when you sign up, and the free credits are attached to this project. You can create new projects by clicking the project name in the left sidebar and selecting "Create a new project". The same menu can also be used to switch between projects.

To get started with Kafka, first click the "Create a new service" button.

The dialog that opens allows you to specify the main service properties:

  • Service name: A short name for the service used to distinguish it from other services. A random name is provided, but you can type in a more friendly name.
  • Service type: Select "Kafka".
  • Plan: How many servers and what kind of memory/CPU/disk resources will be allocated to run your service.
  • Cloud: Which cloud and region to run the service on. Note that the pricing of the same service may differ between cloud providers and their regions.

After making the selections, click the "Create" button. You are taken back to the service list view, where the newly created service is shown with an indicator that it is being created.

Click the service name in the list to open the "Overview" information page for the service. This view shows the connection parameters for your service and its current status, and lets you make changes to the service.

The "Status" indicator will say "REBUILDING" while the service is being created for you. Once the service is up and running, the light will change to green and it will say "RUNNING". Note that while services typically start in a couple of minutes, performance varies between clouds and startup can take longer under some circumstances.
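If you would rather script the wait than watch the console, the polling pattern could look like the sketch below. It is only an illustration: `wait_until_running` and the `get_state` callable are hypothetical names, and how the state is actually fetched is left to the caller.

```python
import time


def wait_until_running(get_state, timeout=600.0, interval=5.0, sleep=time.sleep):
    """Poll get_state() until it returns "RUNNING" or the timeout expires.

    get_state is a hypothetical callable supplied by the caller; it should
    return the current service state as a string such as "REBUILDING".
    Returns True if "RUNNING" was seen, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if get_state() == "RUNNING":
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)


if __name__ == "__main__":
    # Simulated state sequence standing in for a real status lookup.
    states = iter(["REBUILDING", "REBUILDING", "RUNNING"])
    print(wait_until_running(lambda: next(states), interval=0.01))
```

The timeout keeps a script from waiting forever if the service takes unusually long to start.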



Before running the examples below, create a suitable Kafka topic by selecting the "Topics" tab on the service page and clicking the "Add topic" button.

Python producer example

These Python examples use the kafka-python library and demonstrate how to connect to the Kafka service and pass a few messages. Note that you should first create a topic named demo-topic from the Aiven web console.

Note also that the SSL certificate files referred to in the scripts need to be downloaded from the Aiven service view by clicking the Show CA certificate, Show access certificate and Show access key buttons.
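A missing certificate file is an easy mistake to make, so it can help to check for the files before connecting. The sketch below assumes the files are saved under the names used in the Python scripts in this article (ca.pem, service.cert, service.key). The helper name `missing_cert_files` is made up for this illustration.

```python
from pathlib import Path

# File names as used by the Python examples in this article.
REQUIRED_FILES = ("ca.pem", "service.cert", "service.key")


def missing_cert_files(directory=".", required=REQUIRED_FILES):
    """Return the names of required files that are not present in directory."""
    base = Path(directory)
    return [name for name in required if not (base / name).is_file()]


if __name__ == "__main__":
    missing = missing_cert_files()
    if missing:
        print("Missing files: {}".format(", ".join(missing)))
    else:
        print("All certificate files found")
```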

# This script connects to Kafka and sends a few messages

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="copy-paste-service-uri",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Wait for all messages to be sent
producer.flush()


Python consumer example

# This script receives messages from a Kafka topic
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    bootstrap_servers="copy-paste-service-uri",
    client_id="demo-client-1",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

for msg in consumer:
    print("Received: {}".format(msg.value))
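The consumer above prints message values as raw bytes, because kafka-python does not decode them unless told to. A matching pair of helpers for encoding and decoding can keep the producer and consumer consistent. The sketch below assumes the messages are JSON encoded as UTF-8, which is a choice made for this illustration, and the function names are hypothetical.

```python
import json


def serialize(value):
    """Turn a JSON-compatible Python object into UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Turn UTF-8 encoded JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    payload = {"id": 1, "text": "message number 1"}
    raw = serialize(payload)
    # The round trip should give back an equal object.
    print(deserialize(raw) == payload)
```

With kafka-python, such functions could be passed as value_serializer=serialize to KafkaProducer and value_deserializer=deserialize to KafkaConsumer, so that the scripts work with Python objects instead of bytes.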

Node.js producer example

This example uses the node-rdkafka library. Note that you should first create a topic named demo-topic from the Aiven web console.

var Kafka = require('node-rdkafka');

var producer = new Kafka.Producer({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'security.protocol': 'ssl',
    'ssl.key.location': 'service.key',
    'ssl.certificate.location': 'service.crt',
    'ssl.ca.location': 'ca.crt',
    'dr_cb': true
});

producer.connect();

producer.on('ready', function() {
    try {
        producer.produce(
            'demo-topic',  // topic to send the message to
            null,  // partition, null for librdkafka default partitioner
            Buffer.from('Hello world!'),  // value
            null,  // optional key
            Date.now()  // optional timestamp
        );  
        producer.flush(2000);
        console.log('Message sent successfully');
    } catch (err) {
        console.log('Failed to send message', err);
    }  
    producer.disconnect();
});

Node.js consumer example

var Kafka = require('node-rdkafka');

var stream = Kafka.createReadStream({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'group.id': 'demo-consumer-group',
    'security.protocol': 'ssl',
    'ssl.key.location': 'service.key',
    'ssl.certificate.location': 'service.crt',
    'ssl.ca.location': 'ca.crt'
}, {}, {'topics': ['demo-topic']});

stream.on('data', function(message) {
  console.log('Got message');
  console.log(message.value.toString());
});

Java keystore management

The Java examples below refer to a keystore and a truststore for holding the keys and certificates. The stores can be created with the following commands, using the access certificate, access key and CA certificate from the service details page. Both commands ask for a password to protect the store with. The examples that follow assume 'secret'.

openssl pkcs12 -export -inkey service.key -in service.crt -out client.keystore.p12 -name service_key
keytool -import -file ca.crt -alias CA -keystore client.truststore.jks

Java producer example

package io.aiven.kafkaExample;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("demo-topic", "test-message"));
        producer.close();
    }
}

Java consumer example

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("demo-topic"));
        while (true) {
            // poll(Duration) is available in Kafka clients 2.0 and later
            ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}

Kafka Console Tools

Kafka also includes command line clients for interacting with Kafka clusters. Assuming you've created the keystores as above, you should define the following properties in the consumer and/or producer configuration file:

security.protocol=SSL
ssl.protocol=TLS
ssl.key.password=secret
ssl.keystore.location=/path/to/client.keystore.p12
ssl.keystore.password=secret
ssl.keystore.type=PKCS12
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=secret
ssl.truststore.type=JKS
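Since the producer and consumer configuration files share the same settings, they can be generated from one place. The following Python sketch renders the properties listed above as text. The function name `ssl_properties` is hypothetical, and the paths and the password 'secret' are the placeholder values used in this article.

```python
def ssl_properties(keystore_path, truststore_path, password):
    """Return the client SSL settings listed above as key=value text."""
    settings = [
        ("security.protocol", "SSL"),
        ("ssl.protocol", "TLS"),
        ("ssl.key.password", password),
        ("ssl.keystore.location", keystore_path),
        ("ssl.keystore.password", password),
        ("ssl.keystore.type", "PKCS12"),
        ("ssl.truststore.location", truststore_path),
        ("ssl.truststore.password", password),
        ("ssl.truststore.type", "JKS"),
    ]
    return "".join("{}={}\n".format(key, value) for key, value in settings)


if __name__ == "__main__":
    # Placeholder paths and password, as in the listing above.
    print(ssl_properties(
        "/path/to/client.keystore.p12",
        "/path/to/client.truststore.jks",
        "secret",
    ), end="")
```

The printed text can be saved as producer.properties and consumer.properties for use with the console tools.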

You can now call the producer with the command below. Enter one message per line, and use CTRL-D to exit.

kafka-console-producer.sh --broker-list getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705 --topic target-topic --producer.config producer.properties

The consumer can be called with a similar command:

kafka-console-consumer.sh --bootstrap-server getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705 --topic target-topic --consumer.config consumer.properties --from-beginning

Ruby Example

Gemfile for the required Kafka client library:

ruby '2.2.4'

source 'https://rubygems.org' do
  gem 'ruby-kafka'
end

Ruby Producer Example

require "kafka"

# Obtained from "Connection parameters" in the Aiven Console
host = "copy-paste-service-uri"                    
# Obtained from "Connection parameters" in the Aiven Console
port = "12897"                                    
# Downloaded from the Console via "Show CA certificate"
ssl_ca_cert =         File.read("ca.pem")          
# Downloaded from the Console via "Show access key"
ssl_client_cert_key = File.read("service.key")    
# Downloaded from the Console via "Show access certificate"
ssl_client_cert =     File.read("service.cert")    
# You must create this topic in the Aiven Console before running
topic = "test"                                    
# All messages with the same partition_key will be written to the same Kafka partition
partition_key = "test_key"                        
# You can enter any string for this
client_id = "test_producer"                        
# or "synchronous".  The "asynchronous" mode provides lower publish latency.
delivery_mode = "asynchronous"                    

kafka = Kafka.new(
  seed_brokers: ["#{host}:#{port}"],
  ssl_ca_cert: ssl_ca_cert,
  ssl_client_cert: ssl_client_cert,
  ssl_client_cert_key: ssl_client_cert_key,
  client_id: client_id
)

topics = kafka.topics
unless topics.include?(topic)
  puts "Error: Topic #{topic} does not exist"
  exit
end

if delivery_mode == "asynchronous"
  producer = kafka.async_producer(
    # The produce method will raise BufferOverflow if more than 1000 undelivered messages have been buffered
    max_queue_size: 1000,            
    max_buffer_size: 100000,
    # Trigger a delivery once 2 messages have been buffered
    delivery_threshold: 2,          
    # Trigger a delivery every 10 seconds
    delivery_interval: 10,          
    # or :snappy or nil (for no compression)
    compression_codec: :gzip        
  )
end

3.times.each do |i|
  message = "Message #{i} (#{delivery_mode})"
  ts = Time.now.to_f
  if delivery_mode == "asynchronous"
    producer.produce(message, topic: topic, partition_key: partition_key)
  else
    kafka.deliver_message(message, topic: topic, partition_key: partition_key)
  end

  puts "Message #{i} published #{delivery_mode}ly in #{((Time.now.to_f - ts) * 1000.0).round(3)} msec"
end


if delivery_mode == "asynchronous"
  # 2 messages should be delivered to Kafka immediately, then 1 more after :delivery_interval seconds
  sleep(15)                        
  # Deliver any buffered messages to Kafka, irrespective of :delivery_threshold and :delivery_interval
  producer.deliver_messages        
  # Call this before exiting to avoid leaking resources. This will block until all pending messages are delivered to Kafka.
  producer.shutdown
end
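The producer script above notes that all messages with the same partition_key are written to the same partition. The idea behind this is a deterministic mapping from key to partition number, which the Python sketch below illustrates. It uses CRC32 purely as an example hash. The hash a given client library actually uses may differ, and `partition_for` is a hypothetical name.

```python
import zlib


def partition_for(key, partition_count):
    """Map a string key to a partition index in range(partition_count).

    CRC32 is used here only for illustration; real clients may use a
    different hash function.
    """
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    return zlib.crc32(key.encode("utf-8")) % partition_count


if __name__ == "__main__":
    # The same key always maps to the same partition.
    for key in ("test_key", "test_key", "other_key"):
        print(key, "->", partition_for(key, 3))
```

Because the mapping depends on the partition count, changing the number of partitions of a topic can move a key to a different partition.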

Ruby Consumer Example

require "kafka"

# Obtained from "Connection parameters" in the Aiven Console
host = "copy-paste-service-uri"                    
# Obtained from "Connection parameters" in the Aiven Console
port = "12897"                                    
# Downloaded from the Console via "Show CA certificate"
ssl_ca_cert =         File.read("ca.pem")          
# Downloaded from the Console via "Show access key"
ssl_client_cert_key = File.read("service.key")    
# Downloaded from the Console via "Show access certificate"
ssl_client_cert =     File.read("service.cert")    
# You must create this topic in the Aiven Console before running
topic = "test"                                    
# You can enter any string for this
client_id = "test_consumer"                        
# Use nil for the simplest mode of operation
consumer_group_id = "test-consumer"                

kafka = Kafka.new(
  seed_brokers: ["#{host}:#{port}"],
  ssl_ca_cert: ssl_ca_cert,
  ssl_client_cert: ssl_client_cert,
  ssl_client_cert_key: ssl_client_cert_key,
  client_id: client_id
)

topics = kafka.topics
unless topics.include?(topic)
  puts "Error: Topic #{topic} does not exist"
  exit
end

if consumer_group_id
  consumer = kafka.consumer(
    group_id: consumer_group_id,
    # Commit consumer offset at least every 5 seconds
    offset_commit_interval: 5,    
    # Commit consumer offset after every 10 messages consumed
    offset_commit_threshold: 10,  
  )
  consumer.subscribe(topic, start_from_beginning: false)

  # Stop the consumer when the SIGTERM or SIGINT signal is sent to the process.
  # It's better to shut down gracefully than to kill the process.
  trap("SIGTERM") do
    puts "Stopping consumer due to SIGTERM"
    consumer.stop
    exit
  end

  trap("SIGINT") do
    puts "Stopping consumer due to SIGINT"
    consumer.stop
    exit
  end

  puts "Retrieving messages from the most recent offset using the consumer group API..."
  consumer.each_message do |message|
    puts "#{message.offset}: '#{message.value}'"
  end
else
  # This loop runs forever, printing out info about the messages received
  puts "Retrieving all non-expired messages from the beginning using the basic consumer API..."
  kafka.each_message(topic: topic) do |message|
    puts "#{message.offset}: '#{message.value}'"
  end
end