Elasticsearch/Spark serialization does not seem to work with nested types.
For instance:
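import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Foo implements Serializable {
    private List<Bar> bars = new ArrayList<Bar>();

    public List<Bar> getBars() {
        return bars;
    }

    public static class Bar implements Serializable {
        // ...
    }
}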
The above code works, and documents like Foo will be indexed in Elasticsearch.
The problem occurs when the bars list in the Foo object is not empty, for example:
Foo foo = new Foo();
Foo.Bar bar = new Foo.Bar();
foo.getBars().add(bar);
Then, when indexing in Elasticsearch, the following exception is thrown:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [Bar] within type [class Foo], instance [Bar ...]] within instance [ Foo@1cf628a ] using writer [ org.elasticsearch.spark.serialization.ScalaValueWriter@4e635d ]
    at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:47)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
These are the Maven dependencies I am using:
<dependency>
    <groupId>com.sksamuel.elastic4s</groupId>
    <artifactId>elastic4s_2.11</artifactId>
    <version>1.5.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop-cascading</artifactId>
    <version>2.1.0.Beta4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark_2.10</artifactId>
    <version>2.1.0.Beta4</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-xml</artifactId>
    <version>2.11.0-M4</version>
</dependency>
What is the correct way to index nested types with Elasticsearch and Spark?
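One workaround I am considering, sketched under assumptions: if the connector's writer can serialize plain `java.util.Map` and `java.util.List` values but not a user-defined bean nested inside another bean, the `Foo` graph could be converted into nested maps and lists before indexing. The classes below are stand-ins for the ones above, and `Bar`'s `name` field is a hypothetical placeholder for whatever `Bar` really contains; `FooToDocument`, `toMap` and `toDocument` are illustrative names, not part of any library.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FooToDocument {

    // Stand-in for the Foo class from the question.
    public static class Foo implements Serializable {
        private final List<Bar> bars = new ArrayList<Bar>();

        public List<Bar> getBars() {
            return bars;
        }
    }

    // Stand-in for Foo.Bar; the "name" field is a hypothetical placeholder.
    public static class Bar implements Serializable {
        private final String name;

        public Bar(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    // Converts one Bar into a plain map (one JSON object in the document).
    static Map<String, Object> toMap(Bar bar) {
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("name", bar.getName());
        return m;
    }

    // Converts a Foo into Map/List values only, so the writer never
    // receives a user-defined bean type.
    public static Map<String, Object> toDocument(Foo foo) {
        List<Map<String, Object>> bars = new ArrayList<Map<String, Object>>();
        for (Bar bar : foo.getBars()) {
            bars.add(toMap(bar));
        }
        Map<String, Object> doc = new HashMap<String, Object>();
        doc.put("bars", bars);
        return doc;
    }

    public static void main(String[] args) {
        Foo foo = new Foo();
        foo.getBars().add(new Bar("first"));
        System.out.println(toDocument(foo)); // prints {bars=[{name=first}]}
    }
}
```

In the Spark job, the idea would be to map each `Foo` through `toDocument` and then index the resulting `JavaRDD<Map<String, Object>>` (for example with `JavaEsSpark.saveToEs(docs, "index/type")`, where `"index/type"` is a placeholder), assuming the es-hadoop version in use accepts maps and lists for nested fields. This costs a manual mapping per class, so I would still prefer a way to index the beans directly.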
Thanks!