Elasticsearch and Spark: updating existing documents

What is the correct way to use Elasticsearch with Spark to update existing objects?

I wanted something like the following:

  • Get existing data in the form of a map.
  • Create a new map and fill it with updated fields.
  • Save the new map.

However, there are several problems:

  • The list of returned fields cannot contain _id, since it is not part of the _source.
  • If, for testing, I hard-code _id in the new map to values that exist in the index, the following exception is thrown:

    org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest

How do I extract _id, and how do I pass it back from Spark?

I included the following code below to better illustrate what I was trying to do:

JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME,
    "?_source=field1,field2").values();

Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
   Map<String, Object> map = iter.next();
   // Get existing values, and do transformation logic

   Map<String, Object> newMap = new HashMap<String, Object>();
   newMap.put("_id", ??????);
   newMap.put("field1", new_value);
   listToPersist.add(newMap);
}
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME); 



You can use the upsert write operation in Spark:

.config("es.write.operation", "upsert")
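For example, a minimal sketch of that configuration (the index, type, and field names here are illustrative assumptions, not from the question; upsert also needs es.mapping.id so the connector knows which field holds the document id):

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

// Sketch: upsert rows of a DataFrame into Elasticsearch.
val spark = SparkSession.builder()
  .appName("es-upsert-example")
  .config("es.write.operation", "upsert") // insert if the id is new, partial update otherwise
  .config("es.mapping.id", "myId")        // field in each row that holds the document id
  .getOrCreate()

import spark.implicits._
Seq(("1", "new_value")).toDF("myId", "field1")
  .saveToEs("myindex/mytype")
```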


To get the _id back from Elasticsearch, set es.read.metadata to true:

 .config("es.read.metadata", "true")

The map returned for each document will then contain an "_id" entry.

Copy that id into a field of the map you are going to save:

 newMap.put("idfield", yourId);

Then tell the connector which field to use as the document id:

 .config("es.mapping.id", "idfield")

Finally, set the write operation to update:

 .config("es.write.operation", "update")
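Putting the settings above together, a sketch with the RDD API (index and field names are assumptions; note that in the Scala API, esRDD already returns the document id as the key of each (id, source) pair):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setAppName("es-update-example")
  .set("es.read.metadata", "true")     // expose metadata such as _id when reading
  .set("es.mapping.id", "idfield")     // field in each map that holds the document id
  .set("es.write.operation", "update") // partial update of existing documents
val sc = new SparkContext(conf)

// esRDD yields (documentId, sourceMap) pairs; carry the id along in "idfield"
val updated = sc.esRDD("myindex/mytype").map { case (id, source) =>
  Map("idfield" -> id, "field1" -> "new_value")
}
updated.saveToEs("myindex/mytype")
```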

This is how I did it (Scala / Spark 2.3 / Elastic-Hadoop v6.5).

To read (identifier or other metadata):

spark
    .read
    .format("org.elasticsearch.spark.sql")
    .option("es.read.metadata",true) // allow to read metadata
    .load("yourindex/yourtype")
    .select(col("_metadata._id").as("myId"),...)

To update specific columns in ES:

myDataFrame
    .select("myId","columnToUpdate")
    .saveToEs(
        "yourindex/yourtype",
        Map(
            "es.mapping.id" -> "myId",
            "es.write.operation" -> "update", // important to change operation to partial update
            "es.mapping.exclude" -> "myId"
        )
    )
