What is the correct way to use Elasticsearch with Spark to update existing objects?
I wanted something like the following:
- Get existing data in the form of a map.
- Create a new map and fill it with updated fields.
- Save the new map.
However, there are several problems:
- The list of returned fields cannot contain _id, since it is not part of the document source.
- If, as a test, I hard-code an existing _id into the new map, the following exception is thrown:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest
How do I extract the _id, and how do I send it back to Elasticsearch when saving from Spark?
I included the following code below to better illustrate what I was trying to do:
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME + "/" + TYPE_NAME,
        "?_source=field1,field2").values();
Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while (iter.hasNext()) {
    Map<String, Object> map = iter.next();
    // Get existing values, and do the transformation logic
    Map<String, Object> newMap = new HashMap<String, Object>();
    newMap.put("_id", ??????);  // <-- this is the part I don't know how to fill in
    newMap.put("field1", new_value);
    listToPersist.add(newMap);
}
JavaRDD<Map<String, Object>> javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME + "/" + TYPE_NAME);
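To be concrete about the test I mentioned above: this is the variant of the loop body where I hard-code an existing _id into the new map (the id string here is just a placeholder for a value I copied out of the index), and saving it is what triggers the EsHadoopInvalidRequest:

// Test variant: put an existing document's _id directly into the source map.
// "SOME_EXISTING_ID" is a placeholder for a real _id taken from the index.
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put("_id", "SOME_EXISTING_ID");
newMap.put("field1", new_value);
listToPersist.add(newMap);

// Writing the list exactly as before then fails with EsHadoopInvalidRequest.
JavaEsSpark.saveToEs(jsc.parallelize(ImmutableList.copyOf(listToPersist)),
        INDEX_NAME + "/" + TYPE_NAME);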
Alternatively, is there a better way to handle this scenario with Spark and Elasticsearch?
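In case it helps clarify what I'm after, here is a rough sketch of what I am guessing the intended approach might be: keep the document id that esRDD returns as the key of the pair RDD, and pass it back at write time with saveToEsWithMeta. I have not verified this, and the query and field names are only illustrative; is something along these lines correct?

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Read documents keyed by their _id (the key of the pair RDD).
JavaPairRDD<String, Map<String, Object>> docsById =
        JavaEsSpark.esRDD(jsc, INDEX_NAME + "/" + TYPE_NAME, "?q=*");

// Build the updated document per id, leaving the key (the _id) untouched.
JavaPairRDD<String, Map<String, Object>> updated = docsById.mapValues(doc -> {
    Map<String, Object> newMap = new HashMap<String, Object>(doc);
    newMap.put("field1", "new_value"); // transformation logic would go here
    return newMap;
});

// Write back, passing the id as metadata rather than as part of the document source.
JavaEsSpark.saveToEsWithMeta(updated, INDEX_NAME + "/" + TYPE_NAME);

I have also seen the es.mapping.id write setting mentioned, which apparently tells the connector to take the document id from a field inside the document itself, but I am not sure which of the two is the right tool for plain updates.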