Java + Spark: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

I am new to Spark and tried to run the JavaSparkPi.java example, which works fine. But since I need to use it from another Java class, I copied everything from main into a method of the class and tried to call that method. It then fails with:

org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

The code is as follows:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class JavaSparkPi {

    public void cal() {
        JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");
        int slices = 2;
        int n = 100000 * slices;
        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
        System.out.println("count is: " + dataSet.count());

        dataSet.foreach(new VoidFunction<Integer>() {
            public void call(Integer i) {
                System.out.println(i);
            }
        });

        int count = dataSet.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println("Pi is roughly " + 4.0 * count / n);
    }

    public static void main(String[] args) throws Exception {
        JavaSparkPi myClass = new JavaSparkPi();
        myClass.cal();
    }
}

Anyone have an idea on this? thanks!

java serialization apache-spark
3 answers

The nested functions hold a reference to the enclosing object (JavaSparkPi), so that object gets serialized along with them. For this to work, it must be serializable. Simply:

 public class JavaSparkPi implements Serializable { ... 

The root problem is that when you create an anonymous class in Java, it captures a reference to the enclosing class. This can be fixed in several ways.
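To see this capture outside Spark, here is a minimal, self-contained sketch (plain Java, no Spark; the class and interface names are illustrative, with SerTask standing in for Spark's serializable Function types). It shows that serializing an anonymous class drags its non-serializable enclosing instance along, exactly as in the question:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Mirrors JavaSparkPi: the enclosing class does NOT implement Serializable.
    private final int n = 100000;

    // Stand-in for Spark's Function types, which all extend Serializable.
    interface SerTask extends Serializable {
        int value();
    }

    SerTask makeTask() {
        // An anonymous class in an instance method keeps a hidden reference
        // to the enclosing CaptureDemo (here it even reads the field n).
        return new SerTask() {
            public int value() {
                return n;
            }
        };
    }

    // Try to serialize the object, as Spark does when shipping a task.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the same failure Spark reports
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Serializing the task drags in the non-serializable CaptureDemo.
        System.out.println(canSerialize(new CaptureDemo().makeTask())); // false
    }
}
```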

Declare the enclosing class Serializable

This works in your case, but it will break if your enclosing class has a field that is not serializable. I would also say that serializing the parent class is generally bad practice.
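A minimal sketch of this first fix (plain Java, no Spark; names are illustrative): once the enclosing class implements Serializable, the captured instance can travel with the anonymous class, so serialization succeeds:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFixDemo implements Serializable {
    private final int n = 100000;

    // Stand-in for Spark's serializable Function types.
    interface SerTask extends Serializable {
        int value();
    }

    SerTask makeTask() {
        // The anonymous class still captures SerializableFixDemo,
        // but the captured object is now itself serializable.
        return new SerTask() {
            public int value() {
                return n;
            }
        };
    }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new SerializableFixDemo().makeTask())); // true
    }
}
```

Note that this only works while every field of the enclosing class is itself serializable; adding, say, a raw socket or thread field would reintroduce the exception.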

Create the closure in a static function

If the closure is created inside a static function, no reference to the enclosing instance is captured, so the enclosing class never needs to be serialized.
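A sketch of this second fix under the same assumptions (plain Java, no Spark; SerFn stands in for Spark's Function): an anonymous class built in a static method has no enclosing instance, so there is nothing non-serializable to capture:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFactoryDemo {
    // Stand-in for Spark's Function<Integer, Integer>.
    interface SerFn extends Serializable {
        int apply(int x);
    }

    // Built inside a static method: there is no enclosing instance,
    // so the anonymous class carries no hidden this$0 reference.
    static SerFn doubler() {
        return new SerFn() {
            public int apply(int x) {
                return x * 2;
            }
        };
    }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(doubler())); // true
        System.out.println(doubler().apply(21));     // 42
    }
}
```

In the question's code, this means moving the map/reduce function construction into static helper methods of JavaSparkPi instead of building them inline in the instance method cal().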


This error occurs because your local machine or cluster has multiple processors, and Spark tries to send this function to them over the network. Your function

 dataSet.foreach(new VoidFunction<Integer>() {
     public void call(Integer i) {
         System.out.println(i);
     }
 });

uses println(), which is not serializable, so the Spark engine throws the exception. You can use the solution below instead:

 // collect() returns a java.util.List on the driver, so the printing
 // runs locally and nothing needs to be sent over the network
 dataSet.collect().forEach(i -> System.out.println(i));
