Spark Serialization Exception

I have run into a very strange Spark serialization problem. The code is as follows:

 class PLSA(val sc: SparkContext, val numOfTopics: Int) extends Serializable {
   def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
     val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
     docs
   }
 }

where Document is defined as:

 class Document(val tokens: SparseVector[Int]) extends Serializable 

and DocumentParameter:

 class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

 object DocumentParameter extends Serializable {
   def apply(document: Document, numOfTopics: Int) =
     new DocumentParameter(document, Array.ofDim[Float](numOfTopics))
 }

SparseVector is a serializable class, breeze.linalg.SparseVector.

This is a simple mapping procedure, and all classes are serializable, but I get this exception:

 org.apache.spark.SparkException: Task not serializable 

But when I remove the numOfTopics parameter, that is:

 object DocumentParameter extends Serializable {
   def apply(document: Document) =
     new DocumentParameter(document, Array.ofDim[Float](10))
 }

and call it like this:

 val docs = documents.map(DocumentParameter.apply) 

then it seems to work.

Is the Int type not serializable? But I have seen other code written this way.

I am not sure how to fix this error.

#UPDATED#

Thanks @samthebest. I will add more information about this.

stack trace:

 org.apache.spark.SparkException: Task not serializable
   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
   at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
   at org.apache.spark.rdd.RDD.map(RDD.scala:270)
   at com.topicmodel.PLSA.infer(PLSA.scala:13)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
   at $iwC$$iwC$$iwC.<init>(<console>:39)
   at $iwC$$iwC.<init>(<console>:41)
   at $iwC.<init>(<console>:43)
   at <init>(<console>:45)
   at .<init>(<console>:49)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:483)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
   at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
   at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
   at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
   at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:483)
   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
   ... 46 more

I had originally removed the stack trace because I thought it gave only general information about the exception.

I run the code in the Spark shell.

 // suppose I already have an RDD[Document] named docs
 val numOfTopics = 100
 val plsa = new PLSA(sc, numOfTopics)
 val docPara = plsa.infer(docs)

Could you give me some guidance or tips on serialization?

+7
scala serializable apache-spark
2 answers

Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way to give that function access to numOfTopics is to serialize the PLSA instance it belongs to. And that class cannot actually be serialized because, as the stack trace shows, it contains a SparkContext, which is not serializable (bad things would happen if individual cluster nodes had access to the context and could, for example, create new tasks from within a mapper).
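The capture behaviour described above can be reproduced without Spark. The following is only an illustrative sketch, not the asker's code: FakeContext, Model and ClosureDemo are hypothetical names, and plain Java serialization stands in for the JavaSerializer check that appears in the stack trace.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Mirrors the shape of PLSA: Serializable itself, but holding a
// non-serializable field.
class Model(val ctx: FakeContext, val numOfTopics: Int) extends Serializable {
  // Reads the field numOfTopics, so the function captures `this`
  // and serializing it drags ctx along.
  def capturingThis: Int => Int = x => x + numOfTopics

  // Copies the field into a local val first, so the function
  // captures only an Int.
  def capturingLocal: Int => Int = {
    val n = numOfTopics
    x => x + n
  }
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val model = new Model(new FakeContext, 100)
    println(s"capturingThis serializes:  ${serializes(model.capturingThis)}")
    println(s"capturingLocal serializes: ${serializes(model.capturingLocal)}")
  }
}
```

The first function should be rejected and the second accepted, which matches what the question observed: the Int is not the problem, the object it is read from is.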

In general, try to avoid storing a SparkContext in your classes (edit: or at least make it very clear which kinds of classes contain a SparkContext and which do not); it is better to pass it as a parameter (possibly implicit) to the individual methods that need it. Alternatively, move the function that contains {doc => DocumentParameter(doc, numOfTopics)} out of PLSA into a different class, one that really can be serialized.
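One possible way to apply this advice to the code from the question is sketched below. It assumes spark-core is on the classpath, and it simplifies Document to hold a plain Array[Int] instead of breeze's SparseVector so that the example has no other dependencies. PLSAExample and the local val k are names introduced here for illustration; copying the field into a local is an extra precaution on top of the advice above, not something the advice requires.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Simplified stand-in for the question's Document (assumption: tokens as
// a plain array rather than breeze.linalg.SparseVector).
class Document(val tokens: Array[Int]) extends Serializable

class DocumentParameter(val document: Document, val theta: Array[Float])
  extends Serializable

object DocumentParameter {
  def apply(document: Document, numOfTopics: Int): DocumentParameter =
    new DocumentParameter(document, Array.ofDim[Float](numOfTopics))
}

// No SparkContext field: infer only needs the RDD it is given.
class PLSA(val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
    // Local copy, so the closure captures an Int rather than `this`.
    val k = numOfTopics
    documents.map(doc => DocumentParameter(doc, k))
  }
}

object PLSAExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("plsa-sketch")
    val sc = new SparkContext(conf)
    try {
      val docs = sc.parallelize(Seq(new Document(Array(1, 2)), new Document(Array(3))))
      val params = new PLSA(100).infer(docs).collect()
      println(params.map(_.theta.length).mkString(","))
    } finally {
      sc.stop()
    }
  }
}
```

Any method that really does need the context can take it as an argument, for example `def load(sc: SparkContext, path: String)`, instead of reading it from a field.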

(As several people have suggested, it is possible to keep the SparkContext in the class but mark it @transient so that it is not serialized. I do not recommend this approach: it means the class changes state when it is serialized (it loses its SparkContext), so you can get an NPE when you try to access the SparkContext from inside a serialized job. It is better to maintain a clear distinction between classes that are used only in the control code (and can use a SparkContext) and classes that are serialized to run on the cluster (which should not have a SparkContext).)
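The state change caused by @transient can be seen with a plain serialization round trip. This is a minimal sketch under the same caveat as any toy example: StubContext, Holder and TransientDemo are hypothetical names, and the round trip through a byte array only imitates what happens when an object is shipped to another JVM.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: not Serializable.
class StubContext

// @transient makes Java serialization skip the ctx field.
class Holder(@transient val ctx: StubContext, val numOfTopics: Int) extends Serializable

object TransientDemo {
  // Serialize to bytes and read the object back.
  def roundTrip(h: Holder): Holder = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(h)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[Holder] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Holder(new StubContext, 100)
    val copy = roundTrip(original)
    println(s"numOfTopics survives: ${copy.numOfTopics}")
    println(s"ctx is null after the round trip: ${copy.ctx == null}")
  }
}
```

Serialization now succeeds, but the deserialized copy has a null ctx, so any call on it would throw a NullPointerException. That is the hazard the paragraph above warns about.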

+10

This is indeed strange, but I think I can guess the problem. First, though, you have not provided the bare minimum needed to solve it (I can guess only because I have seen hundreds of these before). Here are some problems with your question:

 def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
   val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
 }

This method does not return RDD[DocumentParameter]; it returns Unit. You must have copied and pasted the code incorrectly.

Secondly, you did not provide the full stack trace. Why not? There is no reason NOT to provide it, and the full stack trace with its message is needed to understand what the error is. Usually a not-serializable exception tells you what is not serializable.

Thirdly, you did not tell us where the infer method lives. Are you running it in the shell? What is the containing object / class / trait of infer?

In any case, I suspect that by passing in the Int you are causing a chain of things to be serialized that you do not expect. I cannot tell you more than that until you provide the minimum code needed to fully understand your problem.

0