I am having serious trouble finding a solution for my needs with KafkaConsumer (>= 0.9).
Imagine that I have a function that should read only n messages from a Kafka topic.
For example: getMsgs(5) → gets the next 5 Kafka messages from the topic.
So, I have a loop that looks like this. (Edited to show the actual current parameters.) In this case, the consumer's max.poll.records parameter was set to 1, so the inner loop ran only once per poll. Several different consumers (some of which iterate over many messages per poll) share an abstract parent class (this one), which is why it is coded this way. The numMss part is specific to this consumer.
for (boolean exit = false; !exit; ) {
    records = consumer.poll(config.pollTime);
    for (Record r : records) {
        processRecord(r); // do my things
        numMss++;
        if (numMss == maximum) // maximum = 5
            exit = true;
    }
}
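For reference, the max.poll.records setting mentioned above could be applied roughly like this. This is a minimal sketch, not my exact configuration: the broker address, the group id, the String deserializers and the disabled auto-commit are assumptions made for illustration, and buildProps is a hypothetical helper name. As far as I know, max.poll.records is only honoured by client versions from 0.10.0 onwards, so it would not help on a 0.9 client.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds consumer properties that cap each poll() at maxPollRecords records.
    // Broker address and group id are placeholders, not values from my setup.
    public static Properties buildProps(int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "get-msgs-sketch");         // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: offsets are committed manually, only for processed records.
        props.put("enable.auto.commit", "false");
        // Upper bound on the number of records a single poll() call returns.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(1);
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer constructor.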
Given this, the problem is that the poll() method can return more than 5 messages. For example, if it returns 10 messages, my code will lose the other 5 forever, since Kafka will consider them already consumed.
I tried committing the offset explicitly, but it doesn't seem to work:
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
Even with this offset commit, every time I start the consumer again, it does not start from the 6th message (remember, I only wanted 5 messages) but from the 11th (since the first poll fetched 10 messages).
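To make the behaviour I am describing concrete, here is a toy model in plain Java. It is not the Kafka API: ToyConsumer and readAtMost are names made up for this sketch, and offsets are 0-based, so the "6th message" is offset 5. It assumes that poll() advances an in-memory position past everything it returns, that a commit does not move that position, and that a seek() does. Whether this model matches what actually happens in my setup is exactly what I am unsure about.

```java
import java.util.ArrayList;
import java.util.List;

public class PositionSketch {

    // Toy stand-in for a single-partition consumer; NOT the Kafka API.
    public static class ToyConsumer {
        private final long logEnd;   // number of messages in the toy partition
        private long position = 0;   // next offset poll() will hand out
        private long committed = 0;  // last committed offset

        public ToyConsumer(long logEnd) { this.logEnd = logEnd; }

        // Returns up to maxRecords offsets and advances the position past all of them.
        public List<Long> poll(int maxRecords) {
            List<Long> batch = new ArrayList<>();
            while (batch.size() < maxRecords && position < logEnd) {
                batch.add(position++);
            }
            return batch;
        }

        public void commit(long offset) { committed = offset; } // position is untouched
        public void seek(long offset) { position = offset; }
        public long position() { return position; }
        public long committed() { return committed; }
    }

    // Processes at most 'maximum' messages, then commits the next unprocessed
    // offset and rewinds the position to it so the surplus is fetched again later.
    public static long readAtMost(ToyConsumer consumer, int maximum, int batchSize) {
        int numMss = 0;
        long next = consumer.position();
        while (numMss < maximum) {
            List<Long> batch = consumer.poll(batchSize);
            if (batch.isEmpty()) break;          // nothing left to read
            for (long offset : batch) {
                if (numMss == maximum) break;    // ignore the surplus records
                numMss++;                        // "process" the record
                next = offset + 1;
            }
        }
        consumer.commit(next);
        consumer.seek(next);
        return next;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(20);
        long next = readAtMost(consumer, 5, 10);
        System.out.println("next=" + next + " position=" + consumer.position()
                + " committed=" + consumer.committed());
    }
}
```

In this model, a single poll of 10 leaves the position at 10 even though only 5 messages were handled; committing alone does not change that, while the seek puts the position back on the first unprocessed offset.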
Is there any solution for this, or (more likely) am I missing something?
Thanks in advance!!