I encounter a very strange Spark serialization issue. The code is as follows:
class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable { def infer(document: RDD[Document]): RDD[DocumentParameter] = { val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) docs } }
where Document is defined as:
class Document(val tokens: SparseVector[Int]) extends Serializable
and DocumentParameter:
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable object DocumentParameter extends Serializable { def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, Array.ofDim[Float](numOfTopics)) }
SparseVectoris - a serializable class in breeze.linalg.SparseVector .
This is a simple mapping procedure, and all classes are serializable, but I get this exception:
org.apache.spark.SparkException: Task not serializable
But when I numOfTopics parameter, that is:
object DocumentParameter extends Serializable { def apply(document: Document) = new DocumentParameter(document, Array.ofDim[Float](10)) }
and name it as follows:
val docs = documents.map(DocumentParameter.apply)
and it seems OK.
Is Int type not serializable? But I see that some code is written like this.
I am not sure how to fix this error.
#UPDATED #
Thanks @samthebest. I will add more information about this.
stack trace: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.map(RDD.scala:270) at com.topicmodel.PLSA.infer(PLSA.scala:13) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37) at $iwC$$iwC$$iwC.<init>(<console>:39) at $iwC$$iwC.<init>(<console>:41) at $iwC.<init>(<console>:43) at <init>(<console>:45) at .<init>(<console>:49) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 46 more
Since the stack trace gives general information about the exception, I deleted it.
I run the code in a spark shell.
// suppose I have get RDD[Document] for docs val numOfTopics = 100 val plsa = new PLSA(sc, numOfTopics) val docPara = plsa.infer(docs)
Could you give me some lessons or tips on serializing?