Apache Kafka: consumer state

I read the documentation on the Kafka website, but when trying to implement a complete minimal example (producer → Kafka → consumer), I don't really understand how the "consumer state" needs to handle the offset.

Some information

  • I am using the high-level consumer API (Java)
  • My consumer is a simple class with a main method, basically the same as on the Kafka quickstart page
  • I am using ZooKeeper
  • I use a single broker

Now, the documentation says that a high-level API consumer saves its state (the offset) in ZooKeeper, so I expect the consumer state to be maintained across:

  • Kafka broker restarts
  • consumer restarts

But unfortunately this is not the case: every time I restart the broker or the consumer, all messages are redelivered. These are probably stupid questions, but:

  • In case of a Kafka restart: I understand that the consumer should maintain its own state, so that when the broker (re)starts it offers all messages (!) and the consumer decides what to consume... is this right? If so, what happens if I have 10.0000.0000 messages?

  • In case of a JVM client restart: if the state is saved in ZooKeeper, why are the messages redelivered? Could it be that the new JVM has a different consumer "identity"? If so, how can I tie it to the previous one?

3 answers

It seems I was a bad reader... it is all on the configuration page. In particular, both of my questions were resolved by the "autooffset.reset" setting, which defaults to "smallest" and therefore causes the effects described above.

With the value set to "largest", everything works as expected on both consumer and broker restarts, because the offset used is always the largest.


Yes, the consumer is responsible for maintaining its own state, and the high-level Java client saves its state in ZooKeeper.

Most likely you did not specify the groupId configuration. In that case, Kafka generates a random groupId.

It is also possible that you have disabled the autocommit.enable configuration.

A complete reference for the Kafka configuration can be found on this page: http://kafka.apache.org/configuration.html in the section "Important configuration properties for the high-level consumer".
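As a minimal sketch, the relevant settings might look like this. Property names follow the 0.8 high-level consumer configuration page (older releases spelled some of them differently, e.g. autooffset.reset); the group name and ZooKeeper address are arbitrary examples:

```java
import java.util.Properties;

public class ConsumerProps {
    // Build the configuration for a 0.8-style high-level consumer.
    public static Properties build() {
        Properties props = new Properties();
        // Where the consumer finds ZooKeeper (example address).
        props.put("zookeeper.connect", "localhost:2181");
        // A stable group.id is what lets the offset stored in ZooKeeper
        // survive consumer restarts; without it a random group is generated
        // and every restart looks like a brand-new consumer.
        props.put("group.id", "my-consumer-group");
        // Periodically commit the consumed offset to ZooKeeper.
        props.put("auto.commit.enable", "true");
        // Only applies when no committed offset exists for this group:
        // "smallest" replays from the earliest retained message,
        // "largest" starts from new messages only.
        props.put("auto.offset.reset", "largest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("group.id"));
    }
}
```

These Properties would then be passed to the consumer connector factory of the high-level client.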


To answer the original question: using a groupId helps to avoid re-consuming all messages from the beginning of time.

If you change the groupId, you will receive all messages from the moment the queue was created (or from the last data cleanup based on Kafka's log retention policy).

Do not confuse this with the kafka-console-consumer "--from-beginning" flag (which sets the auto.offset.reset parameter); the parameter can take either of the two values below:

1) consume messages starting from the earliest one still retained after the last log cleanup (NOT from the beginning of time when the Kafka queue was created):

props.put("auto.offset.reset", "smallest");

2) consume only new messages arriving after the consumer JVM starts (in this case you risk losing messages published while the consumer is down and not listening to the queue):

props.put("auto.offset.reset", "largest");


Side note: the following is only loosely related to the original question.

For a more advanced use case: if you are trying to programmatically set a consumer offset to replay messages starting at a specific time, you would need to use the SimpleConsumer API, as shown in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, to find the earliest valid offset from the right broker/partition. This essentially replaces ZooKeeper with your own findLeader logic. Very involved.

For this use case (ad-hoc replay of messages starting from a specific time), we decided to keep a local message cache and manage local offsets ourselves, instead of using the Kafka offset management API (which would have required re-implementing a good chunk of ZooKeeper functionality on top of SimpleConsumer).

I.e., treat Kafka as a "postman": once a message is delivered, it goes into a local mailbox, and if we need to go back to a certain offset in the past and, say, replay messages (which have already been consumed), e.g. in the event of a consumer application error, we do not go back to the "post office" (the Kafka brokers) to sort out the redelivery, but manage it locally.
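A minimal sketch of that "local mailbox" idea: each message consumed from Kafka is stored locally keyed by its offset, so a replay from an arbitrary past offset is served from the cache instead of going back to the brokers. The class and method names here are illustrative, not part of any Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class LocalMailbox {
    // Messages kept in offset order; a real implementation would also
    // persist this and evict entries per its own retention policy.
    private final NavigableMap<Long, String> byOffset = new TreeMap<>();

    // Called once per message as the consumer delivers it.
    public void deliver(long offset, String message) {
        byOffset.put(offset, message);
    }

    // Replay everything at or after the given offset, oldest first,
    // without contacting the Kafka brokers.
    public List<String> replayFrom(long offset) {
        return new ArrayList<>(byOffset.tailMap(offset, true).values());
    }

    public static void main(String[] args) {
        LocalMailbox box = new LocalMailbox();
        box.deliver(100L, "a");
        box.deliver(101L, "b");
        box.deliver(102L, "c");
        System.out.println(box.replayFrom(101L)); // [b, c]
    }
}
```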

End of side note.

