How do I get an App Engine MapReduce job to scale out across multiple shards?

I wrote a simple MapReduce job that reads lines from a CSV file in Google Cloud Storage and creates an Entity for each one. However, I can't get it to run on more than one shard.

The code uses mapreduce.control.start_map and looks something like this.

class LoadEntitiesPipeline(webapp2.RequestHandler):
    id = control.start_map(
        map_name,
        handler_spec="backend.line_processor",
        reader_spec="mapreduce.input_readers.FileInputReader",
        queue_name=get_queue_name("q-1"),
        shard_count=shard_count,
        mapper_parameters={
            'shard_count': shard_count,
            'batch_size': 50,
            'processing_rate': 1000000,
            'files': [gsfile],
            'format': 'lines'})

I have shard_count in both places because I'm not sure which one actually needs it. Setting shard_count anywhere from 8 to 32 doesn't change anything: the status page always says 1/1 shards are running. To isolate things, I moved everything onto a backend queue with a large number of instances and tried tuning the queue settings per this wiki. Either way, it only seems to run serially.
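
For reference, the queue configuration I experimented with looked roughly like this (the rate, bucket size, and backend name below are placeholders, not my exact values):

queue:
- name: q-1
  rate: 100/s
  bucket_size: 100
  max_concurrent_requests: 200
  target: my-backend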

Any ideas? Thanks!

Update (still unsuccessful):

While trying to isolate things, I also tried calling the pipeline directly, like so:

class ImportHandler(webapp2.RequestHandler):
    def get(self, gsfile):
        pipeline = LoadEntitiesPipeline2(gsfile)
        pipeline.start(queue_name=get_queue_name("q-1"))
        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)

class LoadEntitiesPipeline2(base_handler.PipelineBase):
    def run(self, gsfile):
        yield mapreduce_pipeline.MapperPipeline(
            'loadentities2_' + gsfile,
            'backend.line_processor',
            'mapreduce.input_readers.FileInputReader',
            params={'files': [gsfile], 'format': 'lines'},
            shards=32)

With this new code, it still only runs on one shard. I'm starting to wonder whether mapreduce.input_readers.FileInputReader is capable of parallelizing input line by line at all.

python google-app-engine google-cloud-storage mapreduce
3 answers

It seems that FileInputReader can only shard by files. The format parameter only changes the way the mapper function is called. If you pass more than one file to the mapper, it will start running on more than one shard; otherwise it will only use a single shard to process the data.
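
For example, if the input were already split across several GCS files, something like the following should fan out to one shard per file (the extra file names are placeholders, not from the question):

id = control.start_map(
    map_name,
    handler_spec="backend.line_processor",
    reader_spec="mapreduce.input_readers.FileInputReader",
    queue_name=get_queue_name("q-1"),
    shard_count=shard_count,
    mapper_parameters={
        # FileInputReader assigns at most one shard per file, so listing
        # several pre-split files is what actually fans the work out.
        'files': [gsfile_part1, gsfile_part2, gsfile_part3],
        'format': 'lines'})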

EDIT #1:

Digging deeper into the mapreduce library: MapReduce decides whether to split a file into pieces based on the can_split method of each file format it supports. Currently, the only format that implements the split method is ZipFormat, so if your file format is not zip, the file will not be split across more than one shard.

@classmethod
def can_split(cls):
    """Indicates whether this format support splitting within a file boundary.

    Returns:
      True if a FileFormat allows its inputs to be splitted into
      different shards.
    """

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

But it looks like writing your own file-format split method wouldn't be too hard. As a first hack, you could try adding a split method to _TextFormat and see if more than one shard works:

@classmethod
def split(cls, desired_size, start_index, opened_file, cache):
    pass

EDIT #2:

An easier workaround would be to leave the FileInputReader serial but move the time-consuming work to a parallel reduce step.

def line_processor(line):
    # serial
    yield (random.randrange(1000), line)

def reducer(key, values):
    # parallel
    entities = []
    for v in values:
        entities.append(CREATE_ENTITY_FROM_VALUE(v))
    db.put(entities)
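
A sketch of how those two functions might be wired together with mapreduce_pipeline.MapreducePipeline (the pipeline class name, the 'backend.reducer' spec string, and the shard count are assumptions for illustration, not from the original post):

class LoadEntitiesMRPipeline(base_handler.PipelineBase):
    def run(self, gsfile):
        # The map stage stays serial (one file, one shard), but the
        # shuffle and reduce stages fan out across the requested shards.
        yield mapreduce_pipeline.MapreducePipeline(
            'loadentities_mr_' + gsfile,
            mapper_spec='backend.line_processor',
            reducer_spec='backend.reducer',
            input_reader_spec='mapreduce.input_readers.FileInputReader',
            mapper_params={'files': [gsfile], 'format': 'lines'},
            shards=32)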

EDIT #3:

If you want to try modifying FileFormat, here is an example (not tested yet):

from file_formats import _TextFormat, FORMATS


class _LinesSplitFormat(_TextFormat):
    """Read file line by line."""

    NAME = 'split_lines'

    def get_next(self):
        """Inherited."""
        index = self.get_index()
        cache = self.get_cache()
        offset = sum(cache['infolist'][:index])

        self.get_current_file().seek(offset)
        result = self.get_current_file().readline()
        if not result:
            raise EOFError()
        if 'encoding' in self._kwargs:
            result = result.encode(self._kwargs['encoding'])
        return result

    @classmethod
    def can_split(cls):
        """Inherited."""
        return True

    @classmethod
    def split(cls, desired_size, start_index, opened_file, cache):
        """Inherited."""
        if 'infolist' in cache:
            infolist = cache['infolist']
        else:
            infolist = []
            for i in opened_file:
                infolist.append(len(i))
            cache['infolist'] = infolist

        index = start_index
        while desired_size > 0 and index < len(infolist):
            desired_size -= infolist[index]
            index += 1
        return desired_size, index


FORMATS['split_lines'] = _LinesSplitFormat

Then the new file format can be used by changing format in mapper_parameters from lines to split_lines:

class LoadEntitiesPipeline(webapp2.RequestHandler):
    id = control.start_map(
        map_name,
        handler_spec="backend.line_processor",
        reader_spec="mapreduce.input_readers.FileInputReader",
        queue_name=get_queue_name("q-1"),
        shard_count=shard_count,
        mapper_parameters={
            'shard_count': shard_count,
            'batch_size': 50,
            'processing_rate': 1000000,
            'files': [gsfile],
            'format': 'split_lines'})

It seems to me that FileInputReader should be able to shard, based on a quick read of: https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/input_readers.py

It looks like 'format': 'lines' should split using self.get_current_file().readline().

Does it seem to be interpreting the lines correctly when it runs serially? Maybe the line breaks are in the wrong encoding or something like that.


From experience, FileInputReader will do a maximum of one shard per file. Solution: split up your large files. I use split_file from https://github.com/johnwlockwood/karl_data to shard files before uploading them to Cloud Storage. If the large files are already up there, you can use a Compute Engine instance to pull them down and shard them, since the transfer speed will be fastest that way. FYI: karld is on PyPI, so you can pip install karld.
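
If you'd rather not pull in an extra dependency, a minimal sketch of chunking a large CSV locally before upload could look something like this (the chunk size and file naming are arbitrary):

import csv

def split_csv(path, rows_per_chunk=100000):
    """Write the CSV at path into numbered chunk files so each chunk
    can be uploaded as its own GCS file (one shard per file)."""
    chunk_paths = []
    with open(path, 'rb') as src:
        chunk, index = [], 0
        for row in csv.reader(src):
            chunk.append(row)
            if len(chunk) >= rows_per_chunk:
                chunk_paths.append(_write_chunk(path, index, chunk))
                chunk, index = [], index + 1
        if chunk:
            chunk_paths.append(_write_chunk(path, index, chunk))
    return chunk_paths

def _write_chunk(path, index, rows):
    # Name chunks after the source file, e.g. data.csv.part000.csv
    out_path = '%s.part%03d.csv' % (path, index)
    with open(out_path, 'wb') as dst:
        csv.writer(dst).writerows(rows)
    return out_path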

