MongoDB is a powerful, high-performance NoSQL database that is scalable, easy to use and widely adopted across a broad range of use cases. It is becoming common practice to stream the data flowing into MongoDB to Kafka for analytics and other applications. This is where the Debezium (Kafka Connect) source connector for MongoDB comes in. It reads changes from the MongoDB replica set's oplog and publishes them as events to Kafka topics, so data written to MongoDB flows into Kafka without changes to your application. This help article walks you through the steps required to set up Debezium for MongoDB.

Prerequisites

Details about the MongoDB Atlas instance

These details can be obtained from the Atlas console.
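  • MONGODB_HOST (the cluster host name used in mongodb+srv:// connection strings)
  • MONGODB_USER (database user name)
  • MONGODB_USER_PW (password of the database user)
  • CLUSTER_PRIMARY (host:port of the replica set primary, e.g. cluster0-shard-00-00-xyz.gcp.mongodb.net:27017)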

Details about Kafka and Kafka Connect

These details can be obtained from the Aiven console.
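  • KAFKA_HOST (host name of the Aiven for Kafka service)
  • KAFKA_PORT (port of the Aiven for Kafka service)
  • KAFKA_CONNECT_HOST (host name of the Kafka Connect REST API)
  • KAFKA_CONNECT_IP (one or more IP addresses, depending on the plan)
  • KAFKA_CONNECT_PORT (port of the Kafka Connect REST API)
  • KAFKA_CONNECT_USER (Kafka Connect user name)
  • KAFKA_CONNECT_USER_PW (password of the Kafka Connect user)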

Setting up MongoDB, Aiven for Kafka and the Debezium connector

Setting up the IP whitelist for the MongoDB Atlas service

In the Atlas console, open the "Network Access" section and add two sets of addresses to the whitelist: the IP addresses of the Kafka Connect nodes (KAFKA_CONNECT_IP) and the IP address of your own machine.

Setting up MongoDB database and collection

Log in to the MongoDB database using the mongo shell and run the following commands:

$ mongo "mongodb+srv://MONGODB_HOST/test" --username MONGODB_USER
// switch to the inventory database (it is created together with its first collection)
> use inventory;
// create the phones collection
> db.createCollection('phones');

Setting up Aiven for Kafka

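Make sure that:

  • kafka.auto_create_topics_enable is enabled (in the Advanced configuration section of the Aiven for Kafka service), so that the topics the connector writes to are created automatically.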
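The connector writes the change events for each MongoDB collection to its own topic. By default the topic is named <mongodb.name>.<database>.<collection>, which is why the topic fullfillment.inventory.phones used later in this article must either be created automatically or created up front. The following minimal Python sketch shows that naming rule. It assumes the connector's default topic naming with no routing or renaming transforms configured. The helper name debezium_topic is hypothetical.

```python
def debezium_topic(logical_name: str, database: str, collection: str) -> str:
    """Return the Kafka topic Debezium uses for one MongoDB collection.

    Assumes the default naming scheme <mongodb.name>.<database>.<collection>
    (no topic routing or renaming transforms configured).
    """
    for part in (logical_name, database, collection):
        if not part:
            raise ValueError("logical name, database and collection must be non-empty")
    return f"{logical_name}.{database}.{collection}"


if __name__ == "__main__":
    # Values from this article's connector configuration and MongoDB setup.
    print(debezium_topic("fullfillment", "inventory", "phones"))
    # prints: fullfillment.inventory.phones
```

If you pre-create topics instead of relying on automatic creation, create one topic per captured collection using this pattern.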

Setting up Debezium connector

Save a connector configuration similar to the following as debezium-mongo-source.json:

{
  "name": "debezium-mongo-source",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "CLUSTER_PRIMARY",
    "mongodb.user": "MONGODB_USER",
    "mongodb.password": "MONGODB_USER_PW",
    "mongodb.ssl.enabled": "true",
    "mongodb.name": "fullfillment",
    "tasks.max": "1"
  }
}
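You can also generate debezium-mongo-source.json from environment variables instead of editing the placeholders by hand. The following is a minimal Python sketch. It assumes the prerequisite values are exported as environment variables named exactly like the placeholders (CLUSTER_PRIMARY, MONGODB_USER, MONGODB_USER_PW). That convention and the build_connector_config helper are hypothetical and are not part of Debezium or Aiven.

```python
import json
import os
from typing import Mapping


def build_connector_config(env: Mapping[str, str]) -> dict:
    """Build the Debezium MongoDB source connector payload shown above.

    Hypothetical convention: values come from variables named like the
    placeholders in the prerequisites.
    """
    required = ("CLUSTER_PRIMARY", "MONGODB_USER", "MONGODB_USER_PW")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"missing settings: {', '.join(missing)}")
    return {
        "name": "debezium-mongo-source",
        "config": {
            "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
            "mongodb.hosts": env["CLUSTER_PRIMARY"],
            "mongodb.user": env["MONGODB_USER"],
            "mongodb.password": env["MONGODB_USER_PW"],
            "mongodb.ssl.enabled": "true",
            # Logical name; it becomes the prefix of every topic the connector writes.
            "mongodb.name": "fullfillment",
            "tasks.max": "1",
        },
    }


if __name__ == "__main__":
    try:
        payload = build_connector_config(os.environ)
    except KeyError as err:
        print(f"Cannot build connector config: {err}")
    else:
        with open("debezium-mongo-source.json", "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        print("Wrote debezium-mongo-source.json")
```

The generated file has the same structure as the JSON above. Only the three MongoDB settings are taken from the environment.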

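Create the connector by posting the configuration file to the Kafka Connect REST API, authenticating with the Kafka Connect credentials:

$ curl -s -u KAFKA_CONNECT_USER:KAFKA_CONNECT_USER_PW -H "Content-Type: application/json" -X POST -d @debezium-mongo-source.json https://KAFKA_CONNECT_HOST:KAFKA_CONNECT_PORT/connectors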

Confirm that the connector was created successfully and is running by checking the Kafka Connect section of the Aiven console.
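The Kafka Connect REST API also reports the state of a connector and its tasks at GET /connectors/<name>/status. The following minimal Python sketch uses only the standard library. It assumes that KAFKA_CONNECT_USER and KAFKA_CONNECT_USER_PW are the HTTP basic authentication credentials of the REST API. It treats the connector as healthy only when the connector and every one of its tasks report RUNNING. The function names are hypothetical.

```python
import base64
import json
import urllib.request


def status_request(host: str, port: int, user: str, password: str,
                   name: str = "debezium-mongo-source") -> urllib.request.Request:
    """Build a GET request for the Kafka Connect connector status endpoint."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"https://{host}:{port}/connectors/{name}/status",
        headers={"Authorization": f"Basic {token}"},
    )


def fetch_status(req: urllib.request.Request) -> dict:
    """Send the request and decode the JSON status document."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks report the RUNNING state."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

For example, call is_running(fetch_status(status_request(KAFKA_CONNECT_HOST, KAFKA_CONNECT_PORT, KAFKA_CONNECT_USER, KAFKA_CONNECT_USER_PW))) with your real values. If it returns False, a failed task's entry in the status document includes a trace field with the error.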

Creating data in MongoDB

Insert a few documents into the phones collection using the mongo shell:

$ mongo "mongodb+srv://MONGODB_HOST/test" --username MONGODB_USER
// switch to the inventory database so the documents land in inventory.phones
> use inventory;
// create documents
> db.phones.insertOne({
    item: "phone",
    brand: "apple",
    model: "XR"
  });
> db.phones.insertOne({
    item: "phone",
    brand: "samsung",
    model: "galaxy10"
  });

Checking data in the topic

To verify that the documents you inserted reached the Kafka topic through the Debezium connector, run the following command:

$ /path/bin/kafka-console-consumer --bootstrap-server KAFKA_HOST:KAFKA_PORT --topic fullfillment.inventory.phones --consumer.config /path/console-consumer.properties --from-beginning

The output should look similar to the following:

{"after":"{\"_id\": {\"$oid\": \"5e1cd62afd3341a577688afb\"},\"item\": \"phone\",\"brand\": \"apple\",\"model\": \"XR\"}","patch":null,"source":{"version":"1.0.0.Final","connector":"mongodb","name":"fullfillment","ts_ms":1578948139000,"snapshot":"false","db":"inventory","rs":"Cluster0-shard-0","collection":"phones","ord":1,"h":0,"tord":null},"op":"c","ts_ms":1578948139055}
{"after":"{\"_id\": {\"$oid\": \"5e1cd62bfd3341a577688afc\"},\"item\": \"phone\",\"brand\": \"samsung\",\"model\": \"galaxy10\"}","patch":null,"source":{"version":"1.0.0.Final","connector":"mongodb","name":"fullfillment","ts_ms":1578948139000,"snapshot":"false","db":"inventory","rs":"Cluster0-shard-0","collection":"phones","ord":2,"h":0,"tord":null},"op":"c","ts_ms":1578948139352}
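In these change events, the after field holds the inserted document as a string of MongoDB extended JSON rather than as a nested object. A consumer therefore has to decode it a second time. The following is a minimal Python sketch. It assumes each record value is a plain JSON event without a schema envelope, as in the output above. The decode_document helper is hypothetical. It only accepts create ("c") and snapshot read ("r") events. With this connector version, update and delete events do not carry the full document in after.

```python
import json

# First event from the consumer output above, with the "source" block
# omitted for brevity.
SAMPLE_EVENT = r'{"after":"{\"_id\": {\"$oid\": \"5e1cd62afd3341a577688afb\"},\"item\": \"phone\",\"brand\": \"apple\",\"model\": \"XR\"}","patch":null,"op":"c","ts_ms":1578948139055}'


def decode_document(event_value: str) -> dict:
    """Return the document carried by a Debezium MongoDB create or read event."""
    event = json.loads(event_value)
    if event.get("op") not in ("c", "r"):
        # Updates and deletes do not carry the full document in "after".
        raise ValueError(f"no full document in op={event.get('op')!r}")
    # "after" is a string of MongoDB extended JSON, so it is parsed again.
    document = json.loads(event["after"])
    # Extended JSON encodes an ObjectId as {"$oid": "<hex>"}; keep the hex string.
    oid = document.get("_id")
    if isinstance(oid, dict) and "$oid" in oid:
        document["_id"] = oid["$oid"]
    return document


if __name__ == "__main__":
    print(decode_document(SAMPLE_EVENT))
    # prints: {'_id': '5e1cd62afd3341a577688afb', 'item': 'phone', 'brand': 'apple', 'model': 'XR'}
```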

Here, the file passed with --consumer.config contains something like the following. Refer to the getting started guide for how to generate the truststore and keystore it references:

security.protocol=SSL
ssl.truststore.location=/path/client.truststore.jks
ssl.truststore.password=secret
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/client.keystore.p12
ssl.keystore.password=secret
ssl.key.password=secret
group.id=grp1
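You can also generate the consumer properties file from a script. The following is a minimal Python sketch. The write_consumer_properties helper and its parameters are hypothetical. Like the example above, it uses a single password for the truststore, the keystore and the key.

```python
from pathlib import Path


def write_consumer_properties(path: str, truststore: str, keystore: str,
                              password: str, group_id: str = "grp1") -> None:
    """Write an SSL client config for kafka-console-consumer --consumer.config.

    Uses one password for the truststore, keystore and key, as in the example
    above. Values are written verbatim (no Java properties escaping).
    """
    settings = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore,
        "ssl.truststore.password": password,
        "ssl.keystore.type": "PKCS12",
        "ssl.keystore.location": keystore,
        "ssl.keystore.password": password,
        "ssl.key.password": password,
        "group.id": group_id,
    }
    lines = [f"{key}={value}" for key, value in settings.items()]
    Path(path).write_text("\n".join(lines) + "\n", encoding="utf-8")
```

The sketch does not escape characters such as backslashes, which Java properties files treat specially. Check the generated file by hand if your paths or passwords contain them.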