Update a collection in MongoDB via Apache Spark using the Mongo-Hadoop Connector

I would like to update a specific collection in MongoDB from Spark in Java. I am using the MongoDB Connector for Hadoop to retrieve and save data between Apache Spark and MongoDB in Java.
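For context, this is roughly how the job is wired up — a minimal sketch, assuming a local MongoDB instance and the connector's standard mongo.input.uri / mongo.output.uri configuration keys (db.collection is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

// Tell the connector where to read from and write to
Configuration config = new Configuration();
config.set("mongo.input.uri", "mongodb://localhost:27017/db.collection");
config.set("mongo.output.uri", "mongodb://localhost:27017/db.collection");

// sc is an existing JavaSparkContext; load the collection as (id, document) pairs
JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(
        config, MongoInputFormat.class, Object.class, BSONObject.class);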

After following Sampo Niskanen's excellent post about retrieving and saving MongoDB collections through Spark, I got stuck on updating collections.

MongoOutputFormat.java includes a constructor taking a String[] updateKeys, which, I assume, refers to a list of keys to compare against existing documents in order to perform an update. However, when using Spark's saveAsNewAPIHadoopFile() method with MongoOutputFormat.class as a parameter, I am wondering how to make use of that update constructor.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Prior to this, MongoUpdateWritable.java was used to perform collection updates. From the examples I've seen on Hadoop, this is normally set on mongo.job.output.value, so perhaps something like this in Spark:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
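For comparison, this is the plain-Hadoop wiring I am referring to — a sketch only, assuming the connector reads the mongo.job.output.value key mentioned above (Configuration.setClass is standard Hadoop API):

// Register MongoUpdateWritable as the job's output value class,
// the way the Hadoop examples appear to do it
config.setClass("mongo.job.output.value", MongoUpdateWritable.class, Object.class);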

However, I'm still wondering how to specify update keys in MongoUpdateWritable.java.

, "_id" KeyValue, _id.

JavaPairRDD<BSONObject, ?> analyticsResult; //JavaPairRDD of (mongoObject, result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = s._1;

    // For all keys, build a synthetic _id of the form key:value_
    String id = "";
    for (String key : o.keySet()) {
        id += key + ":" + o.get(key) + "_"; // implicit toString avoids a ClassCastException on non-String values
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Is there a way to perform the MongoDB collection update from Spark using MongoOutputFormat, MongoUpdateWritable, or Configuration, ideally with saveAsNewAPIHadoopFile()? If not, is there some other way that doesn't involve setting _id to the key values I want to update on?

I tried several combinations of

config.set("mongo.job.output.value","....")

and several combinations of

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

and none of them worked.

What made it work was using the MongoUpdateWritable class as the output value of my map step:

import org.bson.BasicBSONObject
import org.bson.types.ObjectId
import com.mongodb.hadoop.io.MongoUpdateWritable

items.map(row => {
      // match the target document by its ObjectId
      val mongo_id = new ObjectId(row("id").toString)
      val query = new BasicBSONObject()
      query.append("_id", mongo_id)
      val update = new BasicBSONObject()

      // $set a single field, leaving the rest of the document intact
      update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))
      // args: query, update, upsert = false, multiUpdate = true
      val muw = new MongoUpdateWritable(query,update,false,true)
      (null, muw)
    })
     .saveAsNewAPIHadoopFile(
       "file:///bogus",
       classOf[Any],
       classOf[Any],
       classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
       mongo_config
     )
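Since the question is in Java, here is the same approach sketched in Java. The items RDD, mongoConfig, and the field names are placeholders carried over from the Scala snippet above, so treat this as an illustration rather than tested code:

import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import scala.Tuple2;

JavaPairRDD<Object, MongoUpdateWritable> updates = items.mapToPair(row -> {
    // Match the target document by its ObjectId
    BasicBSONObject query = new BasicBSONObject("_id", new ObjectId(row.get("id").toString()));
    // $set a single field, leaving the rest of the document intact
    BasicBSONObject update = new BasicBSONObject(
            "$set", new BasicBSONObject("field_name", row.get("new_value")));
    // upsert = false, multiUpdate = true, as in the Scala version
    return new Tuple2<>(null, new MongoUpdateWritable(query, update, false, true));
});

updates.saveAsNewAPIHadoopFile(
        "file:///bogus",
        Object.class,
        MongoUpdateWritable.class,
        MongoOutputFormat.class,
        mongoConfig);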

The raw query executed on mongo then looks like this:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms