The Kafka Connect S3 sink connector (by Aiven) enables you to move data from an Aiven for Apache Kafka cluster to Amazon S3 for long-term storage.

Note: There are two versions of the S3 sink connector available: one developed by Aiven and the other by Confluent. This article uses the Aiven version; the S3 sink connector by Confluent is covered in a separate article.

This article describes:

  • S3 object name and data format

  • Configuration on AWS, using AWS console

  • Configuration on Kafka Connect, using Aiven Console

Data format

S3 object names

The S3 connector stores a series of files in the specified bucket. By default, each object is named using the pattern

[<aws_s3_prefix>]<topic>-<partition>-<startoffset>[.gz]  

The aws_s3_prefix can contain the template placeholders {{ utc_date }} and {{ local_date }}. The .gz extension is added when gzip compression is enabled; see the file.compression.type option in the Aiven Console.
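For example, with aws_s3_prefix set to a hypothetical value of example-prefix/{{ utc_date }}/ and gzip compression enabled, an object holding records for partition 0 of demo_topic starting at offset 1234 would be named along these lines:

example-prefix/2023-09-01/demo_topic-0-1234.gz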

The S3 object name and the grouping of records into files can be customised; see the file name format documentation on GitHub for further details.

The connector creates one file per Kafka Connect offset.flush.interval.ms interval for each partition that has received new messages during that period. The setting defaults to 60 seconds.

Data file format

By default, data is stored as one record per line in S3. Each line contains the comma-separated fields specified by the format.output.fields configuration option. If the key and value fields are selected, they are written out Base64 encoded. For example, format.output.fields set to value,key,timestamp results in rows looking something like this:

bWVzc2FnZV9jb250ZW50,cGFydGl0aW9uX2tleQ==,1511801218777 

This is the default csv output format. Other file formats, such as JSON and Parquet, can be configured by setting the format.output.type option. For details on the available options, see the documentation on GitHub.
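To inspect such a row, the Base64 fields can be decoded with standard tools. A minimal sketch, assuming a POSIX shell with the base64 utility available:

# decode the value and key fields from the example row above
echo "bWVzc2FnZV9jb250ZW50" | base64 -d    # prints: message_content
echo "cGFydGl0aW9uX2tleQ==" | base64 -d    # prints: partition_key

The third field, 1511801218777, is the record timestamp in milliseconds since the Unix epoch.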

AWS configuration

Create S3 bucket

You can create an S3 bucket in the AWS console. When creating the bucket, specify the bucket name and region, and leave everything else at its default. It is fine to leave "Block all public access" at its default (on), because the connector is granted access through IAM rather than through public bucket permissions.
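If you prefer the command line, an equivalent bucket can be created with the AWS CLI. A sketch, assuming the CLI is installed and configured, with placeholder bucket name and region:

# create the bucket; outside us-east-1 the LocationConstraint must match the region
aws s3api create-bucket \
  --bucket YOUR-S3-BUCKET-NAME \
  --region eu-west-1 \
  --create-bucket-configuration LocationConstraint=eu-west-1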

Create IAM user

Create an IAM user and generate an access key ID and secret access key for it. You will need them when configuring the connector.
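The same can be done with the AWS CLI. A sketch, where the user name kafka-s3-sink is a placeholder:

# create the user and generate an access key pair for it
aws iam create-user --user-name kafka-s3-sink
aws iam create-access-key --user-name kafka-s3-sink
# the create-access-key output contains the AccessKeyId and SecretAccessKey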

IAM policy

The S3 connector needs the following permissions on the specified bucket:

  • s3:GetObject

  • s3:PutObject

  • s3:AbortMultipartUpload

  • s3:ListMultipartUploadParts

  • s3:ListBucketMultipartUploads

This is an example of an inline policy to attach to the IAM user:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME/*",
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME"
      ]
    }
  ]
}
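From the command line, the same inline policy can be attached with the AWS CLI, assuming the JSON above is saved as policy.json and using the placeholder user name from earlier:

aws iam put-user-policy \
  --user-name kafka-s3-sink \
  --policy-name kafka-s3-sink-policy \
  --policy-document file://policy.json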

If you run into an Access Denied error, see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/

Creating connector on Aiven Console

In your Kafka Connect service, go to the "Connectors" tab and click "Create New Connector".

In the "Sink" section, select "Aiven - Amazon AWS S3 sink".

Connector Configuration - Common

  • Connector name - name
    Name of the connector. This cannot be modified after creation.

  • Connector class - connector.class
    Automatically selected in the Aiven console. In this case it will be io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector.

  • Key and value converter class - key.converter and value.converter
    If you don't need any conversion of the key or value data, select org.apache.kafka.connect.converters.ByteArrayConverter. Other options, such as the JsonConverter or AvroConverter, can be used to decode values from those formats before writing to S3 (see the sketch after this list).

  • Topics or topics regex - topics or topics.regex
    Choose one of the two. Only the topics listed explicitly (e.g. demo_topic,another_topic), or the topics matching the regex, will be processed by the sink connector.
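For example, to decode JSON-encoded message values without embedded schemas before they are written to S3, the converter settings would look something like this (a sketch using standard Kafka Connect properties):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"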

Connector Configuration - AWS

  • aws.access.key.id and aws.secret.access.key
    AWS access key ID and secret access key generated at AWS IAM console.

  • aws.s3.bucket.name
    Name of an existing S3 bucket.

  • aws.s3.region
    Region of the S3 bucket. Defaults to us-east-1 (US East - N. Virginia). Refer to the AWS documentation for the full list of regions.

Example - create connector on Aiven Console

Here's an example configuration. On the Aiven Console, the resulting configuration is shown in the upper right corner.

{
  "name": "YOUR-CONNECTOR-NAME",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "topics": "YOUR-TOPIC",
  "aws.access.key.id": "AKXXXXXXXXXXXXXXXXXX",
  "aws.secret.access.key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws.s3.bucket.name": "YOUR-S3-BUCKET-NAME",
  "aws.s3.region": "eu-west-1"
}

Creating connector with RESTful API

If this is the first time you're creating a connector, we recommend the Aiven Console. However, if you'd like to create a connector programmatically, you can send an HTTP request to the Kafka Connect service URI.

The Kafka Connect service URI is available on the Aiven Console.

Here's an example using curl to send an HTTP POST request that creates the S3 sink connector:

curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "example-s3-sink",
  "config": {
    "aws.access.key.id": "AKI...",
    "aws.secret.access.key": "SECRET_ACCESS_KEY",
    "aws.s3.bucket.name": "aiven-example",
    "aws.s3.prefix": "example-s3-sink/",
    "aws.s3.region": "us-east-1",
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "file.compression.type": "gzip",
    "format.output.fields": "value,key,timestamp",
    "tasks.max": 1,
    "topics": "source_topic,another_topic",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
EOF

(Note: substitute the appropriate values for your Kafka Connect service URI and credentials.)
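Once created, the connector can be inspected through the standard Kafka Connect REST API. For example, to check the status of the connector and its tasks (same placeholder URI and credentials as above):

curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/example-s3-sink/status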

Please see https://github.com/aiven/aiven-kafka-connect-s3 for additional details.
