Kafka topics represent a continuous stream of messages that are typically discarded once the log reaches a certain age or size. However, for some use cases, such as capturing changes to a database table, only the most recent value for each key is needed, so keeping a log of every change that occurred over time is unnecessary.
For example, suppose a topic holds users' home addresses. Every time an address is updated, a record is sent to the topic with user_id as the key and the home address as the value:
1001 -> "4 Privet Dr"
1002 -> "221B Baker Street"
1001 -> "21 Jump St"
1001 -> "Paper St"
With infinite log retention, every change to a user's address is kept, but the log also grows without bound. With simple time- or size-based retention, older records are deleted once the log reaches a certain age or size, which can also delete the only record for a user whose address has not changed recently, such as user 1002.
Instead, if we're only interested in the latest value for each key, we can apply log compaction.
As the official Kafka documentation puts it:
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
In other words, Kafka removes any older record for which a newer record with the same key exists in the same partition log.
This retention policy can be set per topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.
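To make the contrast concrete, here is a minimal Python sketch (a model, not Kafka code) that applies two strategies to the address updates above. retain_newest is a hypothetical stand-in for size-based retention that keeps only the newest N records, and compact keeps only the newest record per key. Offsets are assumed to start at 0.

```python
from typing import List, NamedTuple


class Record(NamedTuple):
    offset: int
    key: int
    value: str


# The address updates from the example above, in append order.
log: List[Record] = [
    Record(0, 1001, "4 Privet Dr"),
    Record(1, 1002, "221B Baker Street"),
    Record(2, 1001, "21 Jump St"),
    Record(3, 1001, "Paper St"),
]


def retain_newest(records: List[Record], max_records: int) -> List[Record]:
    """Hypothetical size-style retention: keep only the newest max_records records."""
    return records[-max_records:] if max_records > 0 else []


def compact(records: List[Record]) -> List[Record]:
    """Keep only the record with the highest offset for each key."""
    latest = {}
    for rec in records:
        latest[rec.key] = rec.offset  # later offsets overwrite earlier ones
    # Retained records keep their original offsets; nothing is renumbered.
    return [rec for rec in records if latest[rec.key] == rec.offset]


print(retain_newest(log, 2))  # both survivors belong to user 1001
print(compact(log))           # the latest address of both users survives
```

With a budget of two records, retention alone loses user 1002 entirely, while compaction keeps offsets 1 and 3: one current address per user.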
Log Compaction Basics
Let's consider a partition log of a compacted topic before and after compaction has been applied.
As the example below shows, keys K1 and K3 each appear in two records. After log compaction, only the record with the latest offset (the newest value) for each key is kept, and the older ones are discarded (more on this in the sections below). The surviving records keep their original offsets; compaction never renumbers them.
A compacted log consists of a tail and a head. The head is a traditional Kafka log: new records are appended to its end, and it may contain duplicate keys. The tail has already been compacted, so it contains only unique keys. During cleaning, the log cleaner reads the head to build a summary of the latest offset for each key, then recopies the log from beginning to end, dropping every record whose key occurs again at a later offset.
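The cleaning pass the Kafka documentation describes (summarize the last offset per key in the head, then recopy the log without superseded records) can be sketched in Python as below. This is a simplified in-memory model under stated assumptions: records are (offset, key, value) tuples, first_dirty_offset is a hypothetical parameter marking where the head begins, and the tail is assumed to be already free of duplicates. The real cleaner uses a fixed-size offset map and works segment by segment.

```python
from typing import Dict, Hashable, List, Tuple

Record = Tuple[int, Hashable, str]  # (offset, key, value)


def clean(log: List[Record], first_dirty_offset: int) -> List[Record]:
    """Simplified model of one log-cleaner pass over a single partition.

    Records with offset < first_dirty_offset form the already-compacted tail;
    the remaining records form the dirty head.
    """
    # Pass 1: summarize the last offset of every key that appears in the head.
    offset_map: Dict[Hashable, int] = {}
    for offset, key, _ in log:
        if offset >= first_dirty_offset:
            offset_map[key] = offset

    # Pass 2: recopy the whole log (tail + head), dropping any record whose key
    # occurs again later in the head. Keys absent from the map are kept as-is.
    return [
        (offset, key, value)
        for offset, key, value in log
        if offset >= offset_map.get(key, offset)
    ]


tail_and_head = [
    (0, "K1", "a"), (1, "K2", "b"), (2, "K3", "c"),                   # tail
    (3, "K1", "d"), (4, "K3", "e"), (5, "K4", "f"), (6, "K3", "g"),   # head
]
print(clean(tail_and_head, first_dirty_offset=3))
```

Here K1 and K3 are duplicated, and only offsets 1, 3, 5 and 6 survive; after the pass the whole log is clean and the head would start again at offset 7.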
Note that log compaction happens within a single partition: if two records with the same key land in different partitions, they are not compacted together.
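The per-partition scope can be illustrated with a small Python model. The partition layout below is hypothetical: it assumes key K1 was written to two partitions, which can happen, for example, with a custom partitioner or after the partition count changes, since the default partitioner maps a key based on the number of partitions.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str, str]  # (offset within its partition, key, value)


def compact_partition(records: List[Record]) -> List[Record]:
    """Keep the newest record per key, looking only at this one partition."""
    latest = {key: offset for offset, key, _ in records}  # last offset wins
    return [r for r in records if latest[r[1]] == r[0]]


# Hypothetical layout: K1 ended up in both partitions.
partitions: Dict[int, List[Record]] = {
    0: [(0, "K1", "old"), (1, "K2", "x")],
    1: [(0, "K1", "new"), (1, "K1", "newer")],
}

# Each partition is compacted independently of the others.
compacted = {p: compact_partition(recs) for p, recs in partitions.items()}
print(compacted)
```

Partition 1 collapses to its newest K1 record, but the stale "old" value in partition 0 survives, because the cleaner never compares records across partitions.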
If we look under the hood, we find that Kafka divides each partition into segments: files, with names ending in .log, stored on the broker's file system. As the log cleaner cleans a partition's segments, the cleaned segments are swapped into the partition immediately, replacing the older ones.
A segment's file name is its base offset, the offset of the first record it contains. The last segment in the partition is called the active segment, and it is the only segment to which new messages are appended. The active segment is excluded from cleaning, so it may still contain duplicate records. For example, the user-age partition below contains a segment 04.log that has not been compacted yet, so it still holds duplicate records.
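The segment layout can be modeled in Python as follows. On disk, Kafka names each segment after its base offset zero-padded to 20 digits (the 04.log above is shorthand). The helper names and the user-age records are hypothetical, and clean_inactive_segments simplifies matters by merging the cleaned records into a single segment; it does show why duplicates persist in the active segment.

```python
import bisect
from typing import Hashable, List, Tuple

Record = Tuple[int, Hashable, str]  # (offset, key, value)


def segment_file_name(base_offset: int) -> str:
    """Segment files are named after their base offset, zero-padded to 20 digits."""
    return f"{base_offset:020d}.log"


def segment_for_offset(base_offsets: List[int], offset: int) -> int:
    """Return the base offset of the segment holding `offset` (base_offsets sorted)."""
    i = bisect.bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise ValueError(f"offset {offset} precedes the first segment")
    return base_offsets[i]


def clean_inactive_segments(segments: List[List[Record]]) -> List[List[Record]]:
    """Compact every segment except the last (active) one, which is left untouched.

    Simplification: the cleaned records are merged into a single segment.
    """
    *inactive, active = segments
    if not inactive:
        return [active]
    records = [r for seg in inactive for r in seg]
    latest = {key: offset for offset, key, _ in records}
    return [[r for r in records if latest[r[1]] == r[0]], active]


# Hypothetical user-age partition: segments with base offsets 0 and 4.
segments = [
    [(0, "alice", "30"), (1, "bob", "25"), (2, "alice", "31"), (3, "bob", "26")],
    [(4, "alice", "32"), (5, "alice", "33")],  # active segment
]
print(clean_inactive_segments(segments))
```

After cleaning, the inactive segment holds one record per user, while the active segment still contains both of alice's records.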
When a segment file reaches a certain size or age, Kafka creates a new segment file. This is controlled by the following settings:
segment.bytes: roll over to a new segment when the current segment would grow beyond this size. It can be set per topic (for example, at topic creation) and defaults to 1 GB.
segment.ms: force the segment to roll over to a new one once it is older than this value, even if it has not reached segment.bytes (defaults to 7 days). Because the active segment is never cleaned, this setting is what eventually makes records on a low-traffic topic eligible for compaction.
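A simplified Python model of the roll decision is shown below, using the default values of segment.bytes and segment.ms. The function name should_roll and its parameters are hypothetical, and the model deliberately ignores other roll triggers the broker also applies, such as a full offset index or segment.jitter.ms.

```python
SEGMENT_BYTES_DEFAULT = 1024 ** 3             # segment.bytes default: 1 GiB
SEGMENT_MS_DEFAULT = 7 * 24 * 60 * 60 * 1000  # segment.ms default: 7 days


def should_roll(segment_size: int, batch_size: int, segment_age_ms: int,
                segment_bytes: int = SEGMENT_BYTES_DEFAULT,
                segment_ms: int = SEGMENT_MS_DEFAULT) -> bool:
    """Simplified check, made before appending a batch to the active segment."""
    # Size trigger: appending the batch would push the segment past segment.bytes.
    too_big = segment_size + batch_size > segment_bytes
    # Age trigger: a non-empty segment has been open longer than segment.ms.
    too_old = segment_size > 0 and segment_age_ms > segment_ms
    return too_big or too_old


# A quiet topic: tiny segment, but it has been open for eight days.
print(should_roll(segment_size=4096, batch_size=100,
                  segment_age_ms=8 * 24 * 60 * 60 * 1000))
```

On a busy topic the size trigger usually fires first; on a quiet one the age trigger is what turns the active segment into a cleanable one.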
Configuring Log Cleaner for Compaction
The log cleaner is enabled by default (log.cleaner.enable), but whether a topic is compacted depends on its cleanup policy. The broker setting log.cleanup.policy provides the default for all topics (delete, unless changed), and it can be overridden for individual topics with the topic-level cleanup.policy setting, which accepts delete, compact, or both (compact,delete).
To enable log compaction for all topics, set log.cleanup.policy=compact; this affects every topic in the cluster that does not have a topic-level override in place.
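The precedence between the broker default and a topic override can be modeled in a few lines of Python. This only illustrates the lookup order and is not broker code; the function name effective_cleanup_policy and the plain dictionaries standing in for broker and topic configuration are hypothetical.

```python
from typing import Dict, Set


def effective_cleanup_policy(broker_config: Dict[str, str],
                             topic_config: Dict[str, str]) -> Set[str]:
    """Return the cleanup policy a topic ends up with.

    A topic-level cleanup.policy override wins; otherwise the broker-wide
    log.cleanup.policy applies (Kafka's default for it is "delete").
    """
    policy = topic_config.get("cleanup.policy",
                              broker_config.get("log.cleanup.policy", "delete"))
    # The value is a comma-separated list, e.g. "compact,delete".
    return {p.strip() for p in policy.split(",") if p.strip()}


broker = {"log.cleanup.policy": "compact"}
print(effective_cleanup_policy(broker, {}))                            # inherits compact
print(effective_cleanup_policy(broker, {"cleanup.policy": "delete"}))  # override wins
```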
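Before cleaning begins, a cleaner thread inspects the logs and chooses the one with the highest dirty ratio, calculated as the number of bytes in the head divided by the total number of bytes in the log (tail + head).
min.cleanable.dirty.ratio: the minimum dirty ratio a log must reach before it is considered for cleaning (defaults to 0.5). A lower value means more frequent cleaning and less space wasted on duplicates; a higher value means fewer, more efficient cleanings but more wasted space in the log.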
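The selection step can be sketched in Python: compute each log's dirty ratio, ignore logs below min.cleanable.dirty.ratio, and pick the dirtiest of the rest. The LogStats type, the pick_log_to_clean function and the log names are hypothetical, and the byte counts are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LogStats:
    name: str
    tail_bytes: int  # already-compacted ("clean") bytes
    head_bytes: int  # not-yet-compacted ("dirty") bytes

    @property
    def dirty_ratio(self) -> float:
        total = self.tail_bytes + self.head_bytes
        return self.head_bytes / total if total else 0.0


def pick_log_to_clean(logs: List[LogStats],
                      min_cleanable_dirty_ratio: float = 0.5) -> Optional[LogStats]:
    """Return the eligible log with the highest dirty ratio, or None if none qualify."""
    eligible = [log for log in logs if log.dirty_ratio >= min_cleanable_dirty_ratio]
    return max(eligible, key=lambda log: log.dirty_ratio, default=None)


logs = [
    LogStats("user-address-0", tail_bytes=600, head_bytes=400),  # ratio 0.4
    LogStats("user-age-0", tail_bytes=200, head_bytes=800),      # ratio 0.8
]
print(pick_log_to_clean(logs))
```

With the default threshold of 0.5 only user-age-0 qualifies; lowering the threshold to 0.4 would make user-address-0 eligible too, though the dirtier log would still be cleaned first.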
The log cleaner can be configured to leave part of the head of the log uncompacted by setting a compaction time lag:
log.cleaner.min.compaction.lag.ms (topic-level: min.compaction.lag.ms): setting this value above 0 prevents the log cleaner from compacting messages younger than this age, which delays the compaction of recent records (defaults to 0).
log.cleaner.max.compaction.lag.ms (topic-level: max.compaction.lag.ms): the maximum time a message can remain uncompacted (defaults to effectively unlimited). This is not a hard deadline, as it is subject to the availability of log cleaner threads and the actual compaction time.
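The Kafka documentation for min.cleanable.dirty.ratio describes how the lag settings interact with it: a log becomes eligible once the dirty ratio is met and dirty records have been waiting at least the minimum lag, or once dirty records have waited past the maximum lag. The Python sketch below paraphrases that rule; the function names are hypothetical, and the real broker enforces the minimum lag at segment granularity rather than per record as modeled here.

```python
from typing import List

MAX_LAG_DEFAULT = 2 ** 63 - 1  # max.compaction.lag.ms default: effectively unlimited


def compaction_eligible(dirty_ratio: float, oldest_dirty_age_ms: int,
                        min_cleanable_dirty_ratio: float = 0.5,
                        min_lag_ms: int = 0,
                        max_lag_ms: int = MAX_LAG_DEFAULT) -> bool:
    """Simplified eligibility rule for one log."""
    # (ii) Dirty records have waited past the maximum lag: clean regardless of ratio.
    if oldest_dirty_age_ms > max_lag_ms:
        return True
    # (i) Ratio threshold met and dirty records are at least min_lag_ms old.
    return dirty_ratio >= min_cleanable_dirty_ratio and oldest_dirty_age_ms >= min_lag_ms


def first_uncleanable_index(record_ages_ms: List[int], min_lag_ms: int) -> int:
    """Index of the first record too young to compact (ages listed oldest first).

    That record and everything after it stay in the uncompacted head.
    """
    for i, age in enumerate(record_ages_ms):
        if age < min_lag_ms:
            return i
    return len(record_ages_ms)


print(compaction_eligible(0.6, oldest_dirty_age_ms=1_000, min_lag_ms=5_000))
print(first_uncleanable_index([9_000, 7_000, 3_000, 100], min_lag_ms=5_000))
```

In the first call the ratio is high enough but the dirty data is too young, so the log waits; in the second, the two records younger than 5 seconds stay in the head.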
During the cleanup process, log cleaner threads also remove records with a null value, known as tombstone records. A tombstone marks its key as deleted: compaction removes the older records for that key, and the tombstone itself is removed later. How long tombstones are kept can be controlled by configuring delete.retention.ms on the compacted topic.
From the Kafka documentation:
The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).
A consumer is guaranteed to see all tombstones as long as it reaches the head of the log in less time than the topic's delete.retention.ms setting (the default is 24 hours).
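Tombstone handling can be added to the earlier compaction model as a Python sketch. A value of None stands in for a null value, and the function compact_with_tombstones is hypothetical. As a simplification, the tombstone's age is measured from its record timestamp; the real broker derives the retention window from its own cleaning bookkeeping instead.

```python
from typing import Hashable, List, NamedTuple, Optional

DAY_MS = 24 * 60 * 60 * 1000  # delete.retention.ms default: 24 hours


class Record(NamedTuple):
    offset: int
    key: Hashable
    value: Optional[str]  # None marks a tombstone
    timestamp_ms: int


def compact_with_tombstones(records: List[Record], now_ms: int,
                            delete_retention_ms: int = DAY_MS) -> List[Record]:
    """Keep the newest record per key; drop tombstones older than delete_retention_ms."""
    latest = {r.key: r.offset for r in records}
    kept = []
    for r in records:
        if latest[r.key] != r.offset:
            continue  # superseded by a newer record with the same key
        if r.value is None and now_ms - r.timestamp_ms >= delete_retention_ms:
            continue  # tombstone has outlived delete.retention.ms
        kept.append(r)
    return kept


records = [
    Record(0, 1001, "4 Privet Dr", 0),
    Record(1, 1002, "221B Baker Street", 0),
    Record(2, 1001, None, 1_000),  # user 1001 deleted
]
print(compact_with_tombstones(records, now_ms=2_000))           # tombstone still present
print(compact_with_tombstones(records, now_ms=1_000 + DAY_MS))  # tombstone removed
```

A consumer that reads from offset 0 while the tombstone is still present learns that user 1001 was deleted; one that starts after it is removed simply never sees key 1001, which is why delete.retention.ms bounds how long a full scan may take.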