Spark broadcast variable returns NullPointerException when run in an Amazon EMR cluster

The variables that I share via a broadcast are null in the cluster.

My application is quite complicated, but I have reduced it to this small example, which works flawlessly when I run it locally but fails in the cluster:

    package com.gonzalopezzi.bigdata.bicing

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}

    object PruebaBroadcast2 extends App {
      val conf = new SparkConf().setAppName("PruebaBroadcast2")
      val sc = new SparkContext(conf)

      val arr : Array[Int] = (6 to 9).toArray
      val broadcasted = sc.broadcast(arr)

      // A small integer array [1, 2, 3, 4] is parallelized across two machines
      val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

      // NullPointerException in the flatMap: broadcasted is null
      rdd.flatMap((a : Int) => List((a, broadcasted.value(0))))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)
    }

I do not know if the problem is a coding error or a configuration problem.

This is the stack trace I get:

    15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
        at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
        at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    Command exiting with ret '1'

Can someone help me fix this? Or, at the very least, can you tell me whether you see anything strange in the code? If the code looks fine, let me know, since that would mean the problem is related to the cluster configuration.

Thanks in advance.

1 answer

Finally, I got it working.

The object declaration does not work like this:

    object MyObject extends App {
      /* ... */
    }

But it works if you declare the object with a main function:

    object MyObject {
      def main(args: Array[String]) {
        /* ... */
      }
    }

So the short example from the question works if I rewrite it like this:

    package com.gonzalopezzi.bigdata.bicing

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}

    object PruebaBroadcast2 {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("PruebaBroadcast2")
        val sc = new SparkContext(conf)

        val arr : Array[Int] = (6 to 9).toArray
        val broadcasted = sc.broadcast(arr)

        val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

        rdd.flatMap((a : Int) => List((a, broadcasted.value(0))))
          .reduceByKey(_ + _)
          .collect()
          .foreach(println)
      }
    }
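With this fix the job completes. Since every key in the RDD is unique, reduceByKey leaves each pair untouched, so the collect should print the four pairs (1,6), (2,6), (3,6) and (4,6), in some order.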

This problem seems to be related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
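For what it's worth, my understanding of why extends App breaks (the ticket describes the symptom; the mechanism below is my reading of it): in Scala 2, App extends the DelayedInit trait, so the statements in the object body, including val initializers, do not run in the object's constructor; they run only when main is invoked on the driver. When an executor deserializes a task closure that references the object, the singleton is instantiated there without that delayed body ever running, so fields like broadcasted are still null. A minimal sketch of the difference, using hypothetical objects:

    // With `extends App`, the body (including the assignment of x) is
    // deferred via DelayedInit and only runs when main() is called.
    // Code that loads this object without calling main() -- e.g. an
    // executor deserializing a closure -- sees x == null.
    object WithApp extends App {
      val x: String = "initialized"
    }

    // With an explicit main(), x is an ordinary field of the singleton,
    // initialized as soon as the object is first referenced -- on the
    // driver and on every executor alike.
    object WithMain {
      val x: String = "initialized"

      def main(args: Array[String]): Unit = {
        println(x)   // always "initialized"
      }
    }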
