The Kafka Connect S3 sink connector (by Aiven) enables you to move data from an Aiven for Apache Kafka cluster to Amazon S3 for long-term storage.
Note: There are two versions of the S3 sink connector available: one developed by Aiven and another developed by Confluent. This article uses the Aiven version; a separate article covers the S3 sink connector by Confluent.
This article describes:
- S3 object name and data format
- Configuration on AWS, using AWS console
- Configuration on Kafka Connect, using Aiven Console
Data Format
S3 object names
The S3 connector stores a series of files in the specified bucket. Each object is named using the pattern
[<aws_s3_prefix>]<topic>-<partition>-<startoffset>[.gz]
aws_s3_prefix can use the template placeholders {{ utc_date }} and {{ local_date }}. The .gz extension is used when gzip compression is enabled; see output_compression in the Aiven Console.
The connector creates one file per Kafka Connect offset.flush.interval.ms period for each partition that has received new messages during that period. The setting defaults to 60 seconds.
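For illustration, suppose aws_s3_prefix is set to example-s3-sink/{{ utc_date }}/ and gzip compression is enabled. The resulting object names would then look something like the following (topic name, partitions, and offsets here are hypothetical):
example-s3-sink/2021-06-01/demo_topic-0-0.gz
example-s3-sink/2021-06-01/demo_topic-0-136.gz
example-s3-sink/2021-06-01/demo_topic-1-245.gz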
Data file format
Data is stored in S3 with one record per line. Each line contains the comma-separated fields specified by the output_fields configuration option. If the key and value fields are selected, they are written out Base64-encoded. For example, an output_fields of value,key,timestamp results in rows looking something like this:
bWVzc2FnZV9jb250ZW50,cGFydGl0aW9uX2tleQ==,1511801218777
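To inspect such a row by hand, split it on commas and Base64-decode the key and value fields. A minimal sketch using standard shell tools, applied to the example row above:
# Decode the Base64-encoded value and key fields of the example row.
echo "bWVzc2FnZV9jb250ZW50" | base64 --decode   # prints: message_content
echo "cGFydGl0aW9uX2tleQ==" | base64 --decode   # prints: partition_key
# The third field (1511801218777) is the record timestamp in milliseconds.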
AWS configuration
Create S3 bucket
You can create an S3 bucket in the AWS console. When creating the bucket, specify the bucket name and region and leave everything else at its defaults. It is fine to leave "block all public access" at its default (on), because access is granted through IAM.
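If you prefer the command line, the bucket can also be created with the AWS CLI; a sketch, where the bucket name and region below are placeholders:
# Create the bucket. Outside us-east-1, a LocationConstraint matching
# the chosen region must be supplied.
aws s3api create-bucket \
  --bucket YOUR-S3-BUCKET-NAME \
  --region eu-west-1 \
  --create-bucket-configuration LocationConstraint=eu-west-1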
Create IAM user
Create an IAM user and generate an access key ID and secret access key; you will need them when configuring the connector.
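The same can be done with the AWS CLI; a sketch, where the user name kafka-connect-s3-sink is a placeholder:
# Create the IAM user and generate an access key pair.
aws iam create-user --user-name kafka-connect-s3-sink
aws iam create-access-key --user-name kafka-connect-s3-sink
# The create-access-key response contains AccessKeyId and SecretAccessKey;
# store them safely, as the secret cannot be retrieved again later.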
IAM policy
The S3 connector needs the following permissions on the specified bucket:
- s3:GetObject
- s3:PutObject
- s3:AbortMultipartUpload
- s3:ListMultipartUploadParts
- s3:ListBucketMultipartUploads
Here is an example of an inline policy to add to the IAM user:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME/*",
        "arn:aws:s3:::YOUR-S3-BUCKET-NAME"
      ]
    }
  ]
}
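If you manage IAM from the command line, the policy above (saved as policy.json) can be attached as an inline policy; the user and policy names below are placeholders:
# Attach the inline policy to the IAM user created earlier.
aws iam put-user-policy \
  --user-name kafka-connect-s3-sink \
  --policy-name kafka-connect-s3-sink-policy \
  --policy-document file://policy.json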
If you get an Access Denied error, see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/
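To check the IAM user's permissions independently of the connector, you can attempt a test upload with the generated credentials; a sketch with placeholder values:
# Upload a test object using the IAM user's access key pair.
AWS_ACCESS_KEY_ID=AKXXXXXXXXXXXXXXXXXX \
AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
aws s3 cp test.txt s3://YOUR-S3-BUCKET-NAME/test.txt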
Creating connector on Aiven Console
At your Kafka Connect service, go to the "Connectors" tab and click "Create New Connector".
In the "Sink" section, select "Aiven - Amazon AWS S3 sink".
Connector Configuration - Common
- Connector name - name: the name of the connector. This cannot be modified after creation.
- Connector class - connector.class: automatically selected in the Aiven Console. In this case it will be io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector.
- Key and value converter class - key.converter and value.converter: select org.apache.kafka.connect.converters.ByteArrayConverter.
- Topics or topics regex - topics or topics.regex: choose one of them. Only the specified topics (e.g. demo_topic,another_topic), or the topics matching the regex, will be processed by the sink connector.
Connector Configuration - Other
- aws_access_key_id and aws_secret_access_key: the AWS access key ID and secret access key generated in the AWS IAM console.
- aws_s3_bucket: the name of an existing S3 bucket.
- aws_s3_region: the region of the S3 bucket. Defaults to us-east-1 (US East - N. Virginia). Refer to the AWS documentation for the full list of regions.
Example - create connector on Aiven Console
Here is an example configuration. On the Aiven Console, the configuration is printed in the upper right corner.
{
  "name": "YOUR-CONNECTOR-NAME",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "topics": "YOUR-TOPIC",
  "aws_access_key_id": "AKXXXXXXXXXXXXXXXXXX",
  "aws_secret_access_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws_s3_bucket": "YOUR-S3-BUCKET-NAME",
  "aws_s3_region": "eu-west-1"
}
Creating connector with RESTful API
If this is the first time you are creating a connector, we recommend the Aiven Console. However, if you would like to create a connector programmatically, you can send an HTTP request to the Kafka Connect service URI.
The Kafka Connect service URI is available on the Aiven Console.
Here is an example using curl to send an HTTP POST request that creates the S3 sink connector:
curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "example-s3-sink",
  "config": {
    "aws_access_key_id": "AKI...",
    "aws_secret_access_key": "SECRET_ACCESS_KEY",
    "aws_s3_bucket": "aiven-example",
    "aws_s3_prefix": "example-s3-sink/",
    "aws_s3_region": "us-east-1",
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "output_compression": "gzip",
    "output_fields": "value,key,timestamp",
    "tasks.max": 1,
    "topics": "source_topic,another_topic",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
EOF
(Note: please substitute appropriate values for the Kafka Connect service URI and credentials.)
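After the connector is created, you can check that it is running with the standard Kafka Connect status endpoint, using the same service URI and credentials as above:
curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/example-s3-sink/status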
Please see https://github.com/aiven/aiven-kafka-connect-s3 for additional details.