The sample scripts in this article demonstrate how to connect to your Aiven for Apache Kafka service and send a few messages.

Requirements:

  • Gemfile for the required Kafka client library:

    ruby '2.2.4'

    source 'https://rubygems.org' do
      gem 'ruby-kafka'
    end

To test your Aiven for Apache Kafka service:

  1. Download the SSL certificate files in the Aiven web console.

    1. Go to the Overview page of your Aiven for Apache Kafka service.

    2. Click Download next to Access Key and save the service.key file.

    3. Click Download next to Access Certificate and save the service.cert file.

    4. Click Download next to CA Certificate and save the ca.pem file.

  2. Go to the Topics page for your service and add a new topic named demo-topic.

  3. Create and run the producer script:

    require "kafka"

    # Obtained from "Connection parameters" in the Aiven Console
    host = "copy-paste-service-uri"
    # Obtained from "Connection parameters" in the Aiven Console
    port = "12897"
    # Downloaded from the Console via "Show CA certificate"
    ssl_ca_cert = File.read("ca.pem")
    # Downloaded from the Console via "Show access key"
    ssl_client_cert_key = File.read("service.key")
    # Downloaded from the Console via "Show access certificate"
    ssl_client_cert = File.read("service.cert")
    # You must create this topic in the Aiven Console before running
    topic = "demo-topic"
    # All messages with the same partition_key will be written to the same Kafka partition
    partition_key = "test_key"
    # You can enter any string for this
    client_id = "test_producer"
    # or "synchronous". The "asynchronous" mode provides lower publish latency.
    delivery_mode = "asynchronous"

    kafka = Kafka.new(
      seed_brokers: ["#{host}:#{port}"],
      ssl_ca_cert: ssl_ca_cert,
      ssl_client_cert: ssl_client_cert,
      ssl_client_cert_key: ssl_client_cert_key,
      client_id: client_id
    )

    topics = kafka.topics
    unless topics.include?(topic)
      puts "Error: Topic #{topic} does not exist"
      exit
    end

    if delivery_mode == "asynchronous"
      producer = kafka.async_producer(
        # The produce method will raise BufferOverflow if more than 1000 undelivered messages have been buffered
        max_queue_size: 1000,
        max_buffer_size: 100000,
        # Trigger a delivery once 2 messages have been buffered
        delivery_threshold: 2,
        # Trigger a delivery every 10 seconds
        delivery_interval: 10,
        # or :snappy or nil (for no compression)
        compression_codec: :gzip
      )
    end

    3.times do |i|
      message = "Message #{i} (#{delivery_mode})"
      ts = Time.now.to_f
      if delivery_mode == "asynchronous"
        producer.produce(message, topic: topic, partition_key: partition_key)
      else
        kafka.deliver_message(message, topic: topic, partition_key: partition_key)
      end

      puts "Message #{i} published #{delivery_mode}ly in #{((Time.now.to_f - ts) * 1000).round(3)} msec"
    end


    if delivery_mode == "asynchronous"
      # 2 messages should be delivered to Kafka immediately, then 1 more after :delivery_interval seconds
      sleep(15)
      # Deliver any buffered messages to Kafka, irrespective of :delivery_threshold and :delivery_interval
      producer.deliver_messages
      # Call this before exiting to avoid leaking resources. This will block until all pending messages are delivered to Kafka.
      producer.shutdown
    end
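    The comment on partition_key above says that messages with the same key always land in the same partition. The sketch below illustrates why, using only the Ruby standard library: a hash partitioner hashes the key and reduces it modulo the partition count, so equal keys always map to the same partition. This assumes a CRC32-based hash (ruby-kafka's default partitioner works this way, but check the library source for the exact algorithm); `partition_for` and `num_partitions` are illustrative names, not part of the library's API.

    ```ruby
    require "zlib"

    # Hypothetical sketch of hash partitioning (not ruby-kafka's actual
    # code): hash the partition key, then reduce modulo the partition
    # count. Equal keys always hash to the same value, so they always
    # map to the same partition.
    def partition_for(partition_key, num_partitions)
      Zlib.crc32(partition_key) % num_partitions
    end

    num_partitions = 3
    first = partition_for("test_key", num_partitions)
    second = partition_for("test_key", num_partitions)
    puts "test_key -> partition #{first}, repeatable: #{first == second}"
    ```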

  4. Create and run the consumer script:

    require "kafka"

    # Obtained from "Connection parameters" in the Aiven Console
    host = "copy-paste-service-uri"
    # Obtained from "Connection parameters" in the Aiven Console
    port = "12897"
    # Downloaded from the Console via "Show CA certificate"
    ssl_ca_cert = File.read("ca.pem")
    # Downloaded from the Console via "Show access key"
    ssl_client_cert_key = File.read("service.key")
    # Downloaded from the Console via "Show access certificate"
    ssl_client_cert = File.read("service.cert")
    # You must create this topic in the Aiven Console before running
    topic = "demo-topic"
    # You can enter any string for this
    client_id = "test_consumer"
    # Use nil for the simplest mode of operation
    consumer_group_id = "test-consumer"

    kafka = Kafka.new(
      seed_brokers: ["#{host}:#{port}"],
      ssl_ca_cert: ssl_ca_cert,
      ssl_client_cert: ssl_client_cert,
      ssl_client_cert_key: ssl_client_cert_key,
      client_id: client_id
    )

    topics = kafka.topics
    unless topics.include?(topic)
      puts "Error: Topic #{topic} does not exist"
      exit
    end

    if consumer_group_id
      consumer = kafka.consumer(
        group_id: consumer_group_id,
        # Commit consumer offset at least every 5 seconds
        offset_commit_interval: 5,
        # Commit consumer offset after every 10 messages consumed
        offset_commit_threshold: 10
      )
      consumer.subscribe(topic, start_from_beginning: false)

      # Stop the consumer when the SIGTERM or SIGINT signal is sent to the process.
      # It's better to shut down gracefully than to kill the process.
      trap("SIGTERM") do
        puts "Stopping consumer due to SIGTERM"
        consumer.stop
        exit
      end

      trap("SIGINT") do
        puts "Stopping consumer due to SIGINT"
        consumer.stop
        exit
      end

      puts "Retrieving messages from the most recent offset using the consumer group API..."
      consumer.each_message do |message|
        puts "#{message.offset}: '#{message.value}'"
      end
    else
      # This loop runs forever, printing out info about the messages received
      puts "Retrieving all non-expired messages from the beginning using the basic consumer API..."
      kafka.each_message(topic: topic) do |message|
        puts "#{message.offset}: '#{message.value}'"
      end
    end
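    The signal handling in the consumer script is a standard Ruby graceful-shutdown pattern: trap SIGTERM and SIGINT so the process finishes its current message and stops the consumer cleanly instead of dying mid-read. A minimal stdlib-only sketch of the same idea, with the consumer loop replaced by a bounded counter so it terminates on its own (`running` and `processed` are illustrative names):

    ```ruby
    # Stdlib-only sketch of the graceful-shutdown pattern used in the
    # consumer script: trap SIGTERM/SIGINT, flip a flag, and let the
    # work loop finish its current iteration before exiting.
    running = true
    %w[TERM INT].each do |signal|
      Signal.trap(signal) { running = false }
    end

    processed = 0
    # Stand-in for consumer.each_message: bounded work so this example
    # finishes without an external signal.
    while running && processed < 3
      processed += 1
    end
    puts "Processed #{processed} messages, shut down cleanly"
    ```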
