We are upgrading our Kafka implementation to 0.9 and using the new Java consumer API to create a consumer. I am using the code below for the consumer. We subscribe the consumer to the topic at LINE A, and LINE B is the call to our service that processes the messages we receive. The problem is that we get an exception if our message processing takes more than 30 seconds.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);

// With a partition assigned to the consumer:
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
// consumer.assign(Arrays.asList(partition0));

// Assign the topic to the consumer without a partition
// LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    try {
        ConsumerRecords<Object, Object> records = consumer.poll(1000);
        consumeFromQueue(records); // LINE B
        consumer.commitSync();
    } catch (CommitFailedException e) {
        e.printStackTrace();
        System.out.println("CommitFailedException");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("Exception in while consuming messages");
    }
}
The exception is
2016-03-03 10:47:35.095  INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : Mark coordinator 2147483647 dead.
2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator : ILLEGAL_GENERATION error occurred while committing offsets for TEST-GROUP group
CommitFailedException
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalancing
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
The exception above occurs while committing the offset. Any suggestions would be helpful, thank you.
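
For context on the 30-second threshold: as far as I understand, the 0.9 consumer's session.timeout.ms defaults to 30000 ms and heartbeats are only sent from inside poll(), so a long-running call at LINE B could let the session expire and trigger the rebalance seen in the trace. Below is a minimal sketch of one setting that might be worth experimenting with, not a confirmed fix. The class name SessionTimeoutSketch, the helper consumerProps, and the 10-second margins are hypothetical and only serve the illustration. It builds the Properties only, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class SessionTimeoutSketch {

    /**
     * Builds consumer properties whose session timeout is larger than the
     * slowest expected processing time for one poll() batch.
     * The 10-second margins are arbitrary assumptions, not recommended values.
     */
    public static Properties consumerProps(int maxProcessingMs) {
        int sessionTimeoutMs = maxProcessingMs + 10_000;

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        // Must cover the time spent between two poll() calls.
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Assumption: the client expects request.timeout.ms to exceed session.timeout.ms.
        props.put("request.timeout.ms", String.valueOf(sessionTimeoutMs + 10_000));
        return props;
    }

    public static void main(String[] args) {
        // Example: processing one batch may take up to 60 seconds.
        Properties props = consumerProps(60_000);
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
        System.out.println("request.timeout.ms = " + props.getProperty("request.timeout.ms"));
    }
}
```

The resulting Properties could be merged into the props object from the code above before creating the KafkaConsumer. Note that the broker caps the allowed session timeout with group.max.session.timeout.ms, so that broker setting may need to be raised as well for a value above 30 seconds to be accepted.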