This article shows you an example of how to set up a BigQuery sink connector for Apache Kafka, have it consume data from a Kafka topic, and store that data in Google BigQuery. This example also uses Kafka Schema Registry to produce and consume data that follows Avro schemas.

Before setting up the connector, you need a number of details for your Kafka service, your Google account, and your BigQuery service.

Requirements

  • An Aiven for Apache Kafka service with a minimum service plan of Business-4. For instructions on setting up a new Kafka service, see this article.

  • Kafka Connect integration enabled in your Aiven for Apache Kafka service. You can set this up on the Connectors tab of your service page in the Aiven web console.

  • Schema Registry API enabled in your Aiven for Apache Kafka service. You can set this up on the Schemas tab of your service page in the Aiven web console.

  • A bigqtopic Kafka topic created in your Aiven for Apache Kafka service (see the example command after this list).

  • Java keystore and truststore set up for your Aiven for Apache Kafka service. For instructions, see this article.
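
If you prefer to create the bigqtopic topic from the command line, a minimal sketch using the Aiven CLI looks like this; it assumes the avn client is installed and authenticated, KAFKA_SERVICE_NAME is your Kafka service name, and the partition and replication counts are example values:

  avn service topic-create KAFKA_SERVICE_NAME bigqtopic --partitions 3 --replication 2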

This example uses the following variables from your Kafka service (these are available on the Overview tab of your service page in the Aiven web console):

  • kafka_service_uri (Overview > Kafka tab)

  • schema_registry_service_uri (Overview > Schema Registry tab)

  • schema_registry_host (Overview > Schema Registry tab)

  • schema_registry_port (Overview > Schema Registry tab)

  • schema_registry_password (Overview > Schema Registry tab)

  • kafka_connect_service_uri (Overview > Kafka Connect tab)
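
The curl command in step 6 refers to kafka_connect_service_uri as a shell variable, while the other values appear as literal placeholders that you substitute into the commands and configuration files yourself. A minimal sketch of exporting the Kafka Connect URI up front (the value shown is a placeholder):

  # Placeholder: copy the real value from the Overview > Kafka Connect tab
  export kafka_connect_service_uri=https://avnadmin:<password>@<host>:<port>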

This example uses the following variables from your BigQuery service (these are available in Google Cloud Console):

  • google_project_key (the contents of your Google service account key in JSON format)

  • google_project (your Google project where the BigQuery service is running)

Setting up a BigQuery sink connector with Aiven for Apache Kafka

  1. Get your Google key in JSON format.

    1. In Google Cloud Console, select IAM & Admin > Service Accounts from the top-left menu.

    2. For the service account you want to use, click the action button on the right, select Create key, and choose JSON as the key type.
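
    If you prefer the command line, you can also create a JSON key with the gcloud CLI. This is a sketch that assumes the gcloud client is configured for your project; the service account address is a placeholder:

      # Create a JSON key for an existing service account (the account address is a placeholder)
      gcloud iam service-accounts keys create google-project-key.json \
        --iam-account=my-service-account@my-project.iam.gserviceaccount.com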

  2. Create the BigQuery dataset.

    1. In Google Cloud Console, select BigQuery from the top-left menu.

    2. Select your Google project.

    3. In the left panel, select Resources and select your project.

    4. Create a BigQuery dataset named kafkaconnectortest.
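
    Alternatively, the dataset can be created with the bq command-line tool. A sketch, assuming google_project is your project ID:

      # Create the kafkaconnectortest dataset in the google_project project
      bq mk --dataset google_project:kafkaconnectortest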

  3. Make sure that the BigQuery API is enabled.

    1. In Google Cloud Console, select API & Services > Dashboard from the top-left menu.

    2. Click Enable APIs and Services.

    3. Search for BigQuery API and follow the steps given to enable it.
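
    If you use the gcloud CLI, the same API can be enabled from the command line:

      # Enable the BigQuery API for the currently configured project
      gcloud services enable bigquery.googleapis.com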

  4. Publish some messages to the bigqtopic topic using the kafka-avro-console-producer command:

    kafka-avro-console-producer --broker-list kafka_service_uri --topic bigqtopic --producer.config /path/console-producer.properties --property schema.registry.url=https://schema_registry_host:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

    Once the producer is running, enter each record as a single line of JSON, for example:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}

    Example of the console-producer.properties file:

    bootstrap.servers=kafka_service_uri
    security.protocol=SSL
    ssl.truststore.location=/path/client.truststore.jks
    ssl.truststore.password=secret
    ssl.keystore.type=PKCS12
    ssl.keystore.location=/path/client.keystore.p12
    ssl.keystore.password=secret
    ssl.key.password=secret
    key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
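
    To confirm that the messages reached the topic before setting up the connector, you can read them back with kafka-avro-console-consumer. This sketch assumes a console-consumer.properties file with the same SSL settings as the producer properties above (the serializer lines are not needed):

      kafka-avro-console-consumer --bootstrap-server kafka_service_uri --topic bigqtopic --from-beginning --consumer.config /path/console-consumer.properties --property schema.registry.url=https://schema_registry_host:schema_registry_port --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=avnadmin:schema_registry_password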

  5. Create the configuration file for the BigQuery sink connector and save it as bigquery_connector.json. Replace the example Schema Registry URL (https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298) with your schema_registry_service_uri, google_project with your project ID, and google_project_key with the contents of your JSON key:

    {
      "name": "bigquerytest1",
      "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "1",
        "topics": "bigqtopic",
        "project": "google_project",
        "datasets": ".*=kafkaconnectortest",
        "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
        "schemaRegistryClient.basic.auth.credentials.source": "URL",
        "schemaRegistryLocation": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "key.converter.basic.auth.credentials.source": "URL",
        "key.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://avnadmin:XXX@mykafka-myproj.aivencloud.com:17298",
        "value.converter.basic.auth.credentials.source": "URL",
        "value.converter.schemas.enable": "true",
        "batch.size": 1,
        "autoCreateTables": "true",
        "keySource": "JSON",
        "keyfile": "google_project_key"
      }
    }

    Note: The configuration of the BigQuery Kafka connector in Aiven has a non-backward-compatible change between versions 1.2.0 and 1.6.5:

    • Version 1.2.0 uses the credentials field to specify the Google Cloud credentials in JSON format:

      ...
      "credentials": "{...}",
      ...

    • From version 1.6.5 on, use the keyfile field and set the keySource parameter to JSON (as described here):

      ...
      "keyfile": "{...}",
      "keySource": "JSON",
      ...
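
    Because the keyfile value must contain the whole key JSON as a single string, it can be easier to inject it with jq than to paste it by hand. A minimal sketch, assuming the key was saved as google-project-key.json and the connector configuration as bigquery_connector.json:

      # Inline the contents of the Google key file into the "keyfile" field
      jq --arg key "$(jq -c . google-project-key.json)" '.config.keyfile = $key' bigquery_connector.json > bigquery_connector_final.json

    If you use this approach, post the resulting bigquery_connector_final.json file in the next step instead.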

  6. Run the following command on the Unix command line to set up the BigQuery sink connector for Kafka:

    curl -s -H "Content-Type: application/json" -X POST -d @bigquery_connector.json $kafka_connect_service_uri/connectors

  7. Check that the connector is in the Running state and that it has at least one task (1/1).
    You can see this on the Overview > Kafka Connect tab of your service page in the Aiven web console. If the connector is not running, check the Kafka Connect tab and the Logs tab for any error messages.
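
    You can also query the Kafka Connect REST API directly; the connector name bigquerytest1 matches the name field in the configuration file above:

      curl -s $kafka_connect_service_uri/connectors/bigquerytest1/status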

  8. Run the following query in the BigQuery console to check that the records have been added to your BigQuery dataset:

    SELECT * FROM `google_project.kafkaconnectortest.bigqtopic`
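
    The same check can be run with the bq command-line tool (standard SQL is assumed):

      bq query --use_legacy_sql=false 'SELECT * FROM `google_project.kafkaconnectortest.bigqtopic`'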
