JavaSparkContext is not serializable

I am using Spark with Cassandra and I have a JavaRDD<String> of clients. For each client I want to fetch its interactions from Cassandra, like this:

    JavaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(
        new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
            @Override
            public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {
                List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(
                                cassandraRow.getString("channel"),
                                cassandraRow.getString("motif"),
                                cassandraRow.getDate("start"),
                                cassandraRow.getDate("end"),
                                cassandraRow.getString("ctid"),
                                cassandraRow.getString("month"));
                        }
                    }).collect();
                return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
            }
        });

For this I use a single JavaSparkContext, sc. But I get this error:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
        at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32)
        at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91)
        at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 14 more

I thought JavaSparkContext would be serializable. How can I make it serializable, please?

Thanks.

java serialization apache-spark
3 answers

No, JavaSparkContext is not serializable and is not supposed to be. It cannot be used in a function that you send to remote workers. Here you are not referencing it explicitly, but a reference to it is serialized anyway, because your anonymous inner-class function is not static and therefore holds a reference to its enclosing class.

Try rewriting the code with this function as a stand-alone (static) class.
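
For illustration, a minimal sketch of that pattern (the class and method names below are invented, not the poster's code): a static nested class implementing PairFunction carries no hidden reference to an enclosing instance, so no JavaSparkContext can be dragged into the closure.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class ClientFunctions {

        // A static nested class has no hidden reference to an enclosing
        // instance, so only this small object itself must be serializable.
        public static class ClientToPair
                implements PairFunction<String, String, Integer> {
            @Override
            public Tuple2<String, Integer> call(String clientId) {
                return new Tuple2<String, Integer>(clientId, clientId.length());
            }
        }

        public static JavaPairRDD<String, Integer> toPairs(JavaRDD<String> clients) {
            return clients.mapToPair(new ClientToPair());
        }
    }

Only the small ClientToPair object is serialized and shipped to the workers; the driver class that owns the JavaSparkContext never enters the closure.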


You cannot use the SparkContext, or create other RDDs, from inside an executor (that is, inside an RDD mapping function).

You need to create the Cassandra RDD (sc.cassandraTable) on the driver, and then join the two RDDs (the clients RDD and the Cassandra table RDD).
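
A rough sketch of that driver-side approach, reusing javaFunctions, CASSANDRA_SCHEMA, CassandraRow and InteractionByMonthAndCustomer from the question; the variable names and the join shape are illustrative assumptions, and Java 8 lambdas are used for brevity. Here clients is the JavaRDD<String> of client ids from the question.

    // Key the clients by ctid (the value is just a placeholder to enable the join).
    JavaPairRDD<String, String> clientsByCtid =
        clients.mapToPair(ctid -> new Tuple2<String, String>(ctid, ctid));

    // Read the whole interaction table once, on the driver, and key it by ctid.
    JavaPairRDD<String, InteractionByMonthAndCustomer> interactionsByCtid =
        javaFunctions(sc)
            .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
            .mapToPair(row -> new Tuple2<String, InteractionByMonthAndCustomer>(
                row.getString("ctid"),
                new InteractionByMonthAndCustomer(
                    row.getString("channel"), row.getString("motif"),
                    row.getDate("start"), row.getDate("end"),
                    row.getString("ctid"), row.getString("month"))));

    // One entry per client, with all of that client's interactions.
    JavaPairRDD<String, Iterable<InteractionByMonthAndCustomer>> interactionsPerClient =
        clientsByCtid
            .join(interactionsByCtid.groupByKey())
            .mapToPair(pair -> new Tuple2<String, Iterable<InteractionByMonthAndCustomer>>(
                pair._1(), pair._2()._2()));

Both RDDs are created on the driver; the functions shipped to the executors only transform rows and tuples, so nothing non-serializable is captured. Note that groupByKey and join shuffle data; keying both sides by ctid is what makes the join line up.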


Declare it with the transient keyword:

 private transient JavaSparkContext sparkContext; 
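
If that field lives in a class whose instances get captured by a Spark closure, transient tells Java serialization to skip it; the field is simply null on the workers, so it must only ever be used on the driver. A minimal, hypothetical sketch (the class, field and method names are invented for illustration):

    import java.io.Serializable;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical driver-side helper, for illustration only.
    public class JourneyJob implements Serializable {

        // Skipped by Java serialization; null on the workers, so it must
        // only ever be used in driver-side code.
        private transient JavaSparkContext sparkContext;
        private final String prefix = "client-";   // ordinary state, serialized as usual

        public JourneyJob(JavaSparkContext sc) {
            this.sparkContext = sc;
        }

        public JavaRDD<String> tagged(List<String> ids) {
            JavaRDD<String> rdd = sparkContext.parallelize(ids);  // driver only
            // The lambda reads 'prefix', so it captures 'this'; serializing
            // 'this' now succeeds because sparkContext is transient.
            return rdd.map(id -> prefix + id);
        }
    }

This only hides the context from serialization; the advice in the earlier answers still applies, i.e. the context must not actually be used inside functions that run on the executors.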
