When a node in a Kafka or Kafka Connect cluster runs low on memory and the Java virtual machine (JVM) running the Kafka service can no longer allocate space for data in main memory, it throws java.lang.OutOfMemoryError exceptions. As a result, the JVM may stop processing Kafka connectors or messages in topics.

The reason for these exceptions is that the Kafka node or Kafka Connect worker is running on a machine that does not have enough free RAM available for the given data processing tasks.

For example, consider a Kafka Connect Amazon S3 sink connector with a TimeBasedPartitioner configured to use data directories with path.format=YYYY/MM. Each output S3 object then contains the data for a full month. Internally, the connector buffers the data for each S3 object in memory before flushing it to S3. If the total amount of data for any single month exceeds the RAM available at the time of ingestion, the JVM executing the Kafka Connect worker throws a java.lang.OutOfMemoryError exception and cannot handle the workload.
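As a rough sketch, the relevant part of such a connector configuration could look like the following (the class names are those of the Confluent S3 sink connector; topic, bucket, and other values are placeholders, and additional properties are required in a real setup):

    # Hypothetical S3 sink connector configuration with monthly partitioning
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=my-topic
    s3.bucket.name=my-example-bucket
    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    # One directory, and therefore one buffered S3 object, per month
    path.format=YYYY/MM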

How to avoid OutOfMemoryError issues in a Kafka cluster

There are two strategies for handling heavy workloads:

  • Decrease the maximum amount of data simultaneously kept in memory

  • Increase the available memory for Kafka services or Kafka Connect workers

The best strategy depends on factors such as your data volume, connector configuration, and service plan. Note that very large data volumes may require a combination of both strategies.

Keep peak memory usage low

The first strategy is to keep peak memory usage as low as possible while processing your data. This usually requires some changes to your Kafka connector configuration.

In our example above, you could change the settings for the S3 sink connector and use a more granular data directory format, such as path.format=YYYY/MM/dd. This limits the data buffered for any single S3 object to at most one day's worth.
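Assuming the hypothetical configuration sketched above, only the partitioning settings need to change (the duration value is an example):

    # One directory, and one buffered S3 object, per day instead of per month
    path.format=YYYY/MM/dd
    # Match the partition duration to the daily directory format: 24 hours in milliseconds
    partition.duration.ms=86400000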

You can also fine-tune connector settings such as rotate.schedule.interval.ms, rotate.interval.ms, and partition.duration.ms, setting them to smaller values so that S3 objects are committed more frequently. This decreases peak memory usage because the connector writes smaller batches of data to S3 at shorter intervals, and the JVM can release the smaller memory buffers more often.
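For instance, to commit objects at least every ten minutes, the rotation settings might look like this (all values are illustrative and should be tuned to your workload):

    # Flush buffered records to S3 at least every 10 minutes of wall-clock time
    rotate.schedule.interval.ms=600000
    # Also rotate a file once the record timestamps it contains span 10 minutes
    rotate.interval.ms=600000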

Upgrade to a larger plan

If you cannot change the connector settings outlined above, another option is to upgrade your Kafka service to a plan with more memory. You can do this in the Aiven web console or with the Aiven CLI client.
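With the Aiven CLI, changing the plan is a single command along these lines (the service and plan names below are placeholders for your own):

    # Move the Kafka service to a larger plan with more RAM
    avn service update my-kafka --plan business-8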

Another scenario that affects the ability to handle large workloads is running connectors on the nodes of your Kafka cluster. This forces the connectors to share the available memory with the Kafka brokers and reduces the total memory available to each service. In this case, you can separate the services and create a standalone Kafka Connect integration for your connectors with an appropriate service plan. This has the added advantage of letting you scale the Kafka brokers and the Kafka Connect service independently.
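With the Aiven CLI, this could be sketched roughly as follows (service names, the plan, and the exact flags are illustrative; check the CLI help for the options available to your project):

    # Create a dedicated Kafka Connect service
    avn service create my-kafka-connect --service-type kafka_connect --plan startup-4
    # Integrate it with the existing Kafka service so connectors run on the new nodes
    avn service integration-create --integration-type kafka_connect \
        --source-service my-kafka --dest-service my-kafka-connect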

With either option, the service automatically launches new nodes with more memory, and the data from the old nodes is migrated to the new ones. For more information on the downtime that these changes cause, see this article.
