MapReduce over entities of multiple datastore kinds in Google App Engine

I just watched the Batch Data Processing with App Engine session from Google I/O 2010, read parts of Google Research's MapReduce paper, and now I'm considering using MapReduce on Google App Engine to implement a recommendation system in Python.

I'd prefer to use appengine-mapreduce instead of the Task Queue API directly, because the former offers easy iteration over all entities of a kind, automatic batching, automatic task chaining, etc. The problem is that my recommender system needs to calculate the correlation between entities of two different models, i.e., entities of two different kinds.

Example: I have two models, User and Item. Each has a list of tags as an attribute. Below are the functions for calculating the correlation between users and items. Note that calculateCorrelation should be called for every combination of users and items:

```python
def calculateCorrelation(user, item):
    return calculateCorrelationAverage(user.tags, item.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations
```
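Since correlation itself is never defined in the question, here is a toy stand-in (purely an assumption) together with the arithmetic the functions above perform on one example pair of tag lists:

```python
# Toy correlation() -- hypothetical, since the question never defines it:
# 1.0 for identical tags, 0.0 otherwise.
def correlation(tag1, tag2):
    return 1.0 if tag1 == tag2 else 0.0

# For user.tags = ["python", "gae"] and item.tags = ["python", "java"],
# the four combinations score 1.0 + 0.0 + 0.0 + 0.0, and the average
# divides by len(tags1) + len(tags2) = 4:
tags1, tags2 = ["python", "gae"], ["python", "java"]
score = sum(correlation(a, b) for a in tags1 for b in tags2) / (len(tags1) + len(tags2))
print(score)  # 0.25
```

Note that the denominator is the total number of tags (4), not the number of combinations, which is what the question's calculateCorrelationAverage does as written.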

However, this calculateCorrelation is not a valid mapper for appengine-mapreduce, and perhaps the function isn't even compatible with the MapReduce computation model. Still, I'd like to be sure... it would be really beneficial for me to take advantage of automatic batching and task chaining.

Is there any solution for this?

Should I define my own InputReader? Would a new InputReader that reads all entities of two different kinds be compatible with the current appengine-mapreduce implementation?

Or should I try the following:

  • Combine the keys of all entities of the two kinds, pairwise, into instances of a new model (possibly using MapReduce)
  • Iterate with a mapper over the instances of this new model
  • For each instance, use the two keys it stores to fetch the two entities of different kinds and calculate the correlation between them.
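The three steps above can be sketched without the framework; plain dicts stand in for datastore entities, and every name below (build_pairs, pair_mapper, get_by_key) is hypothetical rather than part of appengine-mapreduce:

```python
# Plain-Python sketch of the key-pair approach; all names are invented
# for illustration.

# Step 1: combine the keys of the two kinds pairwise into "pair" records.
def build_pairs(user_keys, item_keys):
    return [{"user_key": u, "item_key": i} for u in user_keys for i in item_keys]

# Steps 2-3: the mapper over the pair records fetches both entities by
# key and computes the score for each pair.
def pair_mapper(pair, get_by_key, calculate_correlation):
    user = get_by_key(pair["user_key"])
    item = get_by_key(pair["item_key"])
    return calculate_correlation(user, item)

# Tiny in-memory "datastore" to demonstrate the flow.
store = {
    "u1": {"tags": ["python", "gae"]},
    "i1": {"tags": ["python"]},
}
pairs = build_pairs(["u1"], ["i1"])
print(len(pairs))  # 1
```

On App Engine, step 1 would persist these pairs as entities of a new kind, and step 3 would replace the dict lookup with datastore gets.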
python google-app-engine mapreduce google-cloud-datastore task-queue
2 answers

Following Nick Johnson's advice, I wrote my own InputReader. This reader fetches entities of two different kinds and yields tuples of all combinations of those entities. Here it is:

```python
import math

from mapreduce.input_readers import InputReader, DatastoreInputReader


class TwoKindsInputReader(InputReader):
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])
        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))
        splitted1 = DatastoreInputReader._split_input_from_params(
            app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(
            app, kind2, params, shard_count_sqrt)
        inputs = []
        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))
        # Uncomment this in case of "Incorrect number of shard states"
        # (at line 408 in handlers.py):
        #mapper_spec.shard_count = len(inputs)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True  # TODO
```
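The sharding in split_input deserves a note: it takes the integer square root of the requested shard_count and forms the cross product of the two per-kind shard lists, so a requested count of 16 becomes 4 × 4 = 16 readers, while non-square counts round down. A tiny framework-free illustration of that arithmetic:

```python
import math

def cross_shard_count(requested):
    # split_input takes the integer square root of the requested count,
    # splits each kind that many ways, then forms the cross product.
    per_kind = int(math.sqrt(requested))
    return per_kind * per_kind

print(cross_shard_count(16))  # 16
print(cross_shard_count(10))  # 9 -- non-square counts round down
```

This is why the commented-out `mapper_spec.shard_count = len(inputs)` line exists: when the product differs from the requested count, the framework may complain about an incorrect number of shard states.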

This code can be used whenever you need to process all combinations of entities of two kinds. It can also be generalized to more than two kinds.

Here is a valid mapreduce.yaml for TwoKindsInputReader:

```yaml
mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16
```
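The recommendation.calculateCorrelationHandler referenced in the yaml receives the (user, item) tuples yielded by TwoKindsInputReader. A minimal sketch of such a handler, using dicts as stand-in entities and a stub correlation (both assumptions, not from the original code):

```python
# Hypothetical handler sketch; TwoKindsInputReader yields (user, item)
# tuples, so the mapper unpacks them before computing the score.
def calculateCorrelation(user, item):
    # Stub standing in for the question's real correlation computation:
    # count of shared tags.
    shared = set(user["tags"]) & set(item["tags"])
    return float(len(shared))

def calculateCorrelationHandler(pair):
    user, item = pair
    score = calculateCorrelation(user, item)
    # On App Engine the handler would persist the score here instead.
    return score

print(calculateCorrelationHandler(({"tags": ["a", "b"]}, {"tags": ["b"]})))  # 1.0
```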

It's hard to know what to recommend without more detail about what you're actually computing. One simple option is to just fetch the related entity inside the map call; there's nothing stopping you from performing datastore operations there.

This will result in many small fetches, however. Writing a custom InputReader, as you suggest, will allow you to fetch both sets of entities together, which will significantly improve performance.
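The first option can be sketched without the framework; the in-memory dict below stands in for datastore gets (db.get on App Engine), and all names are hypothetical:

```python
# Sketch of the per-call fetch approach: each map() invocation does its
# own lookup for every related entity. The in-memory dict stands in for
# datastore gets (db.get in the real API).
ITEMS = {
    "i1": {"tags": ["python"]},
    "i2": {"tags": ["java"]},
}

def map_user(user):
    # One small fetch per related item key -- this is the overhead a
    # custom InputReader avoids by batching.
    for item_key in user["item_keys"]:
        item = ITEMS[item_key]  # db.get(item_key) on App Engine
        yield (item_key, len(set(user["tags"]) & set(item["tags"])))

print(list(map_user({"tags": ["python"], "item_keys": ["i1", "i2"]})))
# [('i1', 1), ('i2', 0)]
```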

If you can provide more detail on how you need to join these entities, we may be able to offer more concrete suggestions.

