When I try to use messages from the kafka server hosted in ec2 using the kafka console tool (V 0.9.0.1, I think it uses the old consumer APIs) I get the following exception. How can I overcome this?
kafka-console-consumer.sh --zookeeper zookeeper.xx.com:2181 --topic my-replicated-topic [2016-04-06 15:52:40,247] WARN [console-consumer-12572_Rathas-MacBook-Pro.local-1459921957380-6ebc238f-leader-finder-thread], Failed to add leader for partitions [my-replicated-topic,1],[my-replicated-topic,2],[my-replicated-topic,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:188) at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:84) at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:187) at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:182) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:182) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:88) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) at scala.collection.immutable.Map$Map3.foreach(Map.scala:161) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) at
I created a theme like
/kafka-topics.sh --create --zookeeper zookeeper.xx.com:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
apache-kafka kafka-consumer-api kafka-producer-api
Ratha
source share