Kafka includes a tool called kafka-consumer-groups.sh that lets you view and manipulate consumer group state.

Setup

Using kafka-consumer-groups.sh with Aiven Kafka requires that you first create a keystore and a truststore, following the instructions under Java keystore management in the Getting started with Kafka guide.

Once that's done, create a properties file that points to those files. The instructions for this are under Kafka Console Tools in the same guide. The contents should look something like this:

security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=changeit
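
As a minimal sketch, the file can be created from the shell with a here-document. The file name consumer.properties matches the --command-config value used in the commands in this article; the relative paths and the changeit passwords are the placeholders shown above, so substitute your own values.

```shell
# Write consumer.properties in the current directory.
# Paths and passwords are placeholders; replace them with your own.
cat > consumer.properties <<'EOF'
security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=changeit
EOF

# The file contains passwords, so restrict access to the current user.
chmod 600 consumer.properties
```

Because the keystore and truststore locations here are relative, run the tool from the directory that holds those files, or use absolute paths instead.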

Using kafka-consumer-groups.sh

With the setup done, you should be able to list the currently active groups with:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --list

To view details for a single group:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --describe

To list current members of a group:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --describe \
    --members
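
The commands above all repeat the same connection options. One way to cut the repetition is a small shell function, sketched here. The names kcg, KCG_BIN, BOOTSTRAP_SERVER and COMMAND_CONFIG are hypothetical and not part of Kafka; the server address is the demo one used in this article.

```shell
# Hypothetical wrapper; all names below are illustrative only.
KCG_BIN="${KCG_BIN:-kafka-consumer-groups.sh}"
BOOTSTRAP_SERVER="demo-kafka.htn-aiven-demo.aivencloud.com:17072"
COMMAND_CONFIG="consumer.properties"

# Run kafka-consumer-groups.sh with the connection options filled in,
# passing any further arguments through unchanged.
kcg() {
    "$KCG_BIN" \
        --bootstrap-server "$BOOTSTRAP_SERVER" \
        --command-config "$COMMAND_CONFIG" \
        "$@"
}
```

With this in place, kcg --list is equivalent to the first command above, and kcg --group test-group --describe --members to the last one.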

Resetting consumer group offset

To reset an offset for a group, you can issue:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --topic test-topic \
    --reset-offsets \
    --to-earliest \
    --execute
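
The tool also accepts --dry-run in place of --execute, which prints the offsets that would be set without changing anything. A cautious workflow is to preview first and apply second. The sketch below wraps that in a hypothetical helper (reset_to_earliest and KCG_BIN are illustrative names, not part of Kafka) that defaults to the preview mode.

```shell
KCG_BIN="${KCG_BIN:-kafka-consumer-groups.sh}"

# Hypothetical helper: reset GROUP's offsets on TOPIC to the earliest offset.
# The third argument is --dry-run (preview, the default) or --execute (apply).
reset_to_earliest() {
    local group="$1" topic="$2" mode="${3:---dry-run}"
    case "$mode" in
        --dry-run|--execute) ;;
        *) echo "mode must be --dry-run or --execute" >&2; return 2 ;;
    esac
    "$KCG_BIN" \
        --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
        --command-config consumer.properties \
        --group "$group" \
        --topic "$topic" \
        --reset-offsets \
        --to-earliest \
        "$mode"
}
```

Calling reset_to_earliest test-group test-topic shows the planned offsets; adding --execute as a third argument applies them.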

You can reset the offset to the beginning of the data with --to-earliest or to the end of the topic with --to-latest. If you know a fixed offset, you can set it directly with --to-offset.

Relative changes are possible with the --shift-by argument: a positive number skips forward, and a negative number moves backwards.

You can also reset offsets based on a timestamp by using the --to-datetime <YYYY-MM-DDTHH:mm:SS.sss> selector.
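
Typing the timestamp by hand is error-prone, so it can help to generate it. The sketch below builds a value for "one hour ago" in the format shown above. It assumes GNU date (the -d option) and assumes that the tool treats a timestamp without a zone suffix as UTC; check both against your own environment and Kafka version. The variable name reset_time is illustrative.

```shell
# Format "one hour ago" in UTC as YYYY-MM-DDTHH:mm:SS.sss.
# Requires GNU date; the milliseconds are fixed at .000 for simplicity.
reset_time="$(date -u -d '1 hour ago' '+%Y-%m-%dT%H:%M:%S.000')"
echo "$reset_time"
```

The result can then be passed to the reset command as --to-datetime "$reset_time" in place of --to-earliest.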

By default, --topic covers all partitions of the topic. To address a specific partition, use the format --topic <topicname>:<partition>, e.g. --topic test-topic:0.
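
To illustrate how a relative shift and a partition selector combine, here is a sketch that moves a group back by a number of messages on a single partition. The helper rewind_partition and the variable KCG_BIN are hypothetical names. The command uses --dry-run, which only prints the resulting offsets; replace it with --execute to apply the change.

```shell
KCG_BIN="${KCG_BIN:-kafka-consumer-groups.sh}"

# Hypothetical helper: preview moving GROUP back by COUNT messages
# on one PARTITION of TOPIC.
rewind_partition() {
    local group="$1" topic="$2" partition="$3" count="$4"
    "$KCG_BIN" \
        --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
        --command-config consumer.properties \
        --group "$group" \
        --topic "$topic:$partition" \
        --reset-offsets \
        --shift-by "-$count" \
        --dry-run
}
```

For example, rewind_partition test-group test-topic 0 10 previews a backwards move of 10 messages on partition 0 of test-topic.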

Note that the consumer group must be inactive when offset adjustments are issued.
