The Kafka Connect S3 sink connector (by Aiven) enables you to move data from an Aiven for Apache Kafka cluster to Amazon S3 for long-term storage.

Note: There are two versions of the S3 sink connector available: one developed by Aiven and another developed by Confluent. This article uses the Aiven version. A separate article covers the S3 sink connector by Confluent.

This article describes:

  • S3 object name and data format
  • Configuration on AWS, using AWS console
  • Configuration on Kafka Connect, using Aiven Console

Data format

S3 object names

The S3 connector stores a series of files in the specified bucket. Each object is named using the following pattern:

[<aws_s3_prefix>]<topic>-<partition>-<startoffset>[.gz]  

aws_s3_prefix can use the template placeholders {{ utc_date }} and {{ local_date }}. The .gz extension is added when gzip compression is enabled; see the output_compression option in the Aiven Console.

The connector creates one file per Kafka Connect offset.flush.interval.ms period for each partition that has received new messages during that period. The setting defaults to 60 seconds.
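
For example, assuming aws_s3_prefix is set to example/{{ utc_date }}/ and the placeholder expands to a UTC date such as 2023-01-15, an object holding messages from partition 0 of demo_topic, starting at offset 100 and with gzip compression enabled, would be named along the lines of:

example/2023-01-15/demo_topic-0-100.gz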

Data file format

Data is stored in S3 as one record per line. Each line contains comma-separated fields as specified by the output_fields configuration option. If the key and value fields are selected, they are written out Base64-encoded. For example, an output_fields of value,key,timestamp results in rows looking something like this:

bWVzc2FnZV9jb250ZW50,cGFydGl0aW9uX2tleQ==,1511801218777 
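
The key and value fields can be restored with any Base64 decoder. As a quick sanity check, the two encoded fields from the example row above decode with standard shell tools (GNU coreutils shown):

echo "bWVzc2FnZV9jb250ZW50" | base64 --decode    # prints: message_content
echo "cGFydGl0aW9uX2tleQ==" | base64 --decode    # prints: partition_key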

AWS configuration

Create S3 bucket

You can create an S3 bucket in the AWS console. When creating the bucket, specify the bucket name and region and leave everything else at the defaults. It is okay to leave "Block all public access" at its default (on), because the connector is granted access through IAM.
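
If you prefer the command line, the same bucket can be created with the AWS CLI; the bucket name and region here are placeholders:

aws s3api create-bucket \
  --bucket YOUR-S3-BUCKET-NAME \
  --region eu-west-1 \
  --create-bucket-configuration LocationConstraint=eu-west-1

For us-east-1, omit the --create-bucket-configuration option.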

Create IAM user

Create an IAM user and generate an access key ID and secret access key. You will need them when configuring the connector.
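
This can be done in the IAM console, or with the AWS CLI; the user name kafka-s3-sink here is only an example:

aws iam create-user --user-name kafka-s3-sink
aws iam create-access-key --user-name kafka-s3-sink

The create-access-key output contains the AccessKeyId and SecretAccessKey values used later in the connector configuration.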

IAM policy

The S3 connector needs the following permissions on the specified bucket:

  • s3:GetObject
  • s3:PutObject
  • s3:AbortMultipartUpload
  • s3:ListMultipartUploadParts
  • s3:ListBucketMultipartUploads

Here is an example of an inline policy to add to the IAM user:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME/*",
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME"
      ]
    }
  ]
}
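
Assuming the policy document above is saved as s3-sink-policy.json, it can be attached with the AWS CLI; the user and policy names are placeholders:

aws iam put-user-policy \
  --user-name kafka-s3-sink \
  --policy-name kafka-s3-sink-policy \
  --policy-document file://s3-sink-policy.json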

If you run into an Access Denied error, see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/

Creating connector on Aiven Console

At your Kafka Connect service, go to the "Connectors" tab and click "Create New Connector".

In the "Sink" section, select "Aiven - Amazon AWS S3 sink".

Connector Configuration - Common

  • Connector name - name
    Name of the connector. This cannot be modified after creation.
  • Connector class - connector.class
    Automatically selected in the Aiven console. In this case it will be io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector.
  • Key and value converter class - key.converter and value.converter
    Please select org.apache.kafka.connect.converters.ByteArrayConverter
  • Topics or topics regex - topics or topics.regex
    Choose one of the two. Only the topics listed explicitly (e.g. demo_topic,another_topic) or the topics matching the regex will be processed by the sink connector.

Connector Configuration - Other

  • aws_access_key_id and aws_secret_access_key
    AWS access key ID and secret access key generated in the AWS IAM console.
  • aws_s3_bucket
    Name of an existing S3 bucket.
  • aws_s3_region
    Region of the S3 bucket. Defaults to us-east-1 (US East, N. Virginia). Refer to the AWS documentation for the full list.

Example - create connector on Aiven Console

Here is an example configuration. On the Aiven Console, the resulting configuration will be shown in the upper right corner.

{
  "name": "YOUR-CONNECTOR-NAME",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "topics": "YOUR-TOPIC",
  "aws_access_key_id": "AKXXXXXXXXXXXXXXXXXX",
  "aws_secret_access_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws_s3_bucket": "YOUR-S3-BUCKET-NAME",
  "aws_s3_region": "eu-west-1"
}
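
Once the connector is running and at least one offset flush interval has passed, you can confirm that objects are arriving, for example with the AWS CLI. Note that the credentials used here need s3:ListBucket, which the connector's own IAM policy above does not include:

aws s3 ls s3://YOUR-S3-BUCKET-NAME/ --recursive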

Creating connector with RESTful API

If this is the first time you're creating a connector, we recommend the Aiven Console. However, if you'd like to create a connector programmatically, you can send an HTTP request to the Kafka Connect service URI.

The Kafka Connect service URI is available on the Aiven Console.

Here's an example using curl to send an HTTP POST request that creates the S3 sink connector:

curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "example-s3-sink",
  "config": {
    "aws_access_key_id": "AKI...",
    "aws_secret_access_key": "SECRET_ACCESS_KEY",
    "aws_s3_bucket": "aiven-example",
    "aws_s3_prefix": "example-s3-sink/",
    "aws_s3_region": "us-east-1",
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "output_compression": "gzip",
    "output_fields": "value,key,timestamp",
    "tasks.max": 1,
    "topics": "source_topic,another_topic",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
EOF

(Note: substitute the appropriate values for your Kafka Connect service URI and credentials.)
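
To verify the connector was created and is running, you can query the standard Kafka Connect status endpoint on the same service URI:

curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/example-s3-sink/status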

Please see https://github.com/aiven/aiven-kafka-connect-s3 for additional details.
