Elasticsearch - Spark serialization does not work with inner classes

Elasticsearch / Spark serialization does not seem to work with nested types.

For instance:

    public class Foo implements Serializable {
        private List<Bar> bars = new ArrayList<Bar>();
        // getters and setters

        public static class Bar implements Serializable {
        }
    }

    List<Foo> foos = new ArrayList<Foo>();
    foos.add(new Foo()); // Note: Foo object does not contain nested Bar instances

    SparkConf sc = new SparkConf();
    // sc.setMaster("local");
    sc.setAppName("spark.app.name");
    sc.set("spark.serializer", KryoSerializer.class.getName());
    JavaSparkContext jsc = new JavaSparkContext(sc);

    JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(foos));
    JavaEsSpark.saveToEs(javaRDD, INDEX_NAME + "/" + TYPE_NAME);

The above code works, and the Foo documents are indexed in Elasticsearch.

The problem occurs when the bars list in the Foo object is not empty, for example:

    Foo foo = new Foo();
    Foo.Bar bar = new Foo.Bar();
    foo.getBars().add(bar);

Then, when indexing in Elasticsearch, the following exception is thrown:

    org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [Bar] within type [class Foo], instance [Bar ...] within instance [Foo@1cf628a] using writer [org.elasticsearch.spark.serialization.ScalaValueWriter@4e635d]
        at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
        at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
        at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:47)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

These are the corresponding Maven dependencies:

    <dependency>
        <groupId>com.sksamuel.elastic4s</groupId>
        <artifactId>elastic4s_2.11</artifactId>
        <version>1.5.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop-cascading</artifactId>
        <version>2.1.0.Beta4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark_2.10</artifactId>
        <version>2.1.0.Beta4</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-xml</artifactId>
        <version>2.11.0-M4</version>
    </dependency>

What is the correct way to index nested types with Elasticsearch and Spark?

thanks

4 answers

The solution could be to build JSON from the object you are trying to save, using, for example, Json4s. In that case, the RDD you pass to JavaEsSpark is an RDD of JSON strings. Then you just need to call

JavaEsSpark.saveJsonToEs...

instead of

JavaEsSpark.saveToEs...

This workaround saved me countless hours of trying to figure out how to serialize nested maps.
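The answer mentions Json4s, which is a Scala library; below is a minimal sketch of the same idea in plain Java, using Jackson's ObjectMapper (already on the classpath via the jackson-databind dependency above). The class name, helper method, and index/type are illustrative assumptions, not part of the original answer.

    import java.util.List;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    public class JsonWorkaround {

        // Hypothetical helper: serialize each Foo (including its nested Bar list)
        // to a JSON string and index the strings with saveJsonToEs.
        public static void save(JavaSparkContext jsc, List<Foo> foos) {
            JavaRDD<String> jsonRDD = jsc.parallelize(foos).map(new Function<Foo, String>() {
                @Override
                public String call(Foo foo) throws Exception {
                    // ObjectMapper is created inside the task for simplicity
                    return new ObjectMapper().writeValueAsString(foo);
                }
            });
            JavaEsSpark.saveJsonToEs(jsonRDD, "my_index/my_type");
        }
    }

The key point is that saveJsonToEs receives ready-made JSON strings, so es-hadoop never has to introspect the nested Bar objects.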


If you look at the code of ScalaValueWriter and JdkValueWriter, you will see that only certain types are supported. Most likely, the inner class is not recognized as a JavaBean or another supported type.


Someday, ScalaValueWriter and JdkValueWriter may support user-defined types (like Bar in our example) in addition to built-in Java types like String, int, etc.

In the meantime, the following workaround exists: instead of having Foo hold a List<Bar>, convert each Bar internally to a Map<String, Object> and expose the list of maps.

Something like this:

    private List<Map<String, Object>> bars = new ArrayList<Map<String, Object>>();

    public List<Map<String, Object>> getBars() {
        return bars;
    }

    public void setBars(List<Bar> bars) {
        for (Bar bar : bars) {
            this.bars.add(bar.getAsMap());
        }
    }
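The getAsMap() helper used above is not shown in the answer. A minimal sketch of what it could look like, assuming Bar has a single hypothetical name field and that java.util.Map and java.util.HashMap are imported:

    public static class Bar implements Serializable {
        private String name; // illustrative field, not part of the original example

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        // Convert this Bar into a plain map that the es-hadoop value writers can handle
        public Map<String, Object> getAsMap() {
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("name", name);
            return map;
        }
    }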

I suggest working with com.google.gson.Gson:

    String foosJson = new Gson().toJson(foos);

then

    Map map = new HashMap<>();
    // ......
    JavaRDD<Map> javaRDD = sc.parallelize(ImmutableList.of(map));
    JavaEsSpark.saveToEs(javaRDD, INDEX_NAME + "/" + TYPE_NAME);
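This answer is abbreviated; a minimal, self-contained sketch of the Gson approach could look like the following. Converting each Foo to a Map<String, Object> by round-tripping through JSON is my reading of the elided part, and the class, method, and index/type names are illustrative assumptions.

    import java.util.List;
    import java.util.Map;

    import com.google.gson.Gson;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    public class GsonWorkaround {

        // Hypothetical helper: turn each Foo into a plain map via JSON, then index the maps.
        public static void save(JavaSparkContext jsc, List<Foo> foos) {
            JavaRDD<Map<String, Object>> mapRDD = jsc.parallelize(foos).map(
                new Function<Foo, Map<String, Object>>() {
                    @Override
                    @SuppressWarnings("unchecked")
                    public Map<String, Object> call(Foo foo) throws Exception {
                        Gson gson = new Gson(); // created inside the task for simplicity
                        // Round-trip through JSON so nested Bar objects become plain maps,
                        // which the es-hadoop value writers can handle
                        return (Map<String, Object>) gson.fromJson(gson.toJson(foo), Map.class);
                    }
                });
            JavaEsSpark.saveToEs(mapRDD, "my_index/my_type");
        }
    }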

