Kafka Consumer - Poll

I am having serious trouble finding a solution for my needs with KafkaConsumer (>= 0.9).

Imagine that I have a function that should read exactly n messages from a Kafka topic.

For example: getMsgs(5) → gets the next 5 Kafka messages from the topic.

So, I have a loop that looks like the following (edited with the currently correct parameters). In this case the consumer's max.poll.records parameter was set to 1, so the actual loop only iterated once. Several different consumers (some of which iterate over many messages) share an abstract parent class (this one), which is why the code is written this way. The numMss part is specific to this consumer.

    for (boolean exit = false; !exit;) {
        ConsumerRecords<String, String> records = consumer.poll(config.pollTime);
        for (ConsumerRecord<String, String> r : records) {
            processRecord(r);        // do my things
            numMss++;
            if (numMss == maximum) { // maximum = 5
                exit = true;
                break;
            }
        }
    }

Given this, the problem is that the poll() method may fetch more than 5 messages. For example, if it fetches 10 messages, my code will forget the other 5 forever, since Kafka will consider them already consumed.

I tried committing the offset myself, but it doesn't seem to work:

  consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1))); 
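For reference, here is roughly how that call would sit inside the loop above, with the TopicPartition built from the record being processed (the variable names partition and record are assumptions, not the original code):

    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    // commit the offset of the *next* record, so a restart would resume right after this one
    consumer.commitSync(Collections.singletonMap(partition,
            new OffsetAndMetadata(record.offset() + 1)));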

Even with this commit in place, every time I restart the consumer it does not start from the 6th message (remember, I only wanted 5 messages), but from the 11th (since the first poll consumed 10 messages).

Is there any solution for this, or (more likely) did I miss something?

Thanks in advance!!

4 answers

You can set max.poll.records to whatever number you like, so that on each poll you get at most that many records.

For the use case you stated in this question, you don't need to commit offsets explicitly yourself. You can just set enable.auto.commit to true and set auto.offset.reset to earliest, so that the reset kicks in when there is no committed offset for your consumer group.id (in other words, when you start reading from a partition for the very first time). Once you have a group.id and committed consumer offsets stored in Kafka, and in case your Kafka consumer process dies, it will continue from the last committed offset, since that is the default behaviour: when a consumer starts, it first looks for committed offsets and, if any exist, continues from the last committed one, and auto.offset.reset does not kick in.
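A minimal sketch of that configuration, assuming a local broker, String keys and values, and placeholder broker address, group id and topic name:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
    props.put("group.id", "my-consumer-group");         // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "true");             // let the client commit offsets for you
    props.put("auto.offset.reset", "earliest");          // only used when no committed offset exists
    props.put("max.poll.records", "5");                  // at most 5 records per poll()

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic

With this, each poll() returns at most 5 records, and after a restart the consumer resumes from the last auto-committed offset; auto.offset.reset only matters the very first time the group reads the partition.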


Set the auto.offset.reset property to "latest". Then try consuming; you will get the records starting from the committed offset.

Or use the consumer.seek(TopicPartition, offset) API before polling.
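A rough sketch of the seek() approach, assuming manual partition assignment and that you know the offset you want to resume from (the topic name, partition number and startOffset are placeholders):

    TopicPartition tp = new TopicPartition("my-topic", 0);    // placeholder topic and partition
    long startOffset = 5L;                                     // placeholder: offset of the 6th message
    consumer.assign(Collections.singletonList(tp));            // manual assignment instead of subscribe()
    consumer.seek(tp, startOffset);                            // the next poll() starts reading here
    ConsumerRecords<String, String> records = consumer.poll(1000);  // timeout in ms

Note that seek() only works on partitions the consumer currently owns, which is why this sketch uses assign() rather than subscribe().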


Have you turned off auto-commit by setting enable.auto.commit to false? You need to turn that off if you want to commit the offset manually. Without that, the next call to poll() will automatically commit the latest offset of the messages you received from the previous poll().
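A hedged sketch of the question's loop with auto-commit turned off and a manual commit per processed record (processRecord, config.pollTime and the maximum of 5 come from the question; the other names are illustrative):

    // in the consumer properties: props.put("enable.auto.commit", "false");
    int numMss = 0;
    final int maximum = 5;
    boolean exit = false;
    while (!exit) {
        ConsumerRecords<String, String> records = consumer.poll(config.pollTime);
        for (ConsumerRecord<String, String> r : records) {
            processRecord(r);   // do my things
            numMss++;
            // commit the offset of the next record to read, i.e. only what was actually processed
            TopicPartition tp = new TopicPartition(r.topic(), r.partition());
            consumer.commitSync(Collections.singletonMap(tp,
                    new OffsetAndMetadata(r.offset() + 1)));
            if (numMss == maximum) {
                exit = true;
                break;          // leave the rest of the fetched batch uncommitted
            }
        }
    }

Because only the processed offsets are committed, a restart resumes from the 6th message even if the first poll() fetched more than 5 records.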


As of Kafka 0.9 the parameter names have changed. For auto.offset.reset:

What to do when there is no initial offset in Kafka, or if the current offset no longer exists on the server (for example, because that data has been deleted):

    earliest: automatically reset the offset to the earliest offset
    latest: automatically reset the offset to the latest offset
    none: throw an exception to the consumer if no previous offset is found for the consumer's group
    anything else: throw an exception to the consumer
