The sample scripts in this article demonstrate how to connect to your Aiven for Apache Kafka service and pass a few messages with either the Go Sarama or kafka-go library.

To test your Aiven for Apache Kafka service:

  1. Download the SSL certificate files in the Aiven web console.

    1. Go to the Overview page of your Aiven for Apache Kafka service.

    2. Click Download next to Access Key and save the service.key file.

    3. Click Download next to Access Certificate and save the service.cert file.

    4. Click Download next to CA Certificate and save the ca.pem file.

  2. If you want to use SASL authentication, turn on kafka_authentication_methods.sasl under Advanced configuration on the Overview page of your service.

  3. Create the producer script.

    • Using Go Sarama with SSL authentication:

      package main

      import (
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "log"
      "github.com/Shopify/sarama"
      )

      func main() {
      keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
      if err != nil {
      log.Println(err)
      return
      }

      caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
      if err != nil {
      log.Println(err)
      return
      }
      caCertPool := x509.NewCertPool()
      caCertPool.AppendCertsFromPEM(caCert)

      tlsConfig := &tls.Config{
      Certificates: []tls.Certificate{keypair},
      RootCAs: caCertPool,
      }

      // init config, enable errors and notifications
      config := sarama.NewConfig()
      config.Producer.Return.Successes = true
      config.Net.TLS.Enable = true
      config.Net.TLS.Config = tlsConfig
      config.Version = sarama.V0_10_2_0

      // init producer
      brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
      producer, err := sarama.NewSyncProducer(brokers, config)
      if err != nil {
      panic(err)
      }

      // produce logic would go here

      producer.Close();
      }

    • Using Go Sarama with SASL-SSL authentication:

      package main

      import (
      "crypto/tls"
      "crypto/x509"
      "github.com/Shopify/sarama"
      "io/ioutil"
      )

      func main() {
      caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
      if err != nil {
      panic(err)
      }
      caCertPool := x509.NewCertPool()
      caCertPool.AppendCertsFromPEM(caCert)

      tlsConfig := &tls.Config{
      RootCAs: caCertPool,
      }

      // parse Kafka cluster version
      version, err := sarama.ParseKafkaVersion("2.4.0")
      if err != nil {
      panic(err)
      }

      // init config, enable errors and notifications
      config := sarama.NewConfig()
      config.Version = version
      config.Metadata.Full = true
      config.ClientID = "aiven-sasl-client-plain"
      config.Producer.Return.Successes = true

      // Kafka SASL configuration
      config.Net.SASL.Enable = true
      config.Net.SASL.User = "<MY_USER>"
      config.Net.SASL.Password = "<MY_PASSWORD>"
      config.Net.SASL.Handshake = true
      config.Net.SASL.Mechanism = sarama.SASLTypePlaintext

      // TLS configuration
      config.Net.TLS.Enable = true
      config.Net.TLS.Config = tlsConfig

      // init producer
      brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
      producer, err := sarama.NewSyncProducer(brokers, config)
      if err != nil {
      panic(err)
      }

      // produce logic would go here

      _ = producer.Close()
      }

    • Using kafka-go with SSL authentication:

      package main

      import (
      "crypto/tls"
      "crypto/x509"
      "github.com/segmentio/kafka-go"
      "io/ioutil"
      "log"
      "time"
      )

      func main() {
      keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
      if err != nil {
      log.Fatalf("failt to load Access Key and/or Access Certificate: %s", err)
      }

      caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
      if err != nil {
      log.Fatalf("Failed to read CA Certificate file: %s", err)
      }

      caCertPool := x509.NewCertPool()
      ok := caCertPool.AppendCertsFromPEM(caCert)
      if !ok {
      log.Fatalf("Failed to parse CA Certificate file: %s", err)
      }

      dialer := &kafka.Dialer{
      Timeout: 10 * time.Second,
      DualStack: true,
      TLS: &tls.Config{
      Certificates: []tls.Certificate{keypair},
      RootCAs: caCertPool,
      },
      }

      // init producer
      producer := kafka.NewWriter(kafka.WriterConfig{
      Brokers: []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"},
      Topic: "test1",
      Dialer: dialer,
      })

      // produce logic would go here

      _ = producer.Close()
      }

    • Using kafka-go with SASL-SSL authentication:

      package main

      import (
      "context"
      "crypto/tls"
      "crypto/x509"
      "log"
      "io/ioutil"
      "time"
      "github.com/segmentio/kafka-go"
      "github.com/segmentio/kafka-go/sasl/scram"
      )

      func main()
      keypair, err := tls.LoadX509KeyPair("service.cert", "service.key")
      caCert, err := ioutil.ReadFile("ca.pem")
      if err != nil {
      log.Println(err)
      return
      }
      caCertPool := x509.NewCertPool()
      ok := caCertPool.AppendCertsFromPEM(caCert)
      if !ok {
      log.Println(err)
      return
      }
      tlsConfig := &tls.Config{
      Certificates: []tls.Certificate{keypair},
      RootCAs: caCertPool,
      }
      sramMech, err := scram.Mechanism(scram.SHA512, "ADMIN-USER", "ADMIN-PWD")
      if err != nil {
      fmt.Println(err)
      return
      }
      dialer := &kafka.Dialer{
      Timeout: 10 * time.Second,
      DualStack: true,
      TLS: tlsConfig,
      SASLMechanism: sramMech,
      }
      w := kafka.NewWriter(kafka.WriterConfig{
      Brokers: []string{"YOUR-KAFKA-SERVICE.aivencloud.com:18800"},
      Topic: "YOUR-TOPIC",
      Balancer: &kafka.Hash{},
      Dialer: dialer,
      })
      defer w.Close()
      msg := kafka.Message{Key: []byte(fmt.Sprintf("Key-%s", "KEY-ID")), Value: []byte("MESSAGE CONTENT")}
      err = w.WriteMessages(context.Background(), msg)
      if err != nil {
      log.Println(err)
      }
      }

Did this answer your question?