Elasticsearch for Spark RDD

I am testing the integration of Elasticsearch and Spark on my local machine, using some test data loaded into Elasticsearch.

    import org.apache.hadoop.io.{MapWritable, Text}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoSerializer
    import org.elasticsearch.hadoop.mr.EsInputFormat

    val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val conf = new JobConf()
    conf.set("spark.serializer", classOf[KryoSerializer].getName)
    conf.set("es.nodes", "localhost:9200")
    conf.set("es.resource", "bank/account")
    conf.set("es.query", "?q=firstname:Daniel")

    val esRDD = sc.hadoopRDD(conf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text],
      classOf[MapWritable])

    esRDD.first()
    esRDD.collect()

The code works fine and returns the correct result with esRDD.first().

However, esRDD.collect() throws an exception:

    java.io.NotSerializableException: org.apache.hadoop.io.Text
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I believe this is due to the serialization problem mentioned here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html, so I added this line accordingly:

 conf.set("spark.serializer", classOf[KryoSerializer].getName) 

Should I do something else to make it work? Thank you.


Update: the serialization issue has been resolved by using

 sparkConf.set("spark.serializer", classOf[KryoSerializer].getName) 

instead of

 conf.set("spark.serializer", classOf[KryoSerializer].getName) 

Now there is another problem. There are 1000 distinct records in this dataset.

 esRDD.count() 

returns 1000 with no problem, however

 esRDD.distinct().count() 

returns 5! If I print the records with

 esRDD.foreach(println) 

it prints 1000 entries correctly. But if I use collect or take,

    esRDD.collect().foreach(println)
    esRDD.take(10).foreach(println)

they print DUPLICATED records: there are indeed only 5 UNIQUE records among them, which appear to be a random subset of the entire dataset (they are not the first 5 records). If I save the RDD and read it back with

    esRDD.saveAsTextFile("spark-output")
    val esRDD2 = sc.textFile("spark-output")
    esRDD2.distinct().count()
    esRDD2.collect().foreach(println)
    esRDD2.take(10).foreach(println)

esRDD2 behaves as expected. I wonder whether this is a bug, or something I do not understand about the behavior of collect/take, or whether it is simply because I run everything locally. By default the RDD seems to use 5 partitions, as shown by the number of part-xxxx files in the "spark-output" directory. That is probably why esRDD.collect() and esRDD.distinct() returned 5 unique entries rather than some other random number, but it still does not explain why the records come back duplicated in the first place.
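For reference, one workaround I am considering rests on an assumption on my part: that the duplicates come from the Hadoop input format reusing the same Text and MapWritable instances across records. Under that assumption, copying every record into plain, immutable Scala values before calling collect, take, or distinct should give the real data:

    import scala.collection.JavaConverters._

    // Sketch (assumes object reuse by the input format is the cause):
    // materialize each reused Writable into immutable Scala values
    // before the records are gathered on the driver.
    val materialized = esRDD.map { case (key, value) =>
      val doc = value.asScala.map { case (k, v) => k.toString -> v.toString }.toMap
      (key.toString, doc)
    }

    materialized.distinct().count()        // should report the real number of unique records
    materialized.take(10).foreach(println)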

serialization elasticsearch apache-spark elasticsearch-hadoop
2 answers

The following code should be used for initialization:

    val sparkConf = new SparkConf()
      .setAppName("Test")
      .setMaster("local")
      .set("spark.serializer", classOf[KryoSerializer].getName)
    val sc = new SparkContext(sparkConf)

    val conf = new JobConf()
    conf.set("es.nodes", "localhost:9200")
    conf.set("es.resource", "bank/account")
    conf.set("es.query", "?q=firstname:Daniel")
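A minimal sketch of how the RDD could then be built from this configuration, reusing the same EsInputFormat, Text, and MapWritable imports as in the question:

    val esRDD = sc.hadoopRDD(conf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text],
      classOf[MapWritable])

    // With the serializer registered on the SparkConf (not the JobConf),
    // collect() should no longer hit the NotSerializableException.
    esRDD.first()
    esRDD.collect()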

You can try:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("ES")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("es.nodes", "localhost")
      .config("es.port", "9200")
      .getOrCreate()

    // Read through the elasticsearch-spark SQL connector, then drop down to an RDD.
    val data = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.query", "?q=firstname:Daniel")
      .load("bank/account")
      .rdd

    data.first()
    data.collect()
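Since the SQL connector returns Spark SQL Row objects rather than reused Hadoop Writables, the duplicate check from the question can (assuming the same test index) be repeated directly on this RDD:

    data.distinct().count()          // expected to equal data.count()
    data.take(10).foreach(println)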
