Spark Streaming: Broadcast Variables, java.lang.ClassCastException


I am trying to read data from a static text file stored in HDFS, put its contents into an ArrayBuffer, and broadcast that buffer via sparkContext.broadcast as a broadcast variable. I am using Cloudera Spark, version 1.6.0-cdh5.7.0, with spark-streaming_2.10.

I submit the application to YARN with spark-submit:

    spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar

When I do this, I get java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator. The same code with a hard-coded ArrayBuffer works fine, so I assume it has something to do with reading the static file from HDFS... Does anyone have an idea what I could be doing wrong? Any help appreciated.

This does not work:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BroadcastStreamTest1 {
        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            // read the static file into a driver-side buffer...
            val content = streamingContext.sparkContext
                .textFile("hdfs:///data/someTextFile.txt")
                .collect()
                .toBuffer[String]

            // ...and broadcast it
            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

This works:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable

    object BroadcastStreamTest2 {

        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            // same kind of buffer, but hard-coded instead of read from HDFS
            val content = new mutable.ArrayBuffer[String]
            (1 to 50).foreach(i => content += "line" + i)

            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

Stacktrace:

    04/16/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net): java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
            at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
            at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
            at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
            at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
            ... 30 more

    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
            at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
            at net.meetrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14)
            at net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
            at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
            at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
            at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
            at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
            ... 30 more
scala hdfs broadcast apache-spark spark-streaming
1 answer

The reason was some kind of conflict with the jar file I provided.

Without the setting

    spark.executor.userClassPathFirst=true

it works. Unfortunately, I could not find the exact cause of the problem.
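
In other words, the submit command from the question works once that flag is dropped:

    spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client current.jar

A plausible (but here unverified) explanation is that the application jar bundles Spark's own classes, so with userClassPathFirst=true the executors load org.apache.spark classes through a second classloader, and objects deserialized against one classloader cannot be assigned to fields typed against the other. If that is the case, a minimal preventive sketch, assuming an sbt build, is to mark the Spark artifacts as provided so they are never packaged into current.jar:

    // build.sbt -- assumed layout; version taken from the question's Spark release
    // (or use the matching CDH artifact from the Cloudera repository)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % "1.6.0" % "provided",
      "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
    )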
