Couldn't find leaders for Set([TOPICNAME, 0]) when using Apache Spark

We use Apache Spark 1.5.1 with Kafka (kafka_2.10-0.8.2.1) and the Kafka DirectStream API to retrieve data from Kafka.
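Our consumer follows the standard Spark 1.5 direct-stream pattern; a minimal sketch is below (the broker list, batch interval, topic name, and output action are placeholders rather than our real values):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaDirectExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-direct-example")
        val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

        // Placeholder broker list; the direct stream contacts these brokers to
        // find the leader of each topic partition.
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val topics = Set("normalized-tenant4")

        // The DirectStream API asks the partition leaders for offsets at every
        // batch interval, which is why a missing leader surfaces as the
        // exception reproduced below.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        stream.foreachRDD(rdd => println(s"received ${rdd.count()} records")) // placeholder action

        ssc.start()
        ssc.awaitTermination()
      }
    }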

We created topics in Kafka with the following settings:

ReplicationFactor: 1, Replicas: 1
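For reference, a topic with these settings would typically have been created with the standard topics tool, along these lines (the ZooKeeper address and partition count are placeholders):

    bin/kafka-topics.sh --create --zookeeper zk-host:2181 --replication-factor 1 --partitions 1 --topic normalized-tenant4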

When all Kafka instances are running, Spark works fine. However, when one of the Kafka instances in the cluster goes down, we get the exception reproduced below. We later restarted the disconnected Kafka instance and tried to shut down Spark cleanly, but the Spark application had already terminated because of the exception. As a result, we could not read the remaining messages from the Kafka topics.

    ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
    ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
    org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Thanks in advance. Please help solve this problem.

apache-spark apache-kafka spark-streaming

2 answers

This is the expected behavior. By setting the replication factor to one, you asked for each topic to be stored on exactly one machine. When the single machine that stores the normalized-tenant4 topic is taken down, the consumer cannot find a leader for the topic.

See http://kafka.apache.org/documentation.html#intro_guarantees.
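To confirm, you can ask Kafka which broker leads each partition (the ZooKeeper address is a placeholder); while the single hosting broker is down, the Leader column for the partition shows -1:

    bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic normalized-tenant4

The remedy is to create the topic with a replication factor greater than one (for example --replication-factor 3 on a cluster of three or more brokers), so that another replica can be elected leader when a broker goes down.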


One of the reasons for this type of error, where a leader cannot be found for the specified topic, is a problem with the Kafka server configuration.

Open the Kafka server configuration:

    vim ./kafka/kafka-<your-version>/config/server.properties

In the "Socket Server Settings" section, specify the IP address of your host, if it is missing:

    listeners=PLAINTEXT://{host-ip}:{host-port}
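For example, with assumed values (a hypothetical broker address, substitute your own):

    listeners=PLAINTEXT://192.168.1.10:9092

Restart the broker after editing server.properties so the change takes effect.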

I used the Kafka setup provided with the MapR sandbox and tried to access Kafka from Spark code. I was getting the same error while accessing Kafka because my configuration was missing the IP address.
