How can a Kafka consumer read one message at a time?

We have a Kafka setup that lets us process messages in parallel across several servers, but each message must be processed exactly once (and by exactly one server). So far this works fine for us.

The problem is that Kafka consumers read messages in batches for maximum efficiency. If processing fails, the server goes down, or anything else interrupts us mid-batch, we lose the messages that should have been processed.

Is there a way to get the consumer to read only one message at a time, so that Kafka keeps holding the rest? Something like: the consumer pulls one message → processes it → commits the offset, then repeats. Is this possible with Kafka? Any thoughts / ideas?

Thanks!

+7
apache-kafka
3 answers

You can try setting max.poll.records to 1.
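With `max.poll.records=1` the loop the question asks for becomes poll one → process → commit → repeat. A minimal sketch of that loop, simulated over an in-memory list standing in for a partition (no broker needed; the real settings, `max.poll.records=1` plus `enable.auto.commit=false` with a manual commit after each record, are noted in the comments, and `run` is an illustrative stand-in, not a Kafka API):

```python
# Simulated one-message-at-a-time consumer loop. With a real consumer you
# would set max.poll.records=1 and enable.auto.commit=false, then commit
# synchronously after each record. A list stands in for the partition here.

def run(partition, start_offset=0):
    committed = start_offset              # last committed offset (next to read)
    processed = []
    while committed < len(partition):
        record = partition[committed]     # "poll" exactly one record
        processed.append(record.upper())  # "process" it
        committed += 1                    # "commit" only after processing
    return processed, committed

out, offset = run(["a", "b", "c"])
# out == ["A", "B", "C"], offset == 3
```

Because the offset only advances after processing succeeds, a crash mid-loop means the in-flight record is re-read on restart rather than lost.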

+4
+4

You mentioned that each message is processed only once, yet you are worried about data loss. I assume you are concerned with the edge case where one of your servers goes down and you lose data?

I don't think there is a way to consume exactly one message at a time. Looking through the consumer configurations , it seems you can only cap the number of bytes a consumer fetches from Kafka, not the number of messages:

fetch.message.max.bytes 

But if you are worried about losing data entirely: as long as you never commit an offset, Kafka will not mark it as consumed, and the message will not be lost. Read the Kafka documentation on delivery semantics :

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at-most-once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system, but Kafka provides the offset which makes implementing this straightforward.
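The quoted semantics come down to when the offset is committed relative to processing. A small sketch (pure Python, no broker; the crash is simulated with an exception, and `consume` is an illustrative stand-in, not a Kafka API) of why commit-after-process gives at-least-once while commit-before-process gives at-most-once:

```python
def consume(partition, offset, commit_first, crash_at):
    """Process records starting at `offset`; raise at index `crash_at`
    to simulate a crash. Returns (processed_records, committed_offset)."""
    processed = []
    try:
        for i in range(offset, len(partition)):
            if commit_first:
                offset = i + 1        # at-most-once: commit before processing
            if i == crash_at:
                raise RuntimeError("simulated crash")
            processed.append(partition[i])
            if not commit_first:
                offset = i + 1        # at-least-once: commit after processing
    except RuntimeError:
        pass
    return processed, offset

log = ["m0", "m1", "m2"]

# At-least-once: crash before m1 is processed -> offset stays at 1,
# so a restart re-reads m1 (possible duplicate, no loss).
p, off = consume(log, 0, commit_first=False, crash_at=1)
# p == ["m0"], off == 1

# At-most-once: m1's offset was committed before the crash,
# so a restart skips m1 (no duplicate, possible loss).
p, off = consume(log, 0, commit_first=True, crash_at=1)
# p == ["m0"], off == 2
```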

So by default Kafka does not give you exactly-once processing. Achieving it requires that you store the offsets yourself, wherever you write the output of your processing.

But it can be even simpler: just have the consumer store its offset in the same place as its output. For example, our Hadoop ETL that loads data into HDFS stores the offsets in HDFS together with the data it reads, which guarantees that either both the data and the offsets are updated, or neither is.
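A sketch of that idea with nothing Kafka- or HDFS-specific (a plain dict stands in for the transactional store, and `process_batch` is an illustrative stand-in): output and offset are written to the store in a single update, so on restart the consumer resumes exactly where its output ends and each record affects the output exactly once.

```python
def process_batch(partition, store):
    """Write output and offset to the same store in one update each step.
    `store` is a dict simulating a transactional store: the single update()
    call stands in for "either both advance, or neither"."""
    offset = store.get("offset", 0)
    for i in range(offset, len(partition)):
        new_output = store.get("output", []) + [partition[i] * 2]
        # One combined update: output and offset move together.
        store.update({"output": new_output, "offset": i + 1})
    return store

store = process_batch([1, 2, 3], {})
# store == {"output": [2, 4, 6], "offset": 3}

# Re-running against the same store is a no-op: it resumes at offset 3,
# so the output is not duplicated -> exactly-once effect on the output.
process_batch([1, 2, 3], store)
```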

I hope this helps.

0

It depends on which client you are using. With the C++ and Python clients you can consume one message at a time.

For Python, I used https://github.com/mumrah/kafka-python . The following code consumes one message at a time:

    message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True)

Here __consumer is a SimpleConsumer object.

See my question and answer here: How to stop a Python Kafka Consumer programmatically?

For C++, I use https://github.com/edenhill/librdkafka . The following code consumes one message at a time:

    while (m_bRunning)
    {
        // Read one message at a time from the local queue.
        RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
        msg_consume(msg);
        delete msg;
        m_consumer->poll(0);
    }

Here m_consumer is a pointer to a Consumer object (C++ API).

I hope this helps.

0
