Spark: Task not serializable exception

In my Spark code, I'm trying to create an IndexedRowMatrix from a csv file. However, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

Here is my code:

sc = new JavaSparkContext("local", "App",
              "/srv/spark", new String[]{"target/App.jar"});

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();


JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
              new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
                private static final long serialVersionUID = 4795273163954440089L;

                @Override
                public IndexedRow call(Tuple2<String, Long> tuple)
                        throws Exception {
                    String line = tuple._1;
                    long index = tuple._2;
                    String[] strings = line.split(",");
                    double[] doubles = new double[strings.length];
                    for (int i = 0; i < strings.length; i++) {
                        doubles[i] = Double.parseDouble(strings[i]);
                    }
                    Vector v = new DenseVector(doubles);
                    return new IndexedRow(index, v);
                }
            });
+4
6 answers

I had the same problem, and it had me stumped for a while. It comes down to how Java treats anonymous inner classes and serializability: an anonymous instance keeps a reference to the object that created it. My solution was to replace the anonymous function with a named static class that implements Serializable and to instantiate that instead. I basically declared a small function library: an outer class containing static nested class definitions for each function I wanted to use.

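A minimal sketch of what such a function library could look like for the code in the question (the Functions and ParseRow names are made up for illustration):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;

import scala.Tuple2;

// Hypothetical "function library": an outer class that only holds static,
// serializable function implementations. A static nested class carries no
// hidden reference to an enclosing instance, so Spark serializes nothing
// beyond the function itself.
public class Functions {

    public static class ParseRow
            implements Function<Tuple2<String, Long>, IndexedRow> {

        private static final long serialVersionUID = 1L;

        @Override
        public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
            String[] strings = tuple._1().split(",");
            double[] doubles = new double[strings.length];
            for (int i = 0; i < strings.length; i++) {
                doubles[i] = Double.parseDouble(strings[i]);
            }
            return new IndexedRow(tuple._2(), new DenseVector(doubles));
        }
    }
}

The map call then becomes csv.zipWithIndex().map(new Functions.ParseRow()).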

+3

A debugging tip first: run the driver JVM with -Dsun.io.serialization.extendedDebugInfo=true. The NotSerializableException will then include the chain of field references that pulled the offending object into the serialized graph, which makes the culprit much easier to spot.

Now to the actual problem. A JavaSparkContext is not serializable (and is not supposed to be; it lives only in the driver). Your anonymous Function is an inner class, so it keeps a hidden reference to its enclosing instance. When Spark serializes the function to ship the task to the executors, it also tries to serialize that enclosing object, and because the enclosing object holds the JavaSparkContext, serialization fails with exactly the exception you posted. So the function itself is not the problem; it is the JavaSparkContext it drags along.

The simplest fix is to keep the JavaSparkContext out of anything the closure can reach, for example by making it a local variable instead of a field. Here is your code with the JavaSparkContext created locally in main:

public static void main(String[] args) {
   JavaSparkContext sc = new JavaSparkContext();

   // do whatever you need to do, if you need sc inside other classes,
   // store this sc into a static class, say Registry.set(sc) and Registry.getJSC()

   JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache();
   JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
          new  Function<scala.Tuple2<String, Long>, IndexedRow>() {
            private static final long serialVersionUID = 4795273163954440089L; // won't be serialized

            @Override
            public IndexedRow call(Tuple2<String, Long> tuple)
                    throws Exception {
                String line = tuple._1;
                long index = tuple._2;
                String[] strings = line.split(",");
                double[] doubles = new double[strings.length];
                for (int i = 0; i < strings.length; i++) {
                    doubles[i] = Double.parseDouble(strings[i]);
                }
                Vector v = new DenseVector(doubles);
                return new IndexedRow(index, v);
            }
        });
}

Note that the serialVersionUID by itself does not fix anything here; it only pins the serialized form of a class that is already Serializable.
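The Registry mentioned in the comment above is not a Spark class; if other parts of the program need the context, a trivial driver-side holder along these lines would do (a sketch, with names assumed from the comment):

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical static holder for the driver-side JavaSparkContext.
// It is only ever touched on the driver, so it is never serialized.
public final class Registry {

    private static JavaSparkContext jsc;

    private Registry() {
    }

    public static void set(JavaSparkContext sc) {
        jsc = sc;
    }

    public static JavaSparkContext getJSC() {
        return jsc;
    }
}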

+2

Another way around this is to avoid the anonymous inner class altogether, since it is the reference to the enclosing class that gets serialized along with it.

Instead, define the mapper as its own top-level class:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;

import scala.Tuple2;

// Spark's Java Function interface already extends java.io.Serializable,
// and a top-level class has no reference to any enclosing instance.
public class Mapper implements Function<Tuple2<String,Long>, IndexedRow> {

  @Override
  public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
    String line = tuple._1();
    long index = tuple._2();
    String[] strings = line.split(",");
    double[] doubles = new double[strings.length];
    for (int i = 0; i < strings.length; i++) {
      doubles[i] = Double.parseDouble(strings[i]);
    }
    Vector v = new DenseVector(doubles);
    return new IndexedRow(index, v);
  }
}

Then use it when building the JavaRDD:

JavaRDD<String> csv = jsc.textFile("data/matrix.csv").cache();
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper());

When map() runs, Spark only has to serialize the Mapper instance, and it contains no reference to the enclosing class or to the JavaSparkContext.

If the mapper does need extra state, pass it in through the constructor and make sure those fields are themselves serializable; see the sketch below.
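A minimal sketch of that pattern, assuming for illustration that the delimiter should be configurable (DelimitedMapper and its field are made-up names):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;

import scala.Tuple2;

public class DelimitedMapper implements Function<Tuple2<String,Long>, IndexedRow> {

  private static final long serialVersionUID = 1L;

  // Set on the driver and shipped with the function, so it must itself
  // be serializable (String is).
  private final String delimiter;

  public DelimitedMapper(String delimiter) {
    this.delimiter = delimiter;
  }

  @Override
  public IndexedRow call(Tuple2<String, Long> tuple) throws Exception {
    String[] strings = tuple._1().split(delimiter);
    double[] doubles = new double[strings.length];
    for (int i = 0; i < strings.length; i++) {
      doubles[i] = Double.parseDouble(strings[i]);
    }
    return new IndexedRow(tuple._2(), new DenseVector(doubles));
  }
}

Usage: csv.zipWithIndex().map(new DelimitedMapper(","))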

+1

Everything you pass to an RDD operation is serialized and shipped to the executors, so it must not capture anything that is not serializable. Two things help:

  • Make the function you pass to the transformation (map) a top-level or static class, so it does not capture the object that created it.
  • Use foreachPartition when you need per-partition resources: Spark calls it once per partition of the RDD, so non-serializable objects (connections, clients) can be created inside it instead of on the driver; see the sketch after this list.
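A sketch of that foreachPartition pattern, with a StringBuilder standing in for whatever non-serializable resource you actually need (the wrapper class and method are made up; csv is the JavaRDD<String> from the question):

import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

public class PartitionWriter {

    public static void writeOut(JavaRDD<String> csv) {
        csv.foreachPartition(new VoidFunction<Iterator<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<String> partition) {
                // Created on the executor, once per partition; it never
                // leaves this method, so it is never serialized.
                StringBuilder sink = new StringBuilder();
                while (partition.hasNext()) {
                    sink.append(partition.next()).append('\n');
                }
                System.out.print(sink); // stand-in for the real resource
            }
        });
    }
}

Because the anonymous VoidFunction is declared inside a static method, it has no enclosing instance to capture.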
0

In general, the functions you pass to RDD operations are serialized by Spark when tasks are shipped to the executors, so you have to be careful about what your closures capture to avoid this error.

You can use RDD.mapPartitions() to process one partition at a time and create any non-serializable helper objects inside it; Spark then only has to serialize the function itself, not the helpers.
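A hedged sketch of that approach for the CSV lines from the question, written against the Spark 1.x Java API (where FlatMapFunction.call returns an Iterable); the class name is made up, and zipWithIndex is omitted for brevity:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;

public class PartitionParser implements FlatMapFunction<Iterator<String>, double[]> {

    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<double[]> call(Iterator<String> partition) throws Exception {
        // Anything created here exists only on the executor and is never
        // serialized; per-partition setup (parsers, connections) goes here.
        List<double[]> rows = new ArrayList<double[]>();
        while (partition.hasNext()) {
            String[] strings = partition.next().split(",");
            double[] doubles = new double[strings.length];
            for (int i = 0; i < strings.length; i++) {
                doubles[i] = Double.parseDouble(strings[i]);
            }
            rows.add(doubles);
        }
        return rows;
    }
}

Usage: JavaRDD<double[]> rows = csv.mapPartitions(new PartitionParser());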

0

Create a separate class for your mapper and make it implement Serializable; inner classes sometimes cause serialization problems in a Spark environment.

0
