The S3 sink connector enables you to move data from an Aiven Kafka cluster to Amazon S3 for long-term storage.

Data Format

S3 Object names

The S3 connector stores a series of files in the specified bucket. Each object is named using the pattern

[<aws_s3_prefix>]<topic>-<partition>-<startoffset>[.gz]  

The .gz extension is added when gzip compression is enabled; see output_compression below. The optional aws_s3_prefix is prepended to each object name. The connector creates one file per Kafka Connect offset.flush.interval.ms interval for each partition that has received new messages during that period. The setting defaults to 60 seconds.
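
For example, with aws_s3_prefix set to example-s3-sink/ and gzip compression enabled (hypothetical values matching the example configuration below), an object holding records from partition 0 of demo_topic starting at offset 42 could be named:

example-s3-sink/demo_topic-0-42.gz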

Data file format

Data is stored as one record per line in S3. Each line contains comma-separated fields as specified by the output_fields configuration option. If the key and value fields are selected, they are written out in Base64-encoded form. For example, an output_fields setting of value,key,timestamp results in rows that look something like this:

bWVzc2FnZV9jb250ZW50,cGFydGl0aW9uX2tleQ==,1511801218777 
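
As a minimal sketch, such rows can be inspected with standard shell tools; the row, field order, and decoded values here follow the value,key,timestamp example above:

row='bWVzc2FnZV9jb250ZW50,cGFydGl0aW9uX2tleQ==,1511801218777'
echo "$row" | cut -d',' -f1 | base64 -d   # value     -> message_content
echo "$row" | cut -d',' -f2 | base64 -d   # key       -> partition_key
echo "$row" | cut -d',' -f3               # timestamp -> 1511801218777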

Usage

Connector Configuration

aws_access_key_id:
  AWS Access Key ID for accessing the S3 bucket. Mandatory.
aws_secret_access_key:
  AWS S3 Secret Access Key. Mandatory.
aws_s3_bucket:
  Name of an existing bucket for storing the records. Mandatory.
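aws_s3_prefix:
  Prefix prepended to the names of stored objects (see the object naming pattern above). Optional.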
aws_s3_region:
  Name of the region for the bucket used for storing the records. Defaults to us-east-1.
connector.class:
  Connector class name, in this case: io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
key.converter:
  Connector-specific key encoding, must be set to org.apache.kafka.connect.converters.ByteArrayConverter
output_compression:
  Compression type for output files. Supported algorithms are gzip and none. Defaults to gzip.
output_fields:
  A comma-separated list of fields to include in the output. Supported values are: key, offset, timestamp and value. Defaults to value.
topics:
  Topics to subscribe to. See the Kafka Connect documentation for details. E.g. demo_topic,another_topic
value.converter:
  Connector-specific value encoding, must be set to org.apache.kafka.connect.converters.ByteArrayConverter

Example

The following information is required when configuring the connector:

  • Kafka Connect URL from the Kafka service

S3 connector definition example:

curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "example-s3-sink",
  "config": {
    "aws_access_key_id": "AKI...",
    "aws_secret_access_key": "SECRET_ACCESS_KEY",
    "aws_s3_bucket": "aiven-example",
    "aws_s3_prefix": "example-s3-sink/",
    "aws_s3_region": "us-east-1",
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "output_compression": "gzip",
    "output_fields": "value,key,timestamp",
    "tasks.max": 1,
    "topics": "source_topic,another_topic",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
EOF
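
Once the connector is created, you can check its state through the standard Kafka Connect REST API status endpoint (using the same Kafka Connect URL and credentials as above) and, after data has been flushed, list the resulting objects with the AWS CLI; the bucket and prefix names here follow the example configuration:

curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/example-s3-sink/status

aws s3 ls s3://aiven-example/example-s3-sink/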

(Note: substitute the appropriate values for your Kafka Connect URL and credentials.)
