The Kafka Connect S3 sink connector by Confluent enables you to move data from an Aiven for Apache Kafka cluster to Amazon S3 for long-term storage.

Note: There are two versions of the S3 sink connector available: one developed by Confluent and another developed by Aiven. This article uses the Confluent version; a separate article covers the S3 sink connector by Aiven.

This article describes:

  • S3 object names and data format
  • Configuration on AWS, using the AWS Console
  • Configuration on Kafka Connect, using the Aiven Console

Data format

S3 object names

The S3 connector stores a series of objects in the specified bucket. Each object is named using the pattern

topics/<TOPIC>/partition=<PARTITION>/<TOPIC>+<PARTITION>+<OFFSET>.bin

For example, a topic with 10 messages across 3 partitions (with flush.size set to 1, as in this demo) will look like

topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000000.bin
topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000001.bin
topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000002.bin
topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000003.bin
topics/YOUR-TOPIC/partition=1/YOUR-TOPIC+1+0000000000.bin
topics/YOUR-TOPIC/partition=1/YOUR-TOPIC+1+0000000001.bin
topics/YOUR-TOPIC/partition=1/YOUR-TOPIC+1+0000000002.bin
topics/YOUR-TOPIC/partition=2/YOUR-TOPIC+2+0000000000.bin
topics/YOUR-TOPIC/partition=2/YOUR-TOPIC+2+0000000001.bin
topics/YOUR-TOPIC/partition=2/YOUR-TOPIC+2+0000000002.bin
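
If a downstream job needs to recover the topic, partition, and starting offset from these keys, they can be parsed back out of the object name. A minimal sketch in Python, using a key from the listing above:

import re

# Parse topic, partition, and starting offset from an S3 key that follows
# the connector's default naming scheme shown above.
KEY_PATTERN = re.compile(
    r"topics/(?P<topic>[^/]+)/partition=\d+/"
    r"(?P=topic)\+(?P<partition>\d+)\+(?P<offset>\d+)\.bin"
)

key = "topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000002.bin"
match = KEY_PATTERN.match(key)
if match:
    print(match.group("topic"))           # YOUR-TOPIC
    print(int(match.group("partition")))  # 0
    print(int(match.group("offset")))     # 2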

Data file format

In this demo we set flush.size to 1, so an S3 object is created for every Kafka message. If you increase flush.size, multiple messages are written to a single S3 object, with a newline character at the end of each message.

For example, an object containing three messages would look like this:

message1
message2
message3
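
Because each message is newline-terminated, a reader can split an object body back into the original messages. A minimal sketch, assuming plain-text payloads:

# With flush.size set to 3, one S3 object would hold three messages,
# each terminated by a newline; splitlines() recovers them.
body = b"message1\nmessage2\nmessage3\n"
messages = body.splitlines()
print(messages)  # [b'message1', b'message2', b'message3']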

AWS configuration

Create S3 bucket

You can create an S3 bucket in the AWS Console. When creating the bucket, specify the bucket name and region, and leave everything else at its default. It is fine to leave "Block all public access" at its default (on), because the connector is granted access through IAM rather than through public bucket permissions.
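
If you prefer to script this step, the bucket can also be created with the AWS SDK. A minimal sketch using Python and boto3, assuming the placeholder bucket name and the eu-west-1 region used later in this article:

import boto3

# Create the bucket in the target region; outside us-east-1 the region
# must be passed as a LocationConstraint.
s3 = boto3.client("s3", region_name="eu-west-1")
s3.create_bucket(
    Bucket="YOUR-S3-BUCKET-NAME",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)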

Create IAM user

Create an IAM user and generate an access key ID and secret access key. You will need both when configuring the connector.
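
This step can also be scripted. A minimal sketch with boto3 (the user name kafka-connect-s3-sink is a placeholder of our choosing):

import boto3

iam = boto3.client("iam")

# Create the user, then generate the access key pair the connector will use.
iam.create_user(UserName="kafka-connect-s3-sink")
key = iam.create_access_key(UserName="kafka-connect-s3-sink")

# Store these securely; the secret is only returned once.
print(key["AccessKey"]["AccessKeyId"])
print(key["AccessKey"]["SecretAccessKey"])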

IAM policy

The S3 connector needs the following permissions on the specified bucket:

  • s3:GetObject
  • s3:PutObject
  • s3:AbortMultipartUpload
  • s3:ListMultipartUploadParts
  • s3:ListBucketMultipartUploads

Here is an example inline policy to attach to the IAM user:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME/*",
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME"
      ]
    }
  ]
}
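
This policy can also be attached programmatically. A minimal sketch with boto3, reusing the placeholder user name from earlier (the policy name is likewise a placeholder):

import boto3
import json

iam = boto3.client("iam")

# The same inline policy as above, attached directly to the IAM user.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketMultipartUploads",
            ],
            "Resource": [
                "arn:aws:s3:::YOUR-S3-BUCKET-NAME/*",
                "arn:aws:s3:::YOUR-S3-BUCKET-NAME",
            ],
        }
    ],
}

iam.put_user_policy(
    UserName="kafka-connect-s3-sink",
    PolicyName="kafka-connect-s3-sink-policy",
    PolicyDocument=json.dumps(policy),
)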

If you get an Access Denied error, see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/ for troubleshooting steps.

Creating the connector on the Aiven Console

In your Kafka Connect service, go to the "Connectors" tab and click "Create New Connector".

In the "Sink" section, select "Confluent - Amazon AWS S3 sink".

Connector Configuration - Common

  • Connector name - name
    Name of the connector. This cannot be modified after creation.
  • Connector class - connector.class
    Automatically selected in the Aiven Console; in this case it is io.confluent.connect.s3.S3SinkConnector.
  • Key and value converter class - key.converter and value.converter
    Select org.apache.kafka.connect.converters.ByteArrayConverter for both.
  • Topics or topics regex - topics or topics.regex
    Choose one of the two. The sink connector processes only the topics listed explicitly (e.g. demo_topic,another_topic) or the topics matching the regex.

Connector Configuration - Connector

  • Format class - format.class
    Enter io.confluent.connect.s3.format.bytearray.ByteArrayFormat.
  • Flush size - flush.size
    Number of messages to write per S3 object. For this demo, enter 1.

Connector Configuration - S3

  • S3 bucket - s3.bucket.name
    Name of an existing S3 bucket.
  • AWS region - s3.region
    Region of the S3 bucket. Refer to the AWS documentation for the full list of region codes.
  • AWS Credentials Provider Class - s3.credentials.provider.class
    Enter io.aiven.kafka.connect.util.AivenAWSCredentialsProvider.

Connector Configuration - Storage

  • Storage Class - storage.class
    Enter io.confluent.connect.s3.storage.S3Storage.

Connector Configuration - Extra

Create two extra configuration entries for the AWS access key ID and secret access key:

  • s3.credentials.provider.access_key_id
  • s3.credentials.provider.secret_access_key

Example - create connector on Aiven Console

Here is the full example configuration. On the Aiven Console, the resulting configuration is displayed in the upper right corner.

{
  "name": "YOUR-CONNECTOR-NAME",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "topics": "YOUR-TOPIC",
  "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
  "flush.size": "1",
  "s3.bucket.name": "YOUR-S3-BUCKET-NAME",
  "s3.region": "eu-west-1",
  "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.credentials.provider.secret_access_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "s3.credentials.provider.access_key_id": "AKXXXXXXXXXXXXXXXXXX"
}
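
Once the connector is running and messages have been produced to the topic, you can check that objects are landing in the bucket. A minimal sketch with boto3, using the same placeholder names as the configuration above:

import boto3

s3 = boto3.client("s3", region_name="eu-west-1")

# List the objects the connector has written for one partition of the topic.
resp = s3.list_objects_v2(
    Bucket="YOUR-S3-BUCKET-NAME",
    Prefix="topics/YOUR-TOPIC/partition=0/",
)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])

# Fetch one object and print the message payload it contains.
body = s3.get_object(
    Bucket="YOUR-S3-BUCKET-NAME",
    Key="topics/YOUR-TOPIC/partition=0/YOUR-TOPIC+0+0000000000.bin",
)["Body"].read()
print(body)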

Further information

You can find additional documentation at https://docs.confluent.io/5.0.0/connect/kafka-connect-s3/index.html.
