How to dynamically pass map function parameters to a GAE mapreduce?

I need to run a mapreduce job that is dynamic, in the sense that parameters must be passed to the map and reduce functions each time the job is executed (for example, in response to a user request).

How can I do this? I have not found anything in the documentation on passing parameters to the map and reduce functions at runtime.

    class MatchProcessing(webapp2.RequestHandler):
        def get(self):
            requestKeyID = int(self.request.get('riderbeeRequestID'))
            userKey = self.request.get('userKey')
            pipeline = MatchingPipeline(requestKeyID, userKey)
            pipeline.start()
            self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)


    class MatchingPipeline(base_handler.PipelineBase):
        def run(self, requestKeyID, userKey):
            yield mapreduce_pipeline.MapreducePipeline(
                "riderbee_matching",
                "tasks.matchingMR.riderbee_map",
                "tasks.matchingMR.riderbee_reduce",
                "mapreduce.input_readers.DatastoreInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                mapper_params={
                    "entity_kind": "models.rides.RiderbeeRequest",
                    "requestKeyID": requestKeyID,
                    "userKey": userKey,
                },
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=16)


    def riderbee_map(riderbeeRequest):
        # would like to access the requestKeyID and userKey parameters that were passed in mapper_params
        # so that we can do some processing based on that
        yield (riderbeeRequest.user.email, riderbeeRequest.key().id())


    def riderbee_reduce(key, values):
        # would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
        # so that we can do some processing based on that
        yield "%s: %s\n" % (key, len(values))

Help me please?

2 answers

I am sure that you can simply specify the parameters in mapper_params and read them back inside the map function through the context module. See http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters for more details.


Here's how to access the mapper parameters from the mapper function using the context module:

    from mapreduce import context

    def riderbee_map(riderbeeRequest):
        ctx = context.get()
        params = ctx.mapreduce_spec.mapper.params
        requestKeyID = params["requestKeyID"]
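
If several parameters are needed, the lookup above could be wrapped in a small helper. The following is only a sketch: `get_job_params`, `_Stub` and `make_fake_context` are hypothetical names that are not part of the mapreduce library, and the stub merely imitates the `ctx.mapreduce_spec.mapper.params` attribute path so the helper can be tried outside App Engine.

```python
def get_job_params(ctx, names):
    """Pick the named job parameters out of a mapreduce context object.

    `ctx` is assumed to look like the object returned by
    mapreduce.context.get(), i.e. to expose
    ctx.mapreduce_spec.mapper.params as a dict.
    """
    params = ctx.mapreduce_spec.mapper.params
    missing = [name for name in names if name not in params]
    if missing:
        # Fail early with the names that were not passed to the job.
        raise KeyError("missing mapper_params: %s" % ", ".join(missing))
    return dict((name, params[name]) for name in names)


class _Stub(object):
    """Hypothetical stand-in used only to exercise the helper locally."""

    def __init__(self, **attrs):
        self.__dict__.update(attrs)


def make_fake_context(params):
    # Imitates only the ctx.mapreduce_spec.mapper.params attribute path.
    return _Stub(mapreduce_spec=_Stub(mapper=_Stub(params=params)))


fake_ctx = make_fake_context({
    "entity_kind": "models.rides.RiderbeeRequest",
    "requestKeyID": 42,
    "userKey": "abc",
})
job = get_job_params(fake_ctx, ["requestKeyID", "userKey"])
```

Inside `riderbee_map` the call would presumably be `get_job_params(context.get(), ["requestKeyID", "userKey"])`. Whether the same lookup returns the `reducer_params` inside `riderbee_reduce` is not confirmed by the answers here, so that part should be verified against the library before relying on it.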
