I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because I'm having a lot of trouble effectively signalling when all of the data has been processed.
I have two separate tasks that run; one feeds the other. I guess this is a producer/consumer problem. I'm using a shared queue between all processes, where the producers fill the queue and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.
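To make the setup concrete, here is a stripped-down, self-contained sketch of the producer/consumer arrangement I mean (the names and the fake "doubling" step are placeholders, not my real code); the part I want to get right is how "all data processed" gets signalled:

```python
import multiprocessing

SENTINEL = None  # placeholder end-of-data marker


def consumer(queue, results):
    # Read from the shared queue until the producer side signals it is done.
    while True:
        item = queue.get()
        if item is SENTINEL:
            break  # graceful shutdown once all data is processed
        results.put(item * 2)  # stand-in for the real processing


def demo():
    queue = multiprocessing.Queue()
    results = multiprocessing.Queue()
    c = multiprocessing.Process(target=consumer, args=(queue, results))
    c.start()
    for record in range(5):  # the "producer" side filling the queue
        queue.put(record)
    queue.put(SENTINEL)      # the signal whose design I'm asking about
    c.join()
    return sorted(results.get() for _ in range(5))


if __name__ == "__main__":
    print(demo())
```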
It would seem to make sense to use map_async(), but since the producers are filling the queue, I don't know all of the items up front, so I have to go into a while loop, use apply_async(), and try to detect when everything is done with some sort of timeout... ugly.
I feel like I'm missing something obvious. How can this be better designed?
PRODUCER
    class ProducerProcess(multiprocessing.Process):
        def __init__(self, item, consumer_queue):
            self.item = item
            self.consumer_queue = consumer_queue
            multiprocessing.Process.__init__(self)

        def run(self):
            for record in get_records_for_item(self.item):
                self.consumer_queue.put(record)
CONSUMER
    def process_consumer_chunk(queue, chunksize=10000):
        for i in xrange(0, chunksize):
            try:
                record = queue.get(block=False)
                # ... process the record ...
            except Queue.Empty:
                break
MAIN
    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        consumer_queue = manager.Queue(1024*1024)
        producer_queue = manager.Queue()

        producer_items = xrange(0, 10)
        for item in producer_items:
            producer_queue.put(item)

        p = multiprocessing.Process(target=start_producer_processes,
                                    args=(producer_queue, consumer_queue, 8))
        p.start()
        consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
This is where it gets ugly. I can't use map because the list being consumed is still being filled at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it, so I can't just catch an empty queue and quit on that.
    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk,
                                               (consumer_queue, ),
                                               dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False
        except Queue.Empty:
            if timed_out:
                break
            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()
I thought that maybe I could get() the records in the main thread and pass them to the consumers instead of passing the queue, but I think I'd end up with the same problem that way. I still have to run a while loop and use apply_async(). Thank you in advance for any advice!
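For reference, the cleanest alternative I've come across so far (but haven't wired into the real code yet) is the sentinel, or "poison pill", pattern: once the producers finish, put one sentinel on the queue per consumer, and have each consumer block on get() until it sees one. That removes the timeout guessing entirely. A hedged sketch, with illustrative names (run_pipeline, NUM_CONSUMERS, the +1 processing step) that are not from my actual code:

```python
import multiprocessing

SENTINEL = None   # one sentinel per consumer marks end-of-data
NUM_CONSUMERS = 4


def consume(queue, results):
    # Block on get(); no timeout needed -- a sentinel ends the loop.
    while True:
        record = queue.get()
        if record is SENTINEL:
            break
        results.put(record + 1)  # stand-in for the real processing


def run_pipeline(records):
    records = list(records)
    queue = multiprocessing.Queue()
    results = multiprocessing.Queue()
    consumers = [multiprocessing.Process(target=consume, args=(queue, results))
                 for _ in range(NUM_CONSUMERS)]
    for c in consumers:
        c.start()
    for record in records:   # producer side: fill the queue
        queue.put(record)
    for _ in consumers:      # one sentinel per consumer
        queue.put(SENTINEL)
    for c in consumers:
        c.join()             # returns once every consumer has hit a sentinel
    return sorted(results.get() for _ in records)


if __name__ == "__main__":
    print(run_pipeline(range(10)))
```

The key design point is that the number of sentinels must match the number of consumers, since each consumer swallows exactly one before exiting.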