Dealing with Kafka Consumer Lag

We have a service at my company that essentially serves as a searchable cache for all of our orders. It's a relatively simple concept, but the implementation is a bit more complex. The service is built on top of Kafka events: every time an order changes, a new event is produced and sent to the service. The service then consumes the event and updates Elasticsearch with the latest data.

This system worked well for us for a while, but recently we started to notice that the service was falling behind on consuming the Kafka events.

The Problem

We produce millions of Kafka events per hour. Every time an order is created or updated, or its child resources are updated (think line items, shipments, tracking events, etc.), we get an event for that change, reload the order from every database (we run microservices, so order data is stored across services), and update Elasticsearch with the new data. Yes, I know what you're thinking: why are you querying the databases instead of using the microservices to update Elasticsearch? Well, that's a story for another time involving legacy code, technical debt, and some generally lazy decisions.

The DB reads were relatively fast, but because of the size of the data we were pulling out and the volume of Elasticsearch writes we were doing, the service kept falling behind on consuming the Kafka events. We were seeing lag of up to 150k events at times. After spikes that high, we would have to wait until overnight, when the warehouses shut down, for the lag to go away. This left the data in Elasticsearch out of date, and customers were reaching out about their order data being stale.

Our Solution

There are a lot of ways to increase the throughput of a Kafka consumer. You can increase the number of partitions, the number of consumers, the number of threads per consumer, the Kafka message size, etc. We could have done any of these, but we use a cloud Kafka provider that charges based on the number of partitions and the amount of network traffic. So if we increased our partitions from 4 to 20, we would have been charged 5x more. Similarly, if we increased the payload size of our messages, we would have been charged more for network usage. Adding consumers was also an option, but we did not want to spin up more nodes just to handle the load, because our service was already running on a beefy machine and not even utilizing all of its resources.

So we decided to take a multifaceted approach to solving the problem. Elixir has some unique concurrency capabilities built around lightweight processes and GenServers, and those let us:

  1. Build a queue of events to process
  2. Handle each event in a separate process with a configurable number of workers
  3. Batch the writes to elasticsearch

Building a Queue

We built a queue of events to process using a simple GenServer. When an event was consumed from Kafka, we stored the message in the database and immediately acked it in Kafka. The queue has telemetry metrics that are pushed into Datadog, which lets us alert on the state of the queue itself. If we have a spike in errors or anything like that, we can easily identify it. This change took our Kafka lag from huge spikes of up to 100k down to far smaller spikes of about 40-50, but there were still bottlenecks in the actual processing of the messages.

One huge optimization we were able to make with this setup was deduping. When a message came in for an order that was already in the queue, we would just skip it. Processing the queued entry reloads the whole order and updates Elasticsearch, so one reload and one write cover every change made up to that point. Under our old Kafka implementation we couldn't dedupe because we had no idea what was already being processed or already waiting in the Kafka message queue.
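To make the dedupe idea concrete, here is a minimal sketch of that kind of queue. It is an illustration under assumptions, not the production code: the module name `OrderQueue` and its functions are hypothetical, events are reduced to bare order IDs, and the queue lives in memory rather than in the database.

```elixir
defmodule OrderQueue do
  @moduledoc """
  Hypothetical in-memory FIFO queue of order IDs that skips IDs already waiting.
  """
  use GenServer

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Returns :queued if the order was added, :duplicate if it was already waiting.
  def enqueue(queue, order_id), do: GenServer.call(queue, {:enqueue, order_id})

  # Returns {:ok, order_id} for the oldest waiting order, or :empty.
  def dequeue(queue), do: GenServer.call(queue, :dequeue)

  # Number of distinct orders currently waiting.
  def size(queue), do: GenServer.call(queue, :size)

  # Server callbacks

  @impl true
  def init(:ok) do
    # :queue keeps arrival order; the MapSet gives a fast "already waiting?" check.
    {:ok, %{items: :queue.new(), pending: MapSet.new()}}
  end

  @impl true
  def handle_call({:enqueue, order_id}, _from, state) do
    if MapSet.member?(state.pending, order_id) do
      {:reply, :duplicate, state}
    else
      new_state = %{
        items: :queue.in(order_id, state.items),
        pending: MapSet.put(state.pending, order_id)
      }

      {:reply, :queued, new_state}
    end
  end

  def handle_call(:dequeue, _from, state) do
    case :queue.out(state.items) do
      {{:value, order_id}, rest} ->
        # Once an order is handed to a worker, a later event for it must queue again.
        new_state = %{items: rest, pending: MapSet.delete(state.pending, order_id)}
        {:reply, {:ok, order_id}, new_state}

      {:empty, _queue} ->
        {:reply, :empty, state}
    end
  end

  def handle_call(:size, _from, state) do
    {:reply, MapSet.size(state.pending), state}
  end
end
```

In this sketch an order leaves the pending set as soon as it is dequeued, so an event that arrives while a worker is already reloading that order gets queued again instead of being dropped.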

Handling Each Event in a Separate Process

We have a separate GenServer that acts as the queue processor. All it does is pull events from the database and process them. We can specify the number of child workers per Kafka topic, so if a specific topic is getting a lot of events (like delivery tracking), we can increase the number of workers to handle the load. This allowed us to process more than the 4 events at a time that we were previously limited to. We could in theory process as many events as we wanted within the limits of the Kubernetes box, the DB, and Elasticsearch. We settled on 40 workers per topic, and that increased our message throughput significantly (from 2 per second to 80 per second).
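The per-topic worker limit can be sketched in a few lines. Note that this swaps in `Task.async_stream/3` from the standard library as a simpler stand-in for a GenServer with child workers; the module name `QueueProcessor`, the `handler` function, and the `workers_by_topic` map are all hypothetical.

```elixir
defmodule QueueProcessor do
  @moduledoc """
  Hypothetical sketch: process a list of events with a per-topic worker limit.
  """

  # Assumed default; the post settled on 40 workers per topic.
  @default_workers 40

  # `handler` is any one-argument function that reloads and reindexes one event.
  # `workers_by_topic` optionally overrides the worker count for specific topics.
  def process(events, topic, handler, workers_by_topic \\ %{}) do
    workers = Map.get(workers_by_topic, topic, @default_workers)

    events
    |> Task.async_stream(handler,
      # At most `workers` events are in flight at any moment.
      max_concurrency: workers,
      # Results are only counted here, so arrival order does not matter.
      ordered: false,
      timeout: 30_000,
      # A handler that exceeds the timeout is killed and reported as {:exit, :timeout}.
      on_timeout: :kill_task
    )
    |> Enum.reduce(%{ok: 0, timed_out: 0}, fn
      {:ok, _result}, acc -> Map.update!(acc, :ok, &(&1 + 1))
      {:exit, _reason}, acc -> Map.update!(acc, :timed_out, &(&1 + 1))
    end)
  end
end
```

The tasks are linked to the caller, so a handler that raises will still crash the calling process. A real processor would likely catch errors inside the handler and record them against the queue entry.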

80 messages per second was decent, but we really wanted this to be 100+, so we started looking at batching the writes to Elasticsearch.

Batching Writes to Elasticsearch

This was the final optimization that we made to the service. Instead of writing every order to Elasticsearch in the message processor, we made a separate GenServer that the message processor hands each order to when it is ready to write. When the batch reached 100 messages, or 5 seconds had passed since the last write, we would write the batch to Elasticsearch (as long as there was something to write). This allowed us to write 100 orders in a single request instead of making 100 separate requests. Our throughput went up a bit more to about 120 messages per second, and we were happy enough with that result that we called it a day, popped the champagne, and went home (not really, we just went back to work).
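The size-or-time flush rule is easy to get subtly wrong, so here is a minimal sketch of one way to do it. Everything named here is an assumption for illustration: the `BulkWriter` module, its options, and the injected `flush_fun`, which in a real service would presumably send the documents to Elasticsearch's `_bulk` endpoint.

```elixir
defmodule BulkWriter do
  @moduledoc """
  Hypothetical batcher: flushes when the buffer reaches `max_size` documents
  or when `flush_ms` has elapsed since the last flush, whichever comes first.
  """
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # Called by a message processor when an order document is ready to write.
  def add(writer, doc), do: GenServer.cast(writer, {:add, doc})

  @impl true
  def init(opts) do
    state = %{
      # One-argument function that performs the bulk write (assumed, injected).
      flush_fun: Keyword.fetch!(opts, :flush_fun),
      max_size: Keyword.get(opts, :max_size, 100),
      flush_ms: Keyword.get(opts, :flush_ms, 5_000),
      buffer: [],
      count: 0,
      timer: nil
    }

    {:ok, schedule(state)}
  end

  @impl true
  def handle_cast({:add, doc}, state) do
    state = %{state | buffer: [doc | state.buffer], count: state.count + 1}

    if state.count >= state.max_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    # A stale :flush left over from a cancelled timer only causes an early flush.
    {:noreply, flush(state)}
  end

  # Writes whatever is buffered (skipping empty batches) and restarts the timer.
  defp flush(state) do
    if state.count > 0 do
      # The buffer is built by prepending, so reverse it to restore arrival order.
      state.flush_fun.(Enum.reverse(state.buffer))
    end

    schedule(%{state | buffer: [], count: 0})
  end

  defp schedule(state) do
    if state.timer, do: Process.cancel_timer(state.timer)
    %{state | timer: Process.send_after(self(), :flush, state.flush_ms)}
  end
end
```

Because `flush_fun` runs inside the GenServer, a slow bulk request delays new `add` calls from being handled. That is fine for a sketch, but a busier writer might hand the batch to a separate task instead.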

Conclusion

This was one of those transparent changes that nobody would really notice unless they were looking at the metrics. Our customers stopped complaining about stale data, and we were able to keep up with the Kafka events. We did this without increasing the cost of our Kafka cluster, without increasing the number of nodes we were running, and without increasing the payload size of our messages. All it took was some Elixir GenServers and a lot of trial and error to get it right.