How to stop Python Kafka Consumer in a program?

I'm doing Python Kafka (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer at http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html ) when I run the following a piece of code, it will work all the time, even if all messages are consumed. I hope the consumer stops if he consumes all messages. How to do it? Also, I have no idea how to use the stop () function (which is in the base class kafka.consumer.base.Consumer).

UPDATE

I used a signal handler to call the consumer.stop () method. Some error messages were printed on the screen. But the program is still stuck in the loop. When new messages appeared, the consumer consumed and printed them. I also tried client.close (). But the same result.

I need several ways to terminate the for-loop gracefully.

client = KafkaClient("localhost:9092") consumer = SimpleConsumer(client, "test-group", "test") consumer.seek(0, 2)# (0,2) and (0,0) for message in consumer: print "Offset:", message.offset print "Value:", message.message.value 

Any help is appreciated. Thanks.

+3
python apache-kafka kafka-consumer-api kafka-python
source share
2 answers

Use the iter_timeout parameter to set the timeout. If set to 10, like the next code snippet, it will exit if no new message appears within 10 seconds. The default value is "No", which means that the consumer is blocked here, even if new messages do not arrive.

  self.consumer = SimpleConsumer(self.client, "test-group", "test", iter_timeout=10) 

Update

The above is not a good method. When many messages arrive, it is difficult to set iter_timeout small enough to guarantee a stop. So now I am using the get_message () function, which is trying to use a single message and stop. Nothing is returned when there are no new messages.

+2
source share

We can first check the offset of the last message in the thread. Then stop the cycle when we reach this offset.

  client = "localhost:9092" consumer = KafkaConsumer(client) topic = 'test' tp = TopicPartition(topic,0) #register to the topic consumer.assign([tp]) # obtain the last offset value consumer.seek_to_end(tp) lastOffset = consumer.position(tp) consumer.seek_to_beginning(tp) for message in consumer: print "Offset:", message.offset print "Value:", message.message.value if message.offset == lastOffset - 1: break 
+2
source share

All Articles