Processing items from a queue progressively as workers become free

I am looking for a reliable way to work through a list of items gradually using a Queue.

The idea is to use a fixed number of workers to go through a list of 20+ database-intensive tasks and return a result for each. I want Python to start with the first five items, and as soon as one task finishes, start the next task in the queue.

This is how I do it now, without threading:

    for key, v in self.sources.iteritems():
        # Do Stuff

I would like a similar approach, but ideally without having to split the list into subgroups of five: the pool should automatically pick up the next item in the list. The goal is to make sure that if one database slows things down, it does not stall the entire application.

3 answers

You could implement this yourself, but Python 3 already ships with an Executor-based thread-pool solution in concurrent.futures, which you can also use on Python 2.x by installing the futures backport.

Then your code might look like this:

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_key = {}
        for key, value in sources.items():
            future_to_key[executor.submit(do_stuff, value)] = key
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            result = future.result()
            # process result

If you are using Python 3, I recommend the concurrent.futures module. If you are not on Python 3 and are not tied to threads (as opposed to processes), you can try multiprocessing.Pool (though it has some caveats, and I have had problems with pools not closing properly in my applications). If you have to use threads on Python 2, you can write the code yourself: start 5 consumer threads and push the calls (function + args) onto a Queue so the consumers can pick them up and process them.
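The hand-rolled approach described above can be sketched with the stdlib threading and queue modules (Python 3 spelling; on Python 2 the module is named Queue). The worker and run_all names are placeholders for illustration, not part of any library:

```python
import threading
import queue

def worker(tasks, results):
    # Each consumer pulls (function, args) tuples until it sees the sentinel.
    while True:
        item = tasks.get()
        if item is None:  # sentinel: no more work
            break
        func, args = item
        results.put(func(*args))

def run_all(calls, num_workers=5):
    tasks = queue.Queue()
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for call in calls:          # each call is a (function, args) tuple
        tasks.put(call)
    for _ in threads:           # one sentinel per worker thread
        tasks.put(None)
    for t in threads:
        t.join()
    return [results.get() for _ in range(results.qsize())]

# example: square ten numbers with 5 worker threads
print(sorted(run_all([(pow, (n, 2)) for n in range(10)])))
```

As soon as a thread finishes one call it pulls the next one off the queue, so a slow item only occupies one of the five workers instead of blocking the whole batch.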


You can do this using only stdlib:

    #!/usr/bin/env python
    from multiprocessing.dummy import Pool  # thread pool, despite the module name

    def db_task(key_value):
        try:
            key, value = key_value
            # compute result..
            return result, None
        except Exception as e:
            return None, e

    def main():
        pool = Pool(5)
        for result, error in pool.imap_unordered(db_task, sources.items()):
            if error is None:
                print(result)

    if __name__ == "__main__":
        main()
