MongoDB incremental mapReduce, select only new documents added after the last mapReduce

Let's say I have a collection with documents that look like this (just a simplified example, but it should illustrate the pattern):

    > db.data.find()
    { "_id" : ObjectId("4e9c1f27aa3dd60ee98282cf"), "type" : "A", "value" : 11 }
    { "_id" : ObjectId("4e9c1f33aa3dd60ee98282d0"), "type" : "A", "value" : 58 }
    { "_id" : ObjectId("4e9c1f40aa3dd60ee98282d1"), "type" : "B", "value" : 37 }
    { "_id" : ObjectId("4e9c1f50aa3dd60ee98282d2"), "type" : "B", "value" : 1 }
    { "_id" : ObjectId("4e9c1f56aa3dd60ee98282d3"), "type" : "A", "value" : 85 }
    { "_id" : ObjectId("4e9c1f5daa3dd60ee98282d4"), "type" : "B", "value" : 12 }

Now I need to collect some statistics on this collection. For example:

    db.data.mapReduce(
        function() { emit(this.type, this.value); },
        function(key, values) {
            var total = 0;
            for (var i = 0; i < values.length; i++) { total += values[i]; }
            return total;
        },
        { out: 'stat' }
    )

will collect totals in the stat collection.

    > db.stat.find()
    { "_id" : "A", "value" : 154 }
    { "_id" : "B", "value" : 50 }

Everything is perfect at this point, but I'm stuck on the next move:

  • The 'data' collection is constantly updated with new documents (documents are only inserted, never updated; old ones stay unchanged).
  • I would like to update the 'stat' collection periodically, but I don't want to scan the entire data collection every time, so I want to run an incremental mapReduce.
  • It might seem simpler to just update the 'stat' collection on every insert into the data collection and skip mapReduce altogether, but the real case is more complicated than this example, and I want the statistics computed only on demand.
  • To do this, I need to be able to select only the documents that were added after my last mapReduce run.
  • As far as I understand, I cannot simply rely on ObjectId — e.g. store the last one and then select every document with an ObjectId greater than the stored one — because ObjectId is not an auto-increment identifier the way ids are in SQL databases (for example, different shards produce different ObjectIds); see the naive pattern sketched after this list.
  • I could change the ObjectId generator, but I'm not sure how to do that well in a sharded environment.
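For illustration, the naive pattern I have in mind looks like this (a minimal pymongo sketch; the 'stat_meta' bookkeeping collection is made up), and it is exactly what breaks when ObjectIds are not monotonic across shards:

    from pymongo import MongoClient

    db = MongoClient().test

    # Naive approach: remember the largest _id processed so far...
    state = db.stat_meta.find_one({"_id": "last_processed"})

    query = {}
    if state is not None:
        # ...and next time only look at documents "after" it. This assumes
        # ObjectIds are globally monotonic, which does not hold across shards.
        query = {"_id": {"$gt": state["value"]}}

    for doc in db.data.find(query).sort("_id", 1):
        pass  # feed into the stats computation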

So the question is:

Is it possible to select only the documents added after the last mapReduce in order to run an incremental mapReduce? Or is there another strategy for updating statistics on an ever-growing collection?

+8
mongodb mapreduce
4 answers

You could cache the time and use it as the barrier for your next incremental map-reduce.

We are testing this at work and it seems to work. Correct me if I'm wrong, but you can't safely run a map-reduce while inserts are happening across shards — the results become inconsistent and the map-reduce operation fails. (If you find a solution to this, please let me know! :)

Instead, we use bulk inserts once every 5 minutes. Once all the bulk inserts are done, we run map-reduce like this (in Python):

    m = Code(<map function>)
    r = Code(<reduce function>)

    # pseudo code
    end = last_time + 5 minutes

    # Use time and optionally any other keys you need here
    q = bson.SON([("date", {"$gte": last_time, "$lt": end})])

    collection.map_reduce(m, r, out={"reduce": <output_collection>}, query=q)

Note that we used reduce, not merge, because we don't want to overwrite what we had before; we want to combine the old results and the new results with the same reduce function.
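For reference, a fuller runnable sketch of this pattern (assuming pymongo older than 4.0, where Collection.map_reduce still exists, and a 'date' field on the documents; the collection names are the ones from the question):

    import datetime
    from bson.code import Code
    from pymongo import MongoClient

    db = MongoClient().test

    mapper = Code("function() { emit(this.type, this.value); }")
    reducer = Code("""
    function(key, values) {
        var total = 0;
        for (var i = 0; i < values.length; i++) { total += values[i]; }
        return total;
    }
    """)

    last_time = datetime.datetime(2011, 10, 17, 12, 0)  # load from wherever you cache it
    end = last_time + datetime.timedelta(minutes=5)

    # Only map-reduce the documents inserted since the last run.
    query = {"date": {"$gte": last_time, "$lt": end}}

    # out={"reduce": "stat"} re-reduces the new results into the existing
    # 'stat' collection instead of replacing it.
    db.data.map_reduce(mapper, reducer, out={"reduce": "stat"}, query=query)

    # Cache the barrier for the next run.
    last_time = end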

+4

You can get just the time portion of the id using _id.getTime() (from: http://api.mongodb.org/java/2.6/org/bson/types/ObjectId.html ). That should sort consistently across all shards.

EDIT: Sorry, those were the Java docs... The driver-side version looks like _id.generation_time.in_time_zone(Time.zone) (that one is Ruby), from http://mongotips.com/b/a-few-objectid-tricks/
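Translated to the Python driver, the same trick looks like this (generation_time and ObjectId.from_datetime are real bson APIs; the boundary query mirrors the idea above):

    import datetime
    from bson.objectid import ObjectId

    oid = ObjectId("4e9c1f27aa3dd60ee98282cf")

    # The first 4 bytes of an ObjectId are a UTC timestamp.
    print(oid.generation_time)  # 2011-10-17 12:27:19+00:00

    # And the other way around: build a "dummy" ObjectId whose non-time
    # bytes are zeroed, usable as a range boundary in queries.
    boundary = ObjectId.from_datetime(datetime.datetime(2011, 10, 17))
    # db.data.find({"_id": {"$gte": boundary}})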

+4

I wrote a complete pymongo-based solution that uses incremental map-reduce, caches the time, and is meant to run from a cron job. It takes a lock, so two runs cannot execute at the same time:

https://gist.github.com/2233072

 """ This method performs an incremental map-reduce on any new data in 'source_table_name' into 'target_table_name'. It can be run in a cron job, for instance, and on each execution will process only the new, unprocessed records. The set of data to be processed incrementally is determined non-invasively (meaning the source table is not written to) by using the queued_date field 'source_queued_date_field_name'. When a record is ready to be processed, simply set its queued_date (which should be indexed for efficiency). When incremental_map_reduce() is run, any documents with queued_dates between the counter in 'counter_key' and 'max_datetime' will be map/reduced. If reset is True, it will drop 'target_table_name' before starting. If max_datetime is given, it will only process records up to that date. If limit_items is given, it will only process (roughly) that many items. If multiple items share the same date stamp (as specified in 'source_queued_date_field_name') then it has to fetch all of those or it'll lose track, so it includes them all. If unspecified/None, counter_key defaults to counter_table_name:LastMaxDatetime. """ 
+2

We solve this problem using "normalized" ObjectIds. The steps we take:

  • Normalize the id: take the timestamp from the current/saved/last-processed id and set the other parts of the identifier to their minimum values. C# code: new ObjectId(objectId.Timestamp, 0, short.MinValue, 0)
  • Run map-reduce over all items whose id is greater than the normalized id, skipping items that have already been processed.
  • Save the last processed id and mark all processed items.

Note: some boundary items will be processed more than once. To deal with this, we set a flag on the items that have already been processed.
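The same normalization in Python, for comparison (a sketch: bson's ObjectId.from_datetime zeroes the non-time bytes — not exactly short.MinValue for the pid part like the C# snippet, but any fixed minimum works as a boundary; the 'processed' flag field name is made up):

    from bson.objectid import ObjectId

    def normalize(oid):
        # Keep only the timestamp; zero the machine/pid/counter bytes,
        # analogous to: new ObjectId(objectId.Timestamp, 0, short.MinValue, 0)
        return ObjectId.from_datetime(oid.generation_time)

    last_processed = ObjectId("4e9c1f5daa3dd60ee98282d4")
    barrier = normalize(last_processed)

    # Re-scan everything at or after the barrier; items already flagged as
    # processed are skipped, which handles the boundary duplicates.
    query = {"_id": {"$gte": barrier}, "processed": {"$ne": True}}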

0
