The sample scripts in this article demonstrate how to connect to your Aiven for Apache Kafka service and pass a few messages with either the Go Sarama or kafka-go
library.
To test your Aiven for Apache Kafka service:
Download the SSL certificate files in the Aiven web console.
Go to the Overview page of your Aiven for Apache Kafka service.
Click Download next to Access Key and save the
service.key
file.Click Download next to Access Certificate and save the
service.cert
file.Click Download next to CA Certificate and save the
ca.pem
file.
If you want to use SASL authentication, turn on kafka_authentication_methods.sasl under Advanced configuration on the Overview page of your service.
Create the producer script.
Using Go Sarama with SSL authentication:
package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"github.com/Shopify/sarama"
)
func main() {
keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
if err != nil {
log.Println(err)
return
}
caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
if err != nil {
log.Println(err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{keypair},
RootCAs: caCertPool,
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Version = sarama.V0_10_2_0
// init producer
brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
// produce logic would go here
producer.Close();
}Using Go Sarama with SASL-SSL authentication:
package main
import (
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
"io/ioutil"
)
func main() {
caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: caCertPool,
}
// parse Kafka cluster version
version, err := sarama.ParseKafkaVersion("2.4.0")
if err != nil {
panic(err)
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Version = version
config.Metadata.Full = true
config.ClientID = "aiven-sasl-client-plain"
config.Producer.Return.Successes = true
// Kafka SASL configuration
config.Net.SASL.Enable = true
config.Net.SASL.User = "<MY_USER>"
config.Net.SASL.Password = "<MY_PASSWORD>"
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
// TLS configuration
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
// init producer
brokers := []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
// produce logic would go here
_ = producer.Close()
}Using
kafka-go
with SSL authentication:package main
import (
"crypto/tls"
"crypto/x509"
"github.com/segmentio/kafka-go"
"io/ioutil"
"log"
"time"
)
func main() {
keypair, err := tls.LoadX509KeyPair("my-kafka-service.cert", "my-kafka-service.key")
if err != nil {
log.Fatalf("failt to load Access Key and/or Access Certificate: %s", err)
}
caCert, err := ioutil.ReadFile("my-aiven-project-ca.pem")
if err != nil {
log.Fatalf("Failed to read CA Certificate file: %s", err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
log.Fatalf("Failed to parse CA Certificate file: %s", err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{
Certificates: []tls.Certificate{keypair},
RootCAs: caCertPool,
},
}
// init producer
producer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"my-kafka-service-my-aiven-project.aivencloud.com:12233"},
Topic: "test1",
Dialer: dialer,
})
// produce logic would go here
_ = producer.Close()
}Using
kafka-go
with SASL-SSL authentication:package main
import (
"context"
"crypto/tls"
"crypto/x509"
"log"
"io/ioutil"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main()
keypair, err := tls.LoadX509KeyPair("service.cert", "service.key")
caCert, err := ioutil.ReadFile("ca.pem")
if err != nil {
log.Println(err)
return
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
log.Println(err)
return
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{keypair},
RootCAs: caCertPool,
}
sramMech, err := scram.Mechanism(scram.SHA512, "ADMIN-USER", "ADMIN-PWD")
if err != nil {
fmt.Println(err)
return
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig,
SASLMechanism: sramMech,
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"YOUR-KAFKA-SERVICE.aivencloud.com:18800"},
Topic: "YOUR-TOPIC",
Balancer: &kafka.Hash{},
Dialer: dialer,
})
defer w.Close()
msg := kafka.Message{Key: []byte(fmt.Sprintf("Key-%s", "KEY-ID")), Value: []byte("MESSAGE CONTENT")}
err = w.WriteMessages(context.Background(), msg)
if err != nil {
log.Println(err)
}
}