Deal with corrupted messages in Apache Kafka

Under some strange circumstances, it can happen that a message in a Kafka topic is corrupted. This often happens when 3rd-party frameworks are used with Kafka. In addition, Kafka < 0.9 does not take a lock in Log.read() on the consumer read path, while it does lock Log.write(). This can lead to a rare race condition, described in KAFKA-2477 [1]. A typical log entry looks like this:

ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$)
kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy)

Kafka-Tools

Kafka stores the offset of each consumer in Zookeeper. To read the offsets, Kafka provides handy tools [2]. But you can also use zkCli.sh, at least to display the consumers and their stored offsets. First, we need to find the consumer group for a topic (Kafka 0.9 and later):

bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9, the only way to get this information was to use zkCli.sh (or similar tools) to find the consumer group. Since debugging with zkCli is a bit frustrating, I personally use Yahoo's kafka-manager [3]. 

Let's assume the consumer offsets are stored in Zookeeper under /consumers. With the group console-1, topic test, and partition 1 from the example below, the commands to find the offset look like this:

ls /consumers/console-1/offsets/test
[1]
get /consumers/console-1/offsets/test/1
15

With the Kafka tools, the same check looks like this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181

Group     Topic   Pid   Offset   logSize   Lag   Owner
console-1 test    1     15       337       326   none

Once the offset is found, it can be incremented to force the consumer to skip the corrupted message and read the next available one. Before doing this, the consumer must be shut down. The offset can be set directly with zkCli.sh:

set /consumers/console-1/offsets/test/1 16

Alternatively, Kafka's UpdateOffsetsInZK tool can reset a consumer group to the earliest or latest offset (it does not accept an arbitrary offset):

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties test
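
If bumping an offset by hand becomes a recurring chore, the same update can be scripted against Zookeeper. Below is a minimal sketch using the plain Zookeeper Java client; the connection string, group, topic, and partition are taken from the example above and are assumptions about your environment:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Minimal sketch: advance a consumer offset stored in Zookeeper by one,
// skipping a corrupted message. Stop the consumer before running this.
public class OffsetBumper {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zknode1:2181", 10000, event -> {});
        String path = "/consumers/console-1/offsets/test/1"; // group/topic/partition assumed

        Stat stat = new Stat();
        long offset = Long.parseLong(
                new String(zk.getData(path, false, stat), StandardCharsets.UTF_8));

        // Skip the corrupted message by moving to the next offset.
        long next = offset + 1;
        zk.setData(path, Long.toString(next).getBytes(StandardCharsets.UTF_8),
                stat.getVersion());

        System.out.println("Offset moved from " + offset + " to " + next);
        zk.close();
    }
}

Passing the Stat version to setData() makes the write fail if something else modified the znode in the meantime, which is a cheap safety net when several admins poke at the same cluster.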

After the restart, the consumer should be able to read the next message, unless that message is also corrupted. And yes, the corrupted message is lost and cannot be recovered, so it's always a good idea to implement a CRC check before any message gets to Kafka.
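
One way to do this is to compute a checksum on the producer side, ship it with the payload, and verify it again on the consumer side, independently of Kafka's internal CRC. Below is a minimal sketch using java.util.zip.CRC32; the envelope layout (an 8-byte checksum followed by the payload) is an assumption for illustration, not a Kafka convention:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Minimal sketch: prepend a CRC32 checksum to the payload before producing,
// and verify it after consuming. The envelope layout is an assumption.
public class CrcEnvelope {

    // Producer side: wrap the payload with its checksum.
    static byte[] wrap(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(payload);
        return ByteBuffer.allocate(8 + payload.length)
                .putLong(crc.getValue())
                .put(payload)
                .array();
    }

    // Consumer side: verify the checksum before trusting the payload.
    static String unwrap(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        long stored = buf.getLong();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        CRC32 crc = new CRC32();
        crc.update(payload);
        if (crc.getValue() != stored) {
            throw new IllegalStateException("Message is corrupt (stored crc = "
                    + stored + ", computed crc = " + crc.getValue() + ")");
        }
        return new String(payload, StandardCharsets.UTF_8);
    }
}

With this in place, a mismatch fails loudly in unwrap(), so the application can decide whether to skip, retry, or alert instead of silently processing garbage.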

A code-based approach is also available [4]. It involves subclassing the ConsumerIterator so that it catches the message exception, replaces it with a dummy message, and moves on to the next message. Of course, the corrupted message is lost in this case as well.
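
Subclassing the Scala ConsumerIterator directly is a bit awkward from Java, so here is a simplified, hypothetical sketch of the same idea as a plain wrapper: catch the corruption exception, substitute a dummy message, and keep going. The placeholder value and the generic RuntimeException catch (standing in for kafka.message.InvalidMessageException) are assumptions for illustration:

import java.util.Iterator;

// Simplified illustration of the skip-on-corruption idea: wrap a message
// iterator, catch the corruption exception, and yield a placeholder instead
// of stopping the consumer.
public class SkippingIterator implements Iterator<String> {

    private static final String DUMMY = "<corrupt-message-skipped>";
    private final Iterator<String> delegate;

    public SkippingIterator(Iterator<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public String next() {
        try {
            return delegate.next();
        } catch (RuntimeException e) { // e.g. kafka.message.InvalidMessageException
            return DUMMY; // the corrupted message itself is lost
        }
    }
}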
