Aiven services are managed from the Aiven web console. First, log in to the console with your email address and password; you will be taken to the "Services" view, which shows all the services of the currently selected project.
Projects let you organize groups of services, and each project can, for example, have its own billing settings. An empty project is created for you automatically when you sign up, and your free credits are attached to it. You can create new projects by clicking the project name in the left sidebar and selecting "Create a new project". The same menu can also be used to switch between projects.
To get started with Kafka, first click the "Create a new service" button.
The dialog that opens allows you to specify the main service properties (a scripted alternative is sketched after this list):
- Service name: A short name for the service used to distinguish it from other services. A random name is provided, but you can type in a more friendly name.
- Service type: Select "Kafka" and the version of your choice.
- Plan: How many servers and what kind of memory/CPU/disk resources will be allocated to run your service.
- Cloud: Which cloud and region to run the service on. Note that the pricing of the same service may differ between cloud providers and their regions.
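If you prefer to script service creation, the same properties can be submitted through Aiven's REST API. This is a minimal sketch, assuming the v1 service-creation endpoint and token authentication of the public API; the project name, token, and property values are placeholders.

import requests

# All values below are placeholders; the endpoint path and payload fields
# are assumptions based on Aiven's public REST API.
project = "my-project"
token = "my-aiven-api-token"

response = requests.post(
    "https://api.aiven.io/v1/project/{}/service".format(project),
    headers={"authorization": "aivenv1 {}".format(token)},
    json={
        "cloud": "google-europe-west1",
        "plan": "business-4",
        "service_name": "my-kafka",
        "service_type": "kafka",
    },
)
print(response.json())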
After making the selections, click the "Create" button. You will be taken back to the service list view, where the newly created service is shown with an indicator that it is being created.
Click the service name in the list to open the "Overview" page for the service. This view shows the connection parameters for your service and its current status, and allows you to make changes to the service.
The "Status" indicator will say "Rebuilding" while the service is being created for you. Once the service is up and running, the light will change to green and it will say "Running". Note that while typically services start in a couple of minutes, the performance between clouds varies and it can take longer under some circumstances.
First, we'll need to create a suitable Kafka topic by selecting the "Topics" tab on the service page and clicking the "Add topic" button.
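Topics can also be created programmatically rather than through the console. This is a minimal sketch using kafka-python's admin client, assuming the same placeholder service URI and certificate files as the examples below; the partition and replication counts are illustrative.

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(
    bootstrap_servers="server-name:port",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)
# num_partitions and replication_factor are illustrative values
admin.create_topics([NewTopic(name="demo-topic", num_partitions=3, replication_factor=2)])
admin.close()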
Python producer example
These Python examples use the kafka-python library and demonstrate how to connect to the Kafka service and send a few messages. Note that you should first create a topic named demo-topic in the Aiven web console.
Note also that the SSL certificate files referred to in the scripts need to be downloaded from the Aiven service view by clicking the "Show CA certificate", "Show access certificate" and "Show access key" buttons.
# This script connects to Kafka and sends a few messages
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="server-name:port",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages
producer.flush()
Python consumer example
# This script receives messages from a Kafka topic
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="server-name:port",
    client_id="demo-client-1",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

# Call poll twice. The first call will just assign partitions for our
# consumer without actually returning anything
for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

# Commit offsets so we won't get the same messages again
consumer.commit()
Python producer example (SASL-SSL)
NOTE: after enabling SASL on your service, new connection options appear on the "Overview" tab. SASL connections are served on a different port, but the host, CA, and user credentials stay the same.
import kafka

host = "kafka-customer-demo-exercise1.aivencloud.com"
sasl_port = 30288
username = "foo"
password = "xxx"

client = kafka.KafkaProducer(
    bootstrap_servers=f"{host}:{sasl_port}",
    sasl_mechanism="PLAIN",
    sasl_plain_password=password,
    sasl_plain_username=username,
    security_protocol="SASL_SSL",
    ssl_cafile="ca.pem",
)
client.send("topic1", b"test")
client.flush()
Python consumer example (SASL-SSL)
from kafka import KafkaConsumer

host = "kafka-customer-demo-exercise1.aivencloud.com"
sasl_port = 30288
username = "bar"
password = "xxx"

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="{}:{}".format(host, sasl_port),
    client_id="demo-client-1",
    group_id="demo-group",
    sasl_mechanism="PLAIN",
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol="SASL_SSL",
    ssl_cafile="ca.pem",
)

# Consume and print messages, as in the SSL consumer example above
for message in consumer:
    print("Received: {}".format(message.value))
Connect to Kafka with SASL SCRAM
To use SCRAM instead of PLAIN, change the SASL mechanism in the same producer and consumer code to one of the SCRAM variants supported by kafka-python:
sasl_mechanism="SCRAM-SHA-256"  # or "SCRAM-SHA-512"
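For context, here is a minimal SCRAM producer sketch; the host, port, and credentials are placeholders like those in the SASL examples above.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="server-name:sasl-port",
    security_protocol="SASL_SSL",
    # kafka-python expects the full mechanism name, not just "SCRAM"
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="username",
    sasl_plain_password="password",
    ssl_cafile="ca.pem",
)
producer.send("demo-topic", b"test")
producer.flush()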
Node.js producer example
This example uses the node-rdkafka library. Note that you should first create a topic named demo-topic in the Aiven web console. Also note that node-rdkafka must be built with SSL support, which is not enabled by default.
var Kafka = require('node-rdkafka');
console.log(Kafka.features); // this should print 'ssl', among other things

var producer = new Kafka.Producer({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'security.protocol': 'ssl',
    'ssl.key.location': 'service.key',
    'ssl.certificate.location': 'service.cert',
    'ssl.ca.location': 'ca.pem',
    'dr_cb': true
});

producer.connect();

producer.on('ready', function() {
    try {
        producer.produce(
            'demo-topic',                // topic to send the message to
            null,                        // partition, null for librdkafka default partitioner
            Buffer.from('Hello world!'), // value
            null,                        // optional key
            Date.now()                   // optional timestamp
        );
        producer.flush(2000);
        console.log('Message sent successfully');
    } catch (err) {
        console.log('Failed to send message', err);
    }
    producer.disconnect();
});
Node.js consumer example
var Kafka = require('node-rdkafka');

var stream = Kafka.createReadStream({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'group.id': 'demo-consumer-group',
    'security.protocol': 'ssl',
    'ssl.key.location': 'service.key',
    'ssl.certificate.location': 'service.cert',
    'ssl.ca.location': 'ca.pem'
}, {}, {'topics': ['demo-topic']});

stream.on('data', function(message) {
    console.log('Got message');
    console.log(message.value.toString());
});
Java keystore management
The Java examples below refer to a keystore and truststore for holding the keys and certificates. The stores can be created with the following commands, using the access certificate, access key, and CA certificate from the service details page. Both commands ask for a password to protect the store; the examples below assume 'secret'.
openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
Java producer example
package io.aiven.kafkaExample;

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("demo-topic", "test-message"));
        producer.close();
    }
}
Java consumer example
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("demo-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}
Java producer example (SASL-SSL)
NOTE: SASL connections are served on a different port, but the host, CA, and user credentials stay the same; see "Using SASL" at the end of this article.
package io.aiven.kafkaExample;

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        String sasl_username = "avnadmin";
        String sasl_password = "<avnadmin-pw>";
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);

        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:<SASL_PORT>");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);
        props.put("ssl.truststore.type", "jks");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("demo-topic", "test-message"));
        producer.close();
    }
}
Java consumer example (SASL-SSL)
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        String sasl_username = "avnadmin";
        String sasl_password = "<avnadmin-pw>";
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);

        props.put("bootstrap.servers", "getting-started-with-kafka.htn-aiven-demo.aivencloud.com:<SASL_PORT>");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);
        props.put("ssl.truststore.type", "jks");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("demo-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}
Kafka Console Tools
Kafka also includes command-line clients for interacting with Kafka clusters. Assuming you've created the keystores as above, you should define the following properties in the consumer and/or producer configuration file:
security.protocol=SSL
ssl.protocol=TLS
ssl.key.password=secret
ssl.keystore.location=/path/to/client.keystore.p12
ssl.keystore.password=secret
ssl.keystore.type=PKCS12
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=secret
ssl.truststore.type=JKS
You can now call the producer with the command below. Enter one message per line, and use CTRL-D to exit.
kafka-console-producer.sh --broker-list getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705 --topic target-topic --producer.config producer.properties
The consumer can be called with a similar command:
kafka-console-consumer.sh --bootstrap-server getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705 --topic target-topic --consumer.config consumer.properties --from-beginning
Kafka Avro Console Producer command:
kafka-avro-console-producer --broker-list kafka.htn-aiven-demo.aivencloud.com:17705 --producer.config producer.properties --topic target-topic --property value.schema='{"type":"record","name":"Test","fields":[{"name":"itemId","type":"string"},{"name":"description","type":"string"}]}' --property schema.registry.url=https://kafka.htn-aiven-demo.aivencloud.com:port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema-reg-password
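Each line entered into the Avro console producer must be a JSON-encoded record matching the schema given above, for example:
{"itemId": "item-1", "description": "a test item"}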
Ruby Example
Gemfile for the required Kafka client library:
ruby '2.2.4'
source 'https://rubygems.org' do
gem 'ruby-kafka'
end
Ruby Producer Example
require "kafka"
# Obtained from "Connection parameters" in the Aiven Console
host = "copy-paste-service-uri"
# Obtained from "Connection parameters" in the Aiven Console
port = "12897"
# Downloaded from the Console via "Show CA certificate"
ssl_ca_cert = File.read("ca.pem")
# Downloaded from the Console via "Show access key"
ssl_client_cert_key = File.read("service.key")
# Downloaded from the Console via "Show access certificate"
ssl_client_cert = File.read("service.cert")
# You must create this topic in the Aiven Console before running
topic = "test"
# All messages with the same partition_key will be written to the same Kafka partition
partition_key = "test_key"
# You can enter any string for this
client_id = "test_producer"
# or "synchronous". The "asynchronous" mode provides lower publish latency.
delivery_mode = "asynchronous"
kafka = Kafka.new(
seed_brokers: ["#{host}:#{port}"],
ssl_ca_cert: ssl_ca_cert,
ssl_client_cert: ssl_client_cert,
ssl_client_cert_key: ssl_client_cert_key,
client_id: client_id
)
topics = kafka.topics
unless topics.include?(topic)
puts "Error: Topic #{topic} does not exist"
exit
end
if delivery_mode == "asynchronous"
producer = kafka.async_producer(
# The produce method will raise BufferOverflow if more than 1000 undelivered messages have been buffered
max_queue_size: 1000,
max_buffer_size: 100000,
# Trigger a delivery once 2 messages have been buffered
delivery_threshold: 2,
# Trigger a delivery every 10 seconds
delivery_interval: 10,
# or :snappy or nil (for no compression)
compression_codec: :gzip
)
end
3.times.each do |i|
message = "Message #{i} (#{delivery_mode})"
ts = Time.now.to_f
if delivery_mode == "asynchronous"
producer.produce(message, topic: topic, partition_key: partition_key)
else
kafka.deliver_message(message, topic: topic, partition_key: partition_key)
end
puts "Message #{i} published #{delivery_mode}ly in #{((Time.now.to_f - ts)/1000.0).round(9)} msec"
end
if delivery_mode == "asynchronous"
# 2 messages should be delivered to Kafka immediately, then 1 more after :delivery_interval seconds
sleep(15)
# Deliver any buffered messages to Kafka, irrespective of :delivery_threshold and :delivery_interval
producer.deliver_messages
# Call this before exiting to avoid leaking resources. This will block until all pending messages are delivered to Kafka.
producer.shutdown
end
Ruby Consumer Example
require "kafka"
# Obtained from "Connection parameters" in the Aiven Console
host = "copy-paste-service-uri"
# Obtained from "Connection parameters" in the Aiven Console
port = "12897"
# Downloaded from the Console via "Show CA certificate"
ssl_ca_cert = File.read("ca.pem")
# Downloaded from the Console via "Show access key"
ssl_client_cert_key = File.read("service.key")
# Downloaded from the Console via "Show access certificate"
ssl_client_cert = File.read("service.cert")
# You must create this topic in the Aiven Console before running
topic = "test"
# You can enter any string for this
client_id = "test_consumer"
# Use nil for the simplest mode of operation
consumer_group_id = "test-consumer"
kafka = Kafka.new(
seed_brokers: ["#{host}:#{port}"],
ssl_ca_cert: ssl_ca_cert,
ssl_client_cert: ssl_client_cert,
ssl_client_cert_key: ssl_client_cert_key,
client_id: client_id
)
topics = kafka.topics
unless topics.include?(topic)
puts "Error: Topic #{topic} does not exist"
exit
end
if consumer_group_id
consumer = kafka.consumer(
group_id: consumer_group_id,
# Commit consumer offset at least every 5 seconds
offset_commit_interval: 5,
# Commit consumer offset after every 10 messages consumed
offset_commit_threshold: 10,
)
consumer.subscribe(topic, start_from_beginning: false)
# Stop the consumer when the SIGTERM or SIGINT signal is sent to the process.
# It's better to shut down gracefully than to kill the process.
trap("SIGTERM") do
puts "Stopping consumer due to SIGTERM"
consumer.stop
exit
end
trap("SIGINT") do
puts "Stopping consumer due to SIGINT"
consumer.stop
exit
end
puts "Retrieving messages from the most recent offset using the consumer group API..."
consumer.each_message do |message|
puts "#{message.offset}: '#{message.value}'"
end
else
# This loop runs forever, printing out info about the messages received
puts "Retrieving all non-expired messages from the beginning using the basic consumer API..."
kafka.each_message(topic: topic) do |message|
puts "#{message.offset}: '#{message.value}'"
end
end
Go (sarama) producer example
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
    if err != nil {
        log.Println(err)
        return
    }

    caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
    if err != nil {
        log.Println(err)
        return
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{keypair},
        RootCAs:      caCertPool,
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig
    config.Version = sarama.V0_10_2_0

    // init producer
    brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }

    // produce logic would go here
    producer.Close()
}
Go (sarama) SASL producer example
NOTE: SASL connections are served on a different port, but the host, CA, and user credentials stay the same; see "Using SASL" at the end of this article.
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"

    "github.com/Shopify/sarama"
)

func main() {
    caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
    if err != nil {
        panic(err)
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        RootCAs: caCertPool,
    }

    // parse Kafka cluster version
    version, err := sarama.ParseKafkaVersion("2.4.0")
    if err != nil {
        panic(err)
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Version = version
    config.Metadata.Full = true
    config.ClientID = "aiven-sasl-client-plain"
    config.Producer.Return.Successes = true

    // Kafka SASL configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "<MY_USER>"
    config.Net.SASL.Password = "<MY_PASSWORD>"
    config.Net.SASL.Handshake = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext

    // TLS configuration
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig

    // init producer
    brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }

    // produce logic would go here
    _ = producer.Close()
}
Go (kafka-go) producer example
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {
    keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
    if err != nil {
        log.Fatalf("Failed to load Access Key and/or Access Certificate: %s", err)
    }

    caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
    if err != nil {
        log.Fatalf("Failed to read CA Certificate file: %s", err)
    }
    caCertPool := x509.NewCertPool()
    if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
        log.Fatal("Failed to parse CA Certificate file")
    }

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS: &tls.Config{
            Certificates: []tls.Certificate{keypair},
            RootCAs:      caCertPool,
        },
    }

    // init producer
    producer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"},
        Topic:   "test1",
        Dialer:  dialer,
    })

    // produce logic would go here
    _ = producer.Close()
}
Go (kafka-go) SASL producer example
package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
)

func main() {
    keypair, err := tls.LoadX509KeyPair("service.cert", "service.key")
    if err != nil {
        log.Println(err)
        return
    }

    caCert, err := ioutil.ReadFile("ca.pem")
    if err != nil {
        log.Println(err)
        return
    }
    caCertPool := x509.NewCertPool()
    if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
        log.Println("Failed to parse CA Certificate file")
        return
    }

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{keypair},
        RootCAs:      caCertPool,
    }

    scramMech, err := scram.Mechanism(scram.SHA512, "ADMIN-USER", "ADMIN-PWD")
    if err != nil {
        log.Println(err)
        return
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        TLS:           tlsConfig,
        SASLMechanism: scramMech,
    }

    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"YOUR-KAFKA-SERVICE.aivencloud.com:18800"},
        Topic:    "YOUR-TOPIC",
        Balancer: &kafka.Hash{},
        Dialer:   dialer,
    })
    defer w.Close()

    msg := kafka.Message{Key: []byte(fmt.Sprintf("Key-%s", "KEY-ID")), Value: []byte("MESSAGE CONTENT")}
    if err := w.WriteMessages(context.Background(), msg); err != nil {
        log.Println(err)
    }
}
Executable Examples
See the examples repository for executable examples in a number of programming languages.
Using SASL
After enabling SASL on your service, new connection options appear on the "Overview" tab. SASL connections are served on a different port, but the host, CA, and user credentials stay the same.