It looks like FileInputReader can only shard by file. The format parameter only changes the way the mapper function is called. If you pass more than one file to the mapper, it will start to operate on more than one shard; otherwise it will use only one shard to process the data.
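So one immediate lever is the 'files' list itself. A minimal sketch, mirroring the control.start_map call from EDIT #3; gsfile_part1..gsfile_part4 are hypothetical names for input you would have pre-split into several Google Storage files yourself:

# Sketch: FileInputReader hands out roughly one shard per input file, so
# pre-splitting the input into several files is the simplest way to get
# parallelism without touching the library.
id = control.start_map(
    map_name,
    handler_spec="backend.line_processor",
    reader_spec="mapreduce.input_readers.FileInputReader",
    queue_name=get_queue_name("q-1"),
    shard_count=4,
    mapper_parameters={
        'batch_size': 50,
        # A single entry here means a single shard, regardless of shard_count.
        'files': [gsfile_part1, gsfile_part2, gsfile_part3, gsfile_part4],
        'format': 'lines'})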
EDIT # 1:
Digging deeper into the mapreduce library: MapReduce decides whether to split a file into pieces based on the return value of the can_split method for each file format it defines. Currently, the only format that implements the split method is ZipFormat. So, if your file format is not zip, the file will not be split into more than one shard.
@classmethod
def can_split(cls):
    """Indicates whether this format supports splitting within a file boundary.

    Returns:
      True if a FileFormat allows its inputs to be split into different shards.
    """
https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py
But it looks like it is possible to write your own split method for a file format. You could try hacking first: add a split method on _TextFormat and see whether more than one shard is used.
@classmethod
def split(cls, desired_size, start_index, opened_file, cache):
    pass
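For a quick experiment, the "hack" could be a monkey-patch like the sketch below. This only fakes the split bookkeeping so you can see whether extra shards get created; it does not make _TextFormat read the right byte ranges (that needs the get_next change shown in EDIT #3), and the import path may need adjusting to however file_formats sits on your path:

# Rough sketch only: make the framework believe _TextFormat can be split.
import file_formats


def _can_split(cls):
    return True


def _split(cls, desired_size, start_index, opened_file, cache):
    # Pretend each call consumes exactly desired_size bytes of the file.
    return 0, start_index + desired_size


file_formats._TextFormat.can_split = classmethod(_can_split)
file_formats._TextFormat.split = classmethod(_split)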
EDIT # 2:
An easy workaround would be to keep FileInputReader running serially, but move the time-consuming task from the mapper function into a parallel reduce step. The mapper here is:
def line_processor(line):
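A minimal sketch of that split, assuming mapreduce_pipeline.MapreducePipeline from the same library and a hypothetical process_line() helper that holds the expensive per-line work; the exact parameter layout for FileInputReader under MapreducePipeline may need adjusting to match your existing control.start_map configuration:

import random

from mapreduce import mapreduce_pipeline


def line_processor(line):
    # Serial map phase: stay cheap here and just scatter the lines
    # across reduce shards by emitting a random key.
    yield (str(random.randrange(100)), line)


def line_reducer(key, values):
    # Parallel reduce phase: the time-consuming work runs here,
    # spread across the reduce shards.
    for line in values:
        process_line(line)  # hypothetical heavy per-line task


def start_job(gsfile, shard_count):
    # Hypothetical job setup following the library's MapreducePipeline pattern.
    pipeline = mapreduce_pipeline.MapreducePipeline(
        "parallel_line_processing",
        "backend.line_processor",
        "backend.line_reducer",
        "mapreduce.input_readers.FileInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={"files": [gsfile], "format": "lines"},
        reducer_params={"mime_type": "text/plain"},
        shards=shard_count)
    pipeline.start()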
EDIT # 3:
If you do end up changing FileFormat, here is an example (it has not passed any test yet):
from file_formats import _TextFormat, FORMATS


class _LinesSplitFormat(_TextFormat):
    """Read file line by line."""

    NAME = 'split_lines'

    def get_next(self):
        """Inherited."""
        index = self.get_index()
        cache = self.get_cache()
        # Byte offset of the current line, from the cached line lengths.
        offset = sum(cache['infolist'][:index])

        self.get_current_file().seek(offset)
        result = self.get_current_file().readline()
        if not result:
            raise EOFError()
        if 'encoding' in self._kwargs:
            result = result.encode(self._kwargs['encoding'])
        return result

    @classmethod
    def can_split(cls):
        """Inherited."""
        return True

    @classmethod
    def split(cls, desired_size, start_index, opened_file, cache):
        """Inherited."""
        if 'infolist' in cache:
            infolist = cache['infolist']
        else:
            # Cache the length of every line so split points can be computed.
            infolist = []
            for i in opened_file:
                infolist.append(len(i))
            cache['infolist'] = infolist

        index = start_index
        while desired_size > 0 and index < len(infolist):
            desired_size -= infolist[index]
            index += 1
        return desired_size, index


FORMATS['split_lines'] = _LinesSplitFormat
Then the new file format can be used by changing mapper_parameters['format'] from 'lines' to 'split_lines':
class LoadEntitiesPipeline(webapp2.RequestHandler):
    id = control.start_map(
        map_name,
        handler_spec="backend.line_processor",
        reader_spec="mapreduce.input_readers.FileInputReader",
        queue_name=get_queue_name("q-1"),
        shard_count=shard_count,
        mapper_parameters={
            'shard_count': shard_count,
            'batch_size': 50,
            'processing_rate': 1000000,
            'files': [gsfile],
            'format': 'split_lines'})