Kafka upgrade to 0.9 with new consumer api

We are upgrading our Kafka implementation to 0.9 and using the new Java API to create a consumer. I use the code below for the consumer: we subscribe the consumer by topic (see LINE A), and LINE B is the call to our service that processes the messages we receive. The problem is that we get an exception if our message processing takes more than 30 seconds.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);

// with a partition assigned to the consumer:
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
// consumer.assign(Arrays.asList(partition0));

// assign the topic to the consumer without a partition // LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    try {
        ConsumerRecords<Object, Object> records = consumer.poll(1000);
        consumeFromQueue(records); // LINE B: call to our service that processes the messages
        consumer.commitSync();
    } catch (CommitFailedException e) {
        e.printStackTrace();
        System.out.println("CommitFailedException");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("Exception while consuming messages");
    }
}

The exception is

2016-03-03 10:47:35.095  INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group TEST-GROUP
CommitFailedException
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalancing
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)

The above exception occurs when committing an offset. Any suggestions would help. Thank you.

2 answers

This is because the new consumer is single-threaded, and the only way it can keep its heartbeat with the consumer group is by polling or committing offsets; after 30 seconds (the default session.timeout.ms) the group coordinator marks your consumer dead and calls for a group rebalance. For this situation, you can either increase session.timeout.ms, or split the work of consuming and processing between two threads.
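As an illustration of the second option, here is a minimal sketch, assuming Kafka 0.9's varargs pause()/resume() API and a hypothetical process() method standing in for your slow service call: the polling thread keeps calling poll() (which keeps the heartbeat alive) while a single worker thread does the processing; the assigned partitions are paused so no new records are fetched in the meantime, and the offsets are committed from the polling thread once the worker finishes.

import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DecoupledConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("TEST-TOPIC"));

        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> inFlight = null;

        while (true) {
            // poll() keeps the heartbeat going even while the worker is busy
            ConsumerRecords<String, String> records = consumer.poll(1000);

            if (inFlight != null && inFlight.isDone()) {
                // slow processing finished: commit, then resume fetching
                consumer.commitSync();
                Set<TopicPartition> assigned = consumer.assignment();
                consumer.resume(assigned.toArray(new TopicPartition[0]));
                inFlight = null;
            }

            if (!records.isEmpty() && inFlight == null) {
                // pause fetching so subsequent polls return no new records
                Set<TopicPartition> assigned = consumer.assignment();
                consumer.pause(assigned.toArray(new TopicPartition[0]));
                // hand the batch to the worker; process() is hypothetical
                inFlight = worker.submit(() -> process(records));
            }
        }
    }

    private static void process(ConsumerRecords<String, String> records) {
        // stands in for the slow (> 30 s) call to the message-processing service
    }
}

If you prefer the first option instead, note that the broker caps how high session.timeout.ms may go via its group.max.session.timeout.ms setting.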


You can limit the number of messages returned by poll() by setting

 max.partition.fetch.bytes 

to a suitable threshold that is larger than your largest message, but small enough that you receive fewer messages per poll.
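For example, assuming your largest message is around 100 KB, a sketch of that property (the 128 KB cap is an illustrative value, not a recommendation):

// Cap the bytes fetched per partition per poll(); with messages of at most
// ~100 KB, a 128 KB cap yields roughly one message per partition per poll.
props.put("max.partition.fetch.bytes", 131072);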

Kafka 0.10.x also has the ability to explicitly limit the number of records returned to the client by setting

 max.poll.records 
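A sketch of that setting on a 0.10.x consumer (the value 10 is an arbitrary illustration):

// Kafka 0.10.x+: hard cap on the number of records a single poll() returns
props.put("max.poll.records", 10);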
