Kafka Connect offsets: get / set?

How do I get, set, or reset the offset of a Kafka Connect connector / task / sink?

I can use the /usr/bin/kafka-consumer-groups tool, which runs kafka.admin.ConsumerGroupCommand, to see the offsets for all of my regular Kafka consumer groups. However, Kafka Connect tasks and groups are not displayed by this tool.

Similarly, I can use the zookeeper shell to connect to ZooKeeper, and I can see ZooKeeper entries for regular Kafka consumer groups, but not for Kafka Connect sinks.

+10
3 answers

As of 0.10.0.0, Connect does not provide an API for managing offsets. This is something we want to improve in the future, but it is not there yet. ConsumerGroupCommand is the right tool for managing the offsets of sink connectors. Note that source connector offsets are stored in a special offsets topic for Connect (they are not like normal Kafka offsets, since they are defined by the source system; see offset.storage.topic in the docs). Because sink connectors use the new consumer, they do not store their offsets in ZooKeeper; all modern clients use native Kafka-based offset storage. ConsumerGroupCommand can work with these offsets; you just need to pass the --new-consumer option.
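To look at source connector offsets, one option is simply to read the offsets topic with the console consumer. The helper below is a minimal sketch, not an official tool: the function name is made up, and it assumes the Kafka scripts live in ./bin, a broker at localhost:9092, and an offsets topic named connect-offsets (use whatever offset.storage.topic is set to in your worker config). With the default JSON converters, keys and values should be readable JSON.

```shell
# Assumptions (placeholders): script directory, broker address, offsets topic name.
KAFKA_BIN="${KAFKA_BIN:-./bin}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
OFFSETS_TOPIC="${OFFSETS_TOPIC:-connect-offsets}"

# Hypothetical helper: print every record in the Connect offsets topic,
# key included, and exit after 10 seconds without new messages.
dump_source_offsets() {
  "$KAFKA_BIN/kafka-console-consumer.sh" \
    --bootstrap-server "$BOOTSTRAP" \
    --topic "$OFFSETS_TOPIC" \
    --from-beginning \
    --property print.key=true \
    --timeout-ms 10000
}
```

Calling dump_source_offsets only reads the topic; it does not change any offsets.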

+11

You cannot set offsets, but you can use the kafka-consumer-groups.sh tool to "scroll" the feed forward.

The consumer group of your connector is named connect-*CONNECTOR NAME*, but you can double-check:

 unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list 

To view the current offset:

 unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe 

To move the offset forward:

 unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null 

I believe that you can also move the offset backward by deleting the consumer group first using the --delete flag.

Remember to pause and resume the connector through the Kafka Connect REST API.
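The pause and resume calls can be sketched as small curl wrappers. This assumes the Connect worker's REST API is reachable at http://localhost:8083 and that curl is installed; the function names are invented for illustration, while the /pause, /resume and /status paths are the ones the Connect REST API exposes.

```shell
# Assumption (placeholder): base URL of the Connect worker's REST API.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the URL for a connector action: $1 = connector name, $2 = action.
connector_url() {
  printf '%s/connectors/%s/%s\n' "$CONNECT_URL" "$1" "$2"
}

# PUT /connectors/<name>/pause
pause_connector() {
  curl -sS -X PUT "$(connector_url "$1" pause)"
}

# PUT /connectors/<name>/resume
resume_connector() {
  curl -sS -X PUT "$(connector_url "$1" resume)"
}

# GET /connectors/<name>/status shows connector and task state.
connector_status() {
  curl -sS "$(connector_url "$1" status)"
}
```

A typical sequence would be pause_connector *CONNECTOR NAME*, then the offset change, then resume_connector *CONNECTOR NAME*, with connector_status in between to confirm the state.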

+3

In my case (testing reading files into the producer and consuming on the console, everything in local mode), I just saw this in the producer's output:

 offset.storage.file.filename=/tmp/connect.offsets 

So I wanted to open it, but it is binary, with some unreadable characters.

I deleted it (renaming it also works), and then I could write to the same file and get the file's contents from the consumer again. You must restart the producer for this to take effect: on startup it tries to read the offsets file and, if the file does not exist, creates a new one, so the offset is reset.
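The file-based reset described above can be sketched as follows. It moves the offsets file aside rather than deleting it, so it can be put back if needed. The path is the one from offset.storage.file.filename shown earlier; the function name and the .bak suffix are arbitrary choices for this example.

```shell
# Assumption: same path as offset.storage.file.filename in the worker config.
OFFSET_FILE="${OFFSET_FILE:-/tmp/connect.offsets}"

# Hypothetical helper: move the offsets file aside so the next start
# finds no stored offsets and begins from scratch.
backup_offsets() {
  if [ -f "$OFFSET_FILE" ]; then
    mv "$OFFSET_FILE" "$OFFSET_FILE.bak"
    echo "moved $OFFSET_FILE to $OFFSET_FILE.bak"
  else
    echo "no offsets file at $OFFSET_FILE"
  fi
}
```

Stop the producer first, run backup_offsets, then start it again.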

If you want to reset without deleting, you can use:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name> 
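Note that in Kafka versions that support --reset-offsets, the tool only prints the planned offsets unless --execute is passed, and it refuses to reset a group that still has active members. A small wrapper that defaults to a dry run might look like this; the function name is made up, and the ./bin location and localhost:9092 address are assumptions.

```shell
# Assumptions (placeholders): script directory and broker address.
KAFKA_BIN="${KAFKA_BIN:-./bin}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Hypothetical helper: $1 = group name, $2 = topic,
# $3 = --dry-run (default, only prints the plan) or --execute (applies it).
reset_group_offsets() {
  "$KAFKA_BIN/kafka-consumer-groups.sh" \
    --bootstrap-server "$BOOTSTRAP" \
    --group "$1" \
    --topic "$2" \
    --reset-offsets --to-earliest \
    "${3:---dry-run}"
}
```

For example, reset_group_offsets <group-name> <topic_name> previews the change, and adding --execute as a third argument applies it.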

You can check all group names:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

and check the details of each group:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe 

In a production environment that still uses the older ZooKeeper-based consumers, the offset is stored in ZooKeeper, so more steps (and more caution) are required. You can refer to these pages:

https://metabroadcast.com/blog/resetting-kafka-offsets
https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

Steps:

 kafka-topics --list --zookeeper localhost:2181
 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1   # -1 for largest, -2 for smallest
 # then, inside the ZooKeeper shell:
 set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}
+1
