Under some strange circumstances, it can happen that a message in a Kafka topic is corrupted. This often happens when third-party frameworks are used with Kafka. In addition, Kafka < 0.9 does not take a lock in Log.read() on the consumer read path, although it does take one in Log.write(). This can lead to a rare race condition, as described in KAFKA-2477 [1]. A typical log entry is a kafka.message.InvalidMessageException reporting that the stored CRC of the message does not match the computed CRC.
Kafka-Tools
Kafka stores the offset of each consumer in Zookeeper. To read the offsets, Kafka provides handy tools [2]. But you can also use zkCli.sh, at least to display the consumers and their stored offsets. First we need to find the consumer for a topic (Kafka 0.9 and later):
bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9, the only way to get this information was to use zkCli.sh (or similar tools) to find the consumer group. Since debugging with zkCli is a bit frustrating, I personally use Yahoo's kafka-manager [3].
Let's assume the consumer offsets are stored in Zookeeper under the default /consumers path; the offset for a single partition can then be read directly from /consumers/<group>/offsets/<topic>/<partition>.
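With zkCli.sh, reading the stored offset might look like this (a sketch assuming the /consumers layout above; the group console-1, topic test, and partition 1 are taken from the ConsumerOffsetChecker example in this post):

```shell
# Connect to Zookeeper and read the stored offset for one partition.
# Assumed path layout: /consumers/<group>/offsets/<topic>/<partition>
bin/zkCli.sh -server zknode1:2181 get /consumers/console-1/offsets/test/1
```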
With Kafka's own tooling, that command would look like this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181

Group      Topic  Pid  Offset  logSize  Lag  Owner
console-1  test   1    15      337      326  none
Once the offset has been found, it can be incremented to force the consumer to skip the corrupted message and read the next available one. Before doing this, the consumer must be shut down so that it does not overwrite the offset.
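A sketch of that step with zkCli.sh, again assuming the /consumers layout and the group, topic, and partition from the example above; the stored offset 15 is bumped to 16 so the consumer skips the corrupted message:

```shell
# Stop the consumer first, then overwrite the stored offset.
# /consumers/<group>/offsets/<topic>/<partition> holds the next offset to read.
bin/zkCli.sh -server zknode1:2181 set /consumers/console-1/offsets/test/1 16
```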
After the restart, the consumer should be able to read the next message, unless that message is also corrupted. And yes, the corrupted message is lost and cannot be recovered, so it is always a good idea to implement a CRC check before any message gets to Kafka.
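Such a check can be as simple as shipping a checksum alongside each payload. A minimal sketch using the POSIX cksum tool; the message format and the pipeline around it are made up for illustration:

```shell
# Producer side: compute a CRC over the payload and prepend it.
payload="hello kafka"
crc=$(printf '%s' "$payload" | cksum | cut -d' ' -f1)
msg="${crc}:${payload}"
# ... the message would now be produced, e.g. via kafka-console-producer.sh ...

# Consumer side: split the CRC off, recompute it over the body, and compare.
received="$msg"
stored_crc="${received%%:*}"
body="${received#*:}"
computed_crc=$(printf '%s' "$body" | cksum | cut -d' ' -f1)
[ "$stored_crc" = "$computed_crc" ] && echo "CRC ok" || echo "CRC mismatch"
```

If the check fails on the consumer side, the message can be rejected before it ever causes a poisoned offset.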
A code-based approach is also available [4]. This involves creating a subclass of the ConsumerIterator that catches the message exception, replaces it with a dummy message, and moves on to the next message. Of course, the corrupted message is also lost in this case.