Multiprocessing - producer / consumer design

I am using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because it's very hard for me to effectively tell when all of the data has been processed.

I have two separate tasks that run; one that feeds the other. I guess this is a producer / consumer problem. I use a shared queue between all processes, where the producers fill the queue and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.

It would seem to make sense to use map_async(), but since the producers are filling the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some kind of timeout... ugly.

I feel like I'm missing something obvious. How can this be better designed?

PRODUCER

import multiprocessing
import time

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item):  # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

CONSUMER

import Queue  # for Queue.Empty

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.
            record = queue.get(True, 10)
        except Queue.Empty:
            break
        do_stuff_with_record(record)

MAIN

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0, 10)
    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes,
                                args=(producer_queue, consumer_queue, 8))
    p.start()
    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

This is where it gets ugly. I cannot use map because the list being consumed is still being filled at the same time, so I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it, so I can't just treat an empty queue as a signal to exit.

    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk,
                                               (consumer_queue, ),
                                               dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False
        except Queue.Empty:
            if timed_out:
                break
            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

I thought that maybe I could get() the records in the main thread and pass them to the consumers instead of passing the queue, but I think I end up with the same problem that way. I still have to run a while loop and use apply_async(). Thanks in advance for any advice!
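For reference, a rough sketch of what I mean by that alternative — pulling records in the main process and handing chunks to the pool (process_chunk here is a hypothetical helper that takes a list of records instead of the queue); it still runs into the same empty-queue guesswork:

    chunk = []
    while 1:
        try:
            record = consumer_queue.get(True, 10)
        except Queue.Empty:
            break   # same problem: empty doesn't mean the producers are finished
        chunk.append(record)
        if len(chunk) >= 10000:
            consumer_pool.apply_async(process_chunk, (chunk,))
            chunk = []
    if chunk:
        consumer_pool.apply_async(process_chunk, (chunk,))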

+6
2 answers

You can use a manager.Event to signal the shutdown. This event can be shared between all of your processes, and when you signal it from the main process the other workers can then shut down gracefully.

    while not event.is_set():
        ...rest of code...

So your consumers would wait for the event to be set and handle the cleanup once it is set.

To determine when to set this flag, you can join() the producer processes, and when those have all completed, set the event and then join() the consumer processes.
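A minimal sketch of that idea, reusing the question's ProducerProcess and do_stuff_with_record placeholders (the consumer below keeps draining the queue and only exits once the queue is empty and the shutdown event has been set):

    import multiprocessing
    import Queue

    def consumer(consumer_queue, shutdown_event):
        while True:
            try:
                record = consumer_queue.get(True, 1)
            except Queue.Empty:
                # only stop once the producers have signalled they are done
                if shutdown_event.is_set():
                    break
                continue
            do_stuff_with_record(record)

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        consumer_queue = manager.Queue(1024*1024)
        shutdown_event = manager.Event()

        producers = [ProducerProcess(item, consumer_queue) for item in xrange(0, 10)]
        consumers = [multiprocessing.Process(target=consumer,
                                             args=(consumer_queue, shutdown_event))
                     for _ in xrange(16)]
        for p in producers + consumers:
            p.start()

        for p in producers:      # wait for every producer to finish...
            p.join()
        shutdown_event.set()     # ...then tell the consumers no more data is coming

        for c in consumers:
            c.join()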

+2

I would highly recommend SimPy instead of multiprocessing / threading for doing discrete-event simulation.
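For illustration only, a tiny producer/consumer model in SimPy 3 (this simulates the flow in virtual time rather than doing real parallel work, so it is not a drop-in replacement for the code above):

    import simpy

    def producer(env, store, n_items):
        for i in xrange(n_items):
            yield env.timeout(1)               # simulated time to fetch a record
            yield store.put("record-%d" % i)

    def consumer(env, store):
        while True:
            record = yield store.get()
            yield env.timeout(2)               # simulated processing time
            print("%s processed at t=%s" % (record, env.now))

    env = simpy.Environment()
    store = simpy.Store(env)
    env.process(producer(env, store, 10))
    env.process(consumer(env, store))
    env.run(until=50)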

0
