I tested the integration of Elasticsearch and Spark on my local machine, using some test data loaded into Elasticsearch:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)

val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])

esRDD.first()
esRDD.collect()
The code works fine and returns the correct result with esRDD.first().
However, esRDD.collect() throws an exception:
java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I believe this is due to the issue mentioned at http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html, so I added this line accordingly:
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Do I need to do anything else to make it work? Thank you.
Update: the serialization issue has been resolved by using
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
instead of
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Now there is another problem. There are 1000 distinct records in this dataset.
esRDD.count()
returns 1000 with no problem, however
esRDD.distinct().count()
returns 5! If I print the records with
esRDD.foreach(println)
it prints all 1000 entries correctly. But if I use collect or take:
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
it prints DUPLICATED records, and indeed there are only 5 UNIQUE records, which appear to be a random subset of the entire dataset - they are not the first 5 records. If I save the RDD and read it back:
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 behaves as expected. I wonder whether this is a bug, or something I do not understand about the behavior of collect/take. Or is it because I am running everything locally? By default the RDD seems to use 5 partitions, as shown by the number of part-xxxx files in the spark-output directory. That is probably why esRDD.collect() and esRDD.distinct() end up with 5 unique entries rather than some other random number, but it still does not explain why the duplicates appear in the first place.
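For reference, a sketch of two checks that might narrow this down, building on the RDDs defined above (partitions.length is the standard RDD API; the idea that the Hadoop input format reuses the same Writable instances for every record, so that collect() ends up with many references to the same objects, is an assumption I have not verified):

import scala.collection.JavaConverters._

// 1. Check how many partitions each RDD actually has
println("esRDD partitions:  " + esRDD.partitions.length)
println("esRDD2 partitions: " + esRDD2.partitions.length)

// 2. Copy each record out of the (possibly reused) Writable objects into plain
//    Scala types before collecting, to see whether the duplicates go away
val copied = esRDD.map { case (key, value) =>
  (key.toString,
   value.asScala.map { case (k, v) => (k.toString, v.toString) }.toMap)
}
copied.collect().foreach(println)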