The GCS sink connector enables you to move data from an Aiven Kafka cluster to Google Cloud Storage for long-term storage.

How It Works

File name format

The connector uses the following format for output files (blobs): <prefix><topic>-<partition>-<start-offset>[.gz], where:

  • <prefix> – (optional) prefix that can be used, for example, for subdirectories in the bucket;
  • <topic>  – Kafka topic name;
  • <partition> – topic's partition number;
  • <start-offset>  – Kafka offset of the first record in the file;
  • [.gz] – suffix added when compression is enabled.
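
For example, following this format, a gzip-compressed file for partition 0 of topic1 whose first record has offset 1232155, with file.name.prefix set to some-prefix/, would be named along the lines of:

some-prefix/topic1-0-1232155.gz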

Data format

Output files are text files that contain one record per line (i.e., records are separated by \n).

The connector can include the following record fields in the output: the key, the value, the timestamp, and the offset. (The set and the order of these output fields are configurable.) The field values are separated by commas.

The key and the value, if they are output, are stored as Base64-encoded binary.

For example, if we output key,value,offset,timestamp, a record line might look like:

a2V5,TG9yZW0gaXBzdW0gZG9sb3Igc2l0IGFtZXQ=,1232155,1554210895

If the key, the value, or the timestamp is null, an empty string is output instead:

,,,1554210895
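
Since the key and value are Base64-encoded, they need to be decoded when the files are read back. Here is a minimal sketch using standard shell tools (assuming GNU coreutils base64 and the key,value,offset,timestamp field order from the example above):

# Extract the value field (second column) from the example record line and decode it
echo 'a2V5,TG9yZW0gaXBzdW0gZG9sb3Igc2l0IGFtZXQ=,1232155,1554210895' \
  | cut -d, -f2 | base64 -d
# Prints: Lorem ipsum dolor sit amet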

Configuration

For general Kafka Connect worker configuration, see the Apache Kafka documentation (https://kafka.apache.org/documentation/#connect_configuring); for the full list of connector options, see the connector repository (https://github.com/aiven/aiven-kafka-connect-gcs/).

Here is an example connector configuration with descriptions:

### Standard connector configuration

## Fill in your values in these:

# Unique name for the connector.
# Attempting to register again with the same name will fail.
name=my-gcs-connector

## These must have exactly these values:

# The Java class for the connector
connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector

# The key converter for this connector
# (must be set to ByteArrayConverter)
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# The value converter for this connector
# (must be set to ByteArrayConverter)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# A comma-separated list of topics to use as input for this connector
# A regular expression alternative, `topics.regex`, is also supported.
# See https://kafka.apache.org/documentation/#connect_configuring
topics=topic1,topic2


### Connector-specific configuration
### Fill in your values

# The name of the GCS bucket to use
# Required.
gcs.bucket.name=my-gcs-bucket

# GCP credentials as a JSON object.
# Required.
# Note that the connector supports passing GCP credentials
# as a file, but this is not supported on Aiven platform.
gcs.credentials.json={"type":"...", ...}

# A comma-separated list of the fields to output.
# Supported values are: `key`, `value`, `offset`, and `timestamp`.
# Optional, the default is `value`.
format.output.fields=key,value,offset,timestamp

# The prefix to be added to the name of each file put on GCS.
# See the GCS naming requirements https://cloud.google.com/storage/docs/naming
# Optional, the default is empty.
file.name.prefix=some-prefix/

# The compression type used for files put on GCS.
# The supported values are: `gzip`, `none`.
# Optional, the default is `none`.
file.compression.type=gzip
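
Once the connector is running, you can spot-check the uploaded files with gsutil. This is only a sketch: it assumes the Google Cloud SDK is installed and authenticated, and that the bucket, prefix, and compression match the example configuration above; the exact object names depend on your topics and offsets.

# List the files written by the connector
gsutil ls gs://my-gcs-bucket/some-prefix/

# Print the decompressed contents of one output file
gsutil cat gs://my-gcs-bucket/some-prefix/topic1-0-1232155.gz | gunzip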

Example

The following information is required for the configuration process:

  • Kafka Connect URL from the Kafka service

GCS connector definition example: 

curl -X POST \
  https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors \
  -H "Content-Type: application/json" -d @- << EOF
{
  "name": "example-gcs-sink",
  "config": {
    "name": "example-gcs-sink",
    "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "topics": "topic1,topic2",
    "gcs.credentials.json": "{\"type\":\"...\", ...}",
    "gcs.bucket.name": "my-gcs-bucket",
    "file.name.prefix": "some-prefix/",
    "file.compression.type": "gzip",
    "format.output.fields": "key,value,offset"
  }
}
EOF

(Substitute your own Kafka Connect URL and credentials for the placeholder values.)

Note that the gcs.credentials.json value is passed as a string, so all " characters inside it must be escaped as \".
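
After creating the connector, you can verify that it is running through the standard Kafka Connect REST API status endpoint (using the same placeholder host and credentials as above):

curl https://avnadmin:password@kafka-20-biz4-a-exercise1.aivencloud.com:17293/connectors/example-gcs-sink/status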

Please see https://github.com/aiven/aiven-kafka-connect-gcs/ for additional details.
 
