When a node in a Kafka or Kafka Connect cluster runs low on memory and cannot allocate any more space for data in main memory, the JVM running the Kafka service on that node starts throwing java.lang.OutOfMemoryError exceptions and, as a result, may stop processing Kafka connectors or messages in topics.

Such exceptions occur when a Kafka node or a Kafka Connect worker runs on a machine that does not have enough free RAM for its data processing tasks.

Take, as an example, a Kafka Connect Amazon S3 sink connector with a TimeBasedPartitioner whose configuration sets the data directory format to path.format=YYYY/MM. Each output S3 object then contains the data for a full month. Internally, the connector buffers each S3 object in memory before flushing it to S3. If the total amount of data for any single month exceeds the RAM available at ingestion time, the JVM running the Kafka Connect worker throws a java.lang.OutOfMemoryError exception and can no longer handle the workload.
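
As a rough sketch, the relevant part of such a connector configuration could look like the following (the connector class, topic, and bucket names are illustrative placeholders, and the exact set of required properties depends on the connector you use):

    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=clickstream-events
    s3.bucket.name=example-bucket
    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    # One directory, and therefore one buffered S3 object, per month
    path.format=YYYY/MM
    locale=en-US
    timezone=UTC

With this layout the connector keeps an in-memory buffer for each month-sized object, which is why its memory footprint grows with the monthly data volume.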

How to avoid running into OutOfMemoryError issues in a Kafka cluster?

You could use two strategies to handle heavy workloads:

  1. Decrease the maximum amount of data simultaneously kept in memory, or
  2. Increase the available memory for Kafka services or Kafka Connect workers.

Which strategy works best for you depends on a number of factors, and very large data volumes might even require a combination of both. The following sections describe each strategy with examples to help you decide.

Keep peak memory usage low

The first strategy is to keep the maximum amount of memory used while processing your data as low as possible. This usually requires some changes to your Kafka connector configuration.

In our example above, you could change the settings of the S3 sink connector and use a different data directory format such as path.format=YYYY/MM/dd. This caps the amount of data buffered for any single S3 object at one day's worth of data instead of a month's.
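
Continuing the illustrative configuration from above, the change amounts to a single property (the exact format string is an example and depends on how you want to partition your data):

    # Partition by day instead of by month, so each buffered S3 object
    # holds at most one day of data.
    path.format=YYYY/MM/dd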

You can also fine-tune connector settings such as rotate.schedule.interval.ms, rotate.interval.ms, and partition.duration.ms, setting them to smaller values so that S3 objects are committed more frequently. This decreases peak memory usage by making the connector write smaller S3 objects at shorter intervals, letting the JVM release the smaller memory buffers more often.
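
For instance, values along these lines would force frequent commits (the numbers are illustrative only and need to be tuned against your throughput and desired object size):

    # Commit open S3 objects at least every 10 minutes of wall-clock time
    rotate.schedule.interval.ms=600000
    # Also rotate once the records in a file span more than 10 minutes
    rotate.interval.ms=600000
    # Start a new encoded partition every hour
    partition.duration.ms=3600000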

Upgrade to a larger plan

If your use case prevents changing connector settings such as those outlined above, another option is to upgrade your Kafka service to a plan with more memory. This can be done from the Aiven console or with the Aiven CLI client.
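
With the Aiven CLI, the upgrade is a single command along these lines (the service name and target plan are placeholders; check which plans are available for your project before choosing one):

    # Move the existing Kafka service to a larger plan with more memory
    avn service update my-kafka --plan business-8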

A further scenario that affects the ability to handle large workloads is running connectors on the nodes of your Kafka cluster itself. The connectors then share the available memory with the Kafka brokers, reducing the total amount of memory available to each service. If this is the case, you could separate the services and create a dedicated standalone Kafka Connect integration for your connector, using an appropriate service plan. This has the added advantage of letting you scale the Kafka brokers and the Kafka Connect service independently.
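
A minimal sketch of this separation with the Aiven CLI might look as follows (service names, plan, and cloud region are placeholders):

    # Create a dedicated Kafka Connect service with its own memory
    avn service create my-kafka-connect --service-type kafka_connect --plan startup-4 --cloud google-europe-west1

    # Attach it to the existing Kafka service so connectors run on the new nodes
    avn service integration-create --integration-type kafka_connect --source-service my-kafka --dest-service my-kafka-connect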

With either option, new nodes with more memory are launched automatically and data from the old nodes is moved to the new ones. If you are concerned about downtime, see the article: How much downtime do maintenance operations cause?
