The GCS sink connector enables you to move data from an Aiven for Apache Kafka cluster to Google Cloud Storage for long-term storage.

The full documentation is available at https://github.com/aiven/aiven-kafka-connect-gcs/.

How It Works

File name format

The connector uses the following format for output files (blobs): <prefix><topic>-<partition>-<start-offset>[.gz], where:

  • <prefix> – (optional) prefix, which can be used, for example, for subdirectories in the bucket;
  • <topic> – Kafka topic name;
  • <partition> – the topic's partition number;
  • <start-offset> – Kafka offset of the first record in the file;
  • [.gz] – suffix added when compression is enabled.
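
For example, with file.name.prefix=some-prefix/, gzip compression enabled, and a file whose first record comes from partition 0 of topic1 at offset 42 (illustrative values), the resulting blob would be named:

some-prefix/topic1-0-42.gz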

Data format

Output files are text files that contain one record per line (i.e., records are separated by \n).

There are two output data formats available:
- [Default] Flat structure, where field values are separated by commas (csv).
Configuration: format.output.type=csv. This is also the default when the property is not present in the configuration.
- [Recommended] Complex structure, where the file is in JSON Lines format: it contains one record per line, and each line is a valid JSON object (jsonl). See the example after this list.
Configuration: format.output.type=jsonl.
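
For instance, with format.output.type=jsonl and format.output.fields=key,value,offset, a hypothetical record could be written as a line like the following (an illustrative sketch; see the connector documentation for the exact field encoding):

{"key": "k1", "value": {"name": "user1"}, "offset": 42}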

Configuration

For details on configuring Kafka Connect workers, see the Apache Kafka documentation: https://kafka.apache.org/documentation/#connectconfigs.

Here is an example connector configuration with descriptions:

### Standard connector configuration

## Fill in your values in these:

# Unique name for the connector.
# Attempting to register again with the same name will fail.
name=my-gcs-connector

## These must have exactly these values:

# The Java class for the connector
connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector

# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter

# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter

# Specifies whether the value contains a schema.
# Applies when the value converter is `org.apache.kafka.connect.json.JsonConverter`.
value.converter.schemas.enable=false

# The type of data format used to write data to the GCS output files.
# The supported values are: `csv`, `jsonl`.
# Optional, the default is `csv`.
format.output.type=jsonl

# A comma-separated list of topics to use as input for this connector.
# A regular expression alternative, `topics.regex`, is also supported.
# See https://kafka.apache.org/documentation/#connect_configuring
topics=topic1,topic2

### Connector-specific configuration
### Fill in your values

# The name of the GCS bucket to use
# Required.
gcs.bucket.name=my-gcs-bucket

## The following two options are used to specify GCP credentials.
## See the overview of GCP authentication:
## - https://cloud.google.com/docs/authentication/
## - https://cloud.google.com/docs/authentication/production
## If neither is present, the connector will try to detect
## the credentials automatically.
## If only one is present, the connector will use it to obtain the credentials.
## If both are present, it is an error.

# The path to a GCP credentials file.
# Optional, the default is null.
gcs.credentials.path=/some/path/google_credentials.json

# GCP credentials as a JSON object.
# Optional, the default is null.
gcs.credentials.json={"type":"...", ...}

# The set of the fields that are to be output, comma separated.
# Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.
# Optional, the default is `value`.
format.output.fields=key,value,offset,timestamp,headers

# The prefix to be added to the name of each file put on GCS.
# See the GCS naming requirements https://cloud.google.com/storage/docs/naming
# Optional, the default is empty.
file.name.prefix=some-prefix/

# The compression type used for files put on GCS.
# The supported values are: `gzip`, `snappy`, `zstd`, `none`.
# Optional, the default is `none`.
file.compression.type=gzip

# The time zone in which timestamps are represented.
# Accepts short and long standard names like: `UTC`, `PST`, `ECT`,
# `Europe/Berlin`, `Europe/Helsinki`, or `America/New_York`.
# For more information please refer to https://docs.oracle.com/javase/tutorial/datetime/iso/timezones.html.
# The default is `UTC`.
file.name.timestamp.timezone=Europe/Berlin

# The source of timestamps.
# Only `wallclock` is supported; it is the default value.
file.name.timestamp.source=wallclock

# The file name template.
# See "File name format" section.
# Optional, the default is `{{topic}}-{{partition}}-{{start_offset:padding=false}}` or
# `{{topic}}-{{partition}}-{{start_offset:padding=false}}.gz` if the compression is enabled.
file.name.template={{topic}}-{{partition}}-{{start_offset:padding=true}}.gz
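
For illustration, with {{start_offset:padding=true}} the start offset is zero-padded to a fixed width, so a file for partition 0 of topic1 starting at offset 42 (hypothetical values) would be named along the lines of:

topic1-0-00000000000000000042.gz

whereas the default padding=false yields topic1-0-42.gz.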

Example

The following information will be required in the configuration process:

  • Kafka connect URL from the Kafka service

GCS connector definition example: 

curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "my-gcs-connector",
  "config": {
    "name": "my-gcs-connector",
    "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "format.output.type": "jsonl",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "topics": "topic1,topic2",
    "gcs.credentials.json": "{\"type\":\"...\", ...}",
    "gcs.bucket.name": "my-gcs-bucket",
    "file.name.prefix": "some-prefix/",
    "file.compression.type": "gzip",
    "format.output.fields": "key,value,offset"
  }
}
EOF

(Substitute appropriate values for the Kafka Connect URL, credentials, bucket name, and topics. If name appears both at the top level and inside config, the two values must match.)
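
Once the connector is registered, its state can be checked with the standard Kafka Connect REST API status endpoint (same host and credentials as above):

curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/my-gcs-connector/status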

Note that the gcs.credentials.json value is passed as a string, so all " characters inside it must be escaped as \".
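
If the credentials are in a file, one way to produce the escaped string is a jq one-liner (a sketch, assuming jq is installed; the file name google_credentials.json is illustrative):

jq -c 'tojson' google_credentials.json

The output is a JSON string, surrounding quotes included, that can be pasted directly as the value of "gcs.credentials.json".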

Please see https://github.com/aiven/aiven-kafka-connect-gcs/ for additional details.
