Are you sure you want to run the cleanup function once per worker process, and not once for each task created by calling map_async?
multiprocessing.pool.Pool creates a pool of, say, 8 worker processes. A single map_async call might then submit 40 tasks to be distributed among those 8 workers. I can imagine why you might want to run cleanup code at the end of each task, but it is harder to see why you would want to run it just before each of the 8 worker processes terminates.
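If per-task cleanup is actually what you need, no monkeypatching is required: wrap the task function so that cleanup runs in a try/finally after every call. This is a minimal sketch; cleanup, foo and foo_with_cleanup are hypothetical names, and the wrapper is defined at module level so the pool can pickle it.

```python
import multiprocessing as mp

def cleanup():
    # hypothetical per-task cleanup; replace with real resource teardown
    print('{n} task cleanup'.format(n=mp.current_process().name))

def foo(i):
    return i * i

def foo_with_cleanup(i):
    # runs cleanup after every task, whether it succeeds or raises
    try:
        return foo(i)
    finally:
        cleanup()

def main():
    pool = mp.Pool(4)
    results = pool.map_async(foo_with_cleanup, range(10)).get()
    pool.close()
    pool.join()
    return results

if __name__ == '__main__':
    print(main())
```

Note that the wrapper must be a plain module-level function rather than a closure returned by a decorator, because the pool pickles the task function by name when sending it to the workers.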
However, if per-worker cleanup really is what you want, you can do it by monkeypatching multiprocessing.pool.worker:
import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()
    if initializer is not None:
        initializer(*initargs)
    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        if task is None:
            debug('worker got sentinel -- exiting')
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()
gives:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP