How does an offset expire for an Apache Kafka consumer group?

I was doing some tests on an old topic when I noticed some strange behavior. While reading the Kafka log, I noticed this "Removed 8 expired offsets" message:

    [GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
    [GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
    Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
    Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
    Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
    Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
    Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
    Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
    Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
    Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
    Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
    Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
    [GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
    [GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
    [GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
    [Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

I actually have two questions:

  1. How does offset expiration work for a consumer group?

  2. Can these expired offsets explain the following behavior: my consumer wouldn't poll anything when it had auto.offset.reset = latest, but it polled from the last committed offset when it had auto.offset.reset = earliest?

apache-kafka
Aug 24 '16 at 19:21
2 answers

By default, Kafka deletes committed offsets after a configurable period of time; see the offsets.retention.minutes parameter. That is, if a consumer group is inactive (that is, it does not commit any offsets) for this amount of time, its offsets are deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions within offsets.retention.minutes, those offsets are subject to deletion.
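A minimal sketch of that rule, assuming the simplified per-partition semantics described above: each committed offset expires once more than offsets.retention.minutes have passed since it was last committed. OffsetStore, commit and expire are hypothetical names, time is counted in plain minutes, and this is not the broker's actual GroupMetadataManager code (newer broker versions changed when the retention clock starts; see KAFKA-4682).

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class OffsetStore:
    """Hypothetical in-memory model of one consumer group's committed offsets."""
    retention_minutes: int
    # partition -> (committed offset, time of last commit in minutes)
    committed: Dict[int, Tuple[int, int]] = field(default_factory=dict)

    def commit(self, partition: int, offset: int, now_min: int) -> None:
        # Every commit restarts the expiration clock for that partition only.
        self.committed[partition] = (offset, now_min)

    def expire(self, now_min: int) -> int:
        """Drop offsets whose last commit is older than the retention window; return the count."""
        expired = [
            p for p, (_, ts) in self.committed.items()
            if now_min - ts > self.retention_minutes
        ]
        for p in expired:
            del self.committed[p]
        return len(expired)


store = OffsetStore(retention_minutes=1440)  # example value
store.commit(0, 42, now_min=0)     # partition 0: committed once, then idle
store.commit(1, 17, now_min=0)
store.commit(1, 18, now_min=1000)  # partition 1: keeps committing
removed = store.expire(now_min=1500)
# removed == 1; only partition 1's offset survives
```

The running consumer keeps partition 1 alive by committing, while partition 0 loses its offset, which is the "even if the consumer is running" case.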

When you start a consumer, the following happens:

  1. look for a (valid) committed offset (for the consumer group)
    1. if a valid offset is found, resume from there
    2. if no valid offset is found, reset the offset according to the auto.offset.reset parameter
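The start-up steps above can be sketched as follows. This is a simplified model, not the Kafka client's implementation: resolve_start_offset, NoOffsetForPartition and the parameters are hypothetical names, and "valid" is reduced to "still inside the partition's current log range". The reset branches mirror the auto.offset.reset values earliest, latest and none.

```python
from typing import Mapping, Optional


class NoOffsetForPartition(Exception):
    """Raised when no valid committed offset exists and auto.offset.reset is 'none'."""


def resolve_start_offset(
    committed: Mapping[int, int],
    partition: int,
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer would begin reading from (simplified model)."""
    offset: Optional[int] = committed.get(partition)
    # Step 1.1: a committed offset that still lies inside the log is resumed from.
    if offset is not None and log_start <= offset <= log_end:
        return offset
    # Step 1.2: otherwise fall back to auto.offset.reset.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "none":
        raise NoOffsetForPartition(f"no valid committed offset for partition {partition}")
    raise ValueError(f"unknown auto.offset.reset value: {auto_offset_reset!r}")


# Partition 0 holds offsets 0..99 (log end offset 100); its committed offset expired.
print(resolve_start_offset({}, 0, 0, 100, "latest"))       # 100
print(resolve_start_offset({}, 0, 0, 100, "earliest"))     # 0
print(resolve_start_offset({0: 42}, 0, 0, 100, "latest"))  # 42
```

A surviving committed offset always wins; auto.offset.reset only matters once that offset is gone or out of range.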

Thus, if your offsets were deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest, it should consume the whole topic.
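To make the difference concrete, here is a toy single-partition simulation, assuming the group's committed offset has already expired. Partition and ToyConsumer are hypothetical classes, not the Kafka consumer API: the position is simply set to the log start ("earliest") or the log end ("latest") and then advanced by each poll.

```python
class Partition:
    """Toy append-only log for a single partition (log start offset is 0)."""

    def __init__(self, records):
        self.records = list(records)

    def append(self, record):
        self.records.append(record)


class ToyConsumer:
    """Hypothetical consumer whose committed offset has expired."""

    def __init__(self, partition, auto_offset_reset):
        self.partition = partition
        # No committed offset left, so the position comes from auto.offset.reset alone.
        if auto_offset_reset == "earliest":
            self.position = 0
        elif auto_offset_reset == "latest":
            self.position = len(partition.records)
        else:
            raise ValueError(f"unsupported auto.offset.reset: {auto_offset_reset!r}")

    def poll(self):
        """Return every record from the current position to the log end, then advance."""
        batch = self.partition.records[self.position:]
        self.position += len(batch)
        return batch


topic = Partition(["a", "b", "c"])
latest = ToyConsumer(topic, "latest")
earliest = ToyConsumer(topic, "earliest")
print(latest.poll())    # [] : nothing until new data arrives
print(earliest.poll())  # ['a', 'b', 'c'] : the whole partition is consumed
topic.append("d")
print(latest.poll())    # ['d']
print(earliest.poll())  # ['d']
```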

See these JIRA issues for a discussion about this: https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682

Aug 25 '16 at 9:53

Check out my answer here. Don't forget about log file rolling: it affects when offset files are removed.

May 13 '19 at 12:15


