This article walks through the steps required to set up a BigQuery sink connector for Kafka, have it consume data from a Kafka topic, and store that data in Google BigQuery. The example also uses Kafka Schema Registry to produce and consume data adhering to Avro schemas.

Before the connector is set up, a number of details are required about both the Kafka service and your Google Cloud account and BigQuery service.

Steps to set up the BigQuery sink connector with Aiven for Kafka

Setting up the Kafka service

1. Create a Kafka service (minimum Business-4 plan) in the cloud and region of your choice

2. Enable the Kafka Connect and Schema Registry sub-services for the Kafka service

3. Collect the required Kafka, Kafka Connect and Schema Registry details

4. Create the topic bigqtopic in Kafka (a CLI sketch covering these steps follows this list)
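If you prefer the command line, the same setup can be done with the Aiven CLI. This is a minimal sketch assuming a service named mykafka running in google-europe-west1; adjust the service name, cloud, plan, partition and replication counts to your environment.

# Create the Kafka service with the Kafka Connect and Schema Registry sub-services enabled
avn service create mykafka \
  --service-type kafka \
  --plan business-4 \
  --cloud google-europe-west1 \
  -c kafka_connect=true \
  -c schema_registry=true

# Create the topic used in this article
avn service topic-create mykafka bigqtopic --partitions 3 --replication 2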

The following Kafka service details are required (from the Aiven console); later commands reference them as shell variables (see the export sketch after this list):

kafka_service_uri (e.g. mykafka-myproj.aivencloud.com:17295)

schema_registry_service_uri (e.g. https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298)

schema_registry_host (e.g. mykafka-myproj.aivencloud.com)

sr_port (e.g. 17298)

sr_password (e.g. xxxyyy)

kafka_connect_service_uri (e.g. https://avnadmin:YYY@mykafka-myproj.aivencloud.com:17293)
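A minimal sketch of exporting these as shell variables, using the placeholder values above (substitute your own):

# Export the collected connection details for use in later shell commands
export kafka_service_uri=mykafka-myproj.aivencloud.com:17295
export schema_registry_host=mykafka-myproj.aivencloud.com
export sr_port=17298
export sr_password=xxxyyy
export kafka_connect_service_uri=https://avnadmin:YYY@mykafka-myproj.aivencloud.com:17293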

Setting up the BigQuery service

Getting your Google key in JSON format:

1. In the Google Cloud console, open the navigation menu at the top left and select “IAM & Admin”, then “Service Accounts”

2. From the three-dot menu at the far right of your service account's row, select “Create key” and choose the JSON key type (a gcloud CLI alternative follows below)
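If you have the gcloud CLI configured, a JSON key for an existing service account can be created along these lines. The service account name bq-connector is a hypothetical example; the account needs BigQuery permissions (e.g. roles/bigquery.dataEditor) on the project.

# Create a JSON key for an existing service account (hypothetical name bq-connector)
gcloud iam service-accounts keys create google-key.json \
  --iam-account=bq-connector@ornate-flame-xxx.iam.gserviceaccount.com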

Creating the BigQuery dataset:

1. In the Google Cloud console, open the navigation menu at the top left and select BigQuery

2. Then select your Google project from the project selector in the console

3. On the left panel, select “Resources”, select your project and create a BigQuery dataset called “kafkaconnectortest” (a bq CLI alternative follows below)
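The same dataset can also be created from the command line with the bq tool that ships with the Google Cloud SDK, assuming the example project ID ornate-flame-xxx:

# Create the dataset used by the connector
bq mk --dataset ornate-flame-xxx:kafkaconnectortest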

Making sure the BigQuery API is enabled:

1. In the Google Cloud console, open the navigation menu at the top left and select “APIs & Services”, then “Dashboard”

2. Click “Enable APIs and Services” in the horizontal panel, search for “BigQuery API” and follow the steps to enable it (a gcloud alternative follows below)
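With the gcloud CLI the API can be enabled in one step:

# Enable the BigQuery API for the example project
gcloud services enable bigquery.googleapis.com --project=ornate-flame-xxx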

The following Google service details are required (from the Google Cloud console):

1. Your Google service account key in JSON format (file/blob)

2. The Google project where the BigQuery service is running (e.g. “ornate-flame-xxx”)

Publish some messages to the Kafka topic

Publish messages to the bigqtopic topic using kafka-avro-console-producer (this can be obtained from Confluent).

./kafka-avro-console-producer --broker-list $kafka_service_uri --topic bigqtopic \
  --producer.config /path/avro-producer.properties \
  --property schema.registry.url=https://$schema_registry_host:$sr_port \
  --property basic.auth.credentials.source=USER_INFO \
  --property basic.auth.user.info=avnadmin:$sr_password \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Sample avro-producer.properties file contents:

bootstrap.servers=kafka_service_uri

security.protocol=SSL

ssl.truststore.location=/path/client.truststore.jks

ssl.truststore.password=secret

ssl.keystore.type=PKCS12

ssl.keystore.location=/path/client.keystore.p12

ssl.keystore.password=secret

ssl.key.password=secret

key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

The truststore and keystore can be created as follows (also described in the getting started guide for Kafka):

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key

keytool -import -file ca.pem -alias CA -keystore client.truststore.jks
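If you use the Aiven CLI, the service.key, service.cert and ca.pem files referenced above can be downloaded with something along these lines (assuming the avnadmin user and the example service name mykafka):

# Download the access key, certificate and CA certificate for the service user
avn service user-creds-download mykafka --username avnadmin -d /path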

BigQuery sink connector configuration

{
    "name": "bigquerytest1",
    "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "1",
        "topics": "bigqtopic",
        "project": "ornate-flame-xxx",
        "datasets": ".*=kafkaconnectortest",
        "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
        "schemaRegistryClient.basic.auth.credentials.source": "URL",
        "schemaRegistryLocation": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "key.converter.basic.auth.credentials.source": "URL",
        "key.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "value.converter.basic.auth.credentials.source": "URL",
        "value.converter.schemas.enable": "true",
        "batch.size": 1,
        "autoCreateTables": "true",
        "keySource": "JSON",
        "keyfile": "<enter the JSON blob of your Google key here>"
    }
}

Save the above config to the file bigquery_connector.json.

NOTE:

Between versions 1.2.0 and 1.6.5, there is a non-backward compatible change in the configuration of the BigQuery Kafka connector on Aiven.

In version 1.2.0, the credentials field is used to specify the Google Cloud credentials in JSON format:

...
"credentials": "{...}",
...

In version 1.6.5 or later, use the keyfile field for this instead and also set keySource=JSON (as in the connector configuration above):

...
"keyfile": "{...}",
"keySource": "JSON",
...
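If you are unsure which connector version your Kafka Connect service is running, the Kafka Connect REST API lists the installed plugins and their versions:

# List installed connector plugins, including the BigQuery sink connector version
curl -s $kafka_connect_service_uri/connector-plugins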

Setting up the BigQuery sink connector for Kafka

Use the following command from the Unix command line to set up the connector:

curl -s -H "Content-Type: application/json" -X POST -d @bigquery_connector.json $kafka_connect_service_uri/connectors

Verify in the Kafka Connect tab for the Kafka service that the connector is in the "Running" state and that at least one task ("1/1") is running. If not, check the Kafka Connect tab and the Logs tab for the Kafka service for exceptions.
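The connector state can also be checked with the Kafka Connect REST API, using the connector name from the configuration above:

# Show the state of the connector and its tasks
curl -s $kafka_connect_service_uri/connectors/bigquerytest1/status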

Check the BigQuery database for the inserted records.

SELECT * FROM `ornate-flame-xxx.kafkaconnectortest.bigqtopic`
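The same query can be run from the command line with the bq tool:

# Query the target table using standard SQL
bq query --use_legacy_sql=false 'SELECT * FROM `ornate-flame-xxx.kafkaconnectortest.bigqtopic`'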
