How to run cleanup code in a Python multiprocessing pool?

I have Python code (on Windows) that uses the multiprocessing module to start a pool of worker processes. Each worker process needs to do some cleanup when the map_async method finishes.
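Roughly, the setup looks something like the sketch below (the names and the work done are just placeholders, not my real code):

    import multiprocessing

    def worker_task(item):
        # Placeholder for the real per-item work.
        return item

    def main():
        pool = multiprocessing.Pool(processes=4)
        results = pool.map_async(worker_task, range(100)).get()
        # At this point each worker process should have run its cleanup code.

    if __name__ == '__main__':
        main()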

Does anyone know how to do this?

+4
2 answers

Are you sure you want to run the cleanup function once for each worker process, rather than once for each task created by the call to map_async?

multiprocessing.pool.Pool creates a pool of, say, 8 worker processes. A map_async call might then submit 40 tasks to be distributed among those 8 workers. I can see why you might want to run cleanup code at the end of each task, but I find it harder to see why you would want to run cleanup code just before each of the 8 worker processes terminates.

However, if that is what you want to do, you can do it by monkeypatching multiprocessing.pool.worker:

    import multiprocessing as mp
    import multiprocessing.pool as mpool
    from multiprocessing.util import debug

    def cleanup():
        print('{n} CLEANUP'.format(n=mp.current_process().name))

    # This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
    # except for the single line at the end which calls cleanup().
    def myworker(inqueue, outqueue, initializer=None, initargs=()):
        put = outqueue.put
        get = inqueue.get
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(*initargs)
        while 1:
            try:
                task = get()
            except (EOFError, IOError):
                debug('worker got EOFError or IOError -- exiting')
                break
            if task is None:
                debug('worker got sentinel -- exiting')
                break
            job, i, func, args, kwds = task
            try:
                result = (True, func(*args, **kwds))
            except Exception, e:
                result = (False, e)
            put((job, i, result))
        cleanup()

    # Here we monkeypatch mpool.worker
    mpool.worker = myworker

    def foo(i):
        return i*i

    def main():
        pool = mp.Pool(8)
        results = pool.map_async(foo, range(40)).get()
        print(results)

    if __name__ == '__main__':
        main()

gives:

    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
    PoolWorker-8 CLEANUP
    PoolWorker-3 CLEANUP
    PoolWorker-7 CLEANUP
    PoolWorker-1 CLEANUP
    PoolWorker-6 CLEANUP
    PoolWorker-2 CLEANUP
    PoolWorker-4 CLEANUP
    PoolWorker-5 CLEANUP
+4

Your only real option here is to run the cleanup at the end of the function that you pass to map_async.
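As a minimal sketch of what I mean (the task body and the cleanup itself are placeholders), you can wrap the body of the mapped function in try/finally so the cleanup runs after every call, even if the task raises:

    import multiprocessing

    def cleanup():
        # Hypothetical per-call cleanup, e.g. closing files or connections.
        print('{0} cleaning up'.format(multiprocessing.current_process().name))

    def task(arg):
        try:
            return arg * arg
        finally:
            cleanup()  # runs after each call, even on an exception

    def main():
        pool = multiprocessing.Pool(processes=4)
        print(pool.map_async(task, range(10)).get())

    if __name__ == '__main__':
        main()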

If this cleanup is really meant to happen when a process dies, you cannot use the pool abstraction for it: they are orthogonal. The pool does not dictate the lifetime of its processes unless you use maxtasksperchild, which is new in Python 2.7. Even then, you still do not get to run your own code when a process dies. However, maxtasksperchild may suit you, because any resources the process has open will definitely go away when the process terminates.
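For illustration, here is a minimal sketch of maxtasksperchild (the worker function and the numbers are arbitrary); each worker process is retired after the given number of tasks and replaced by a fresh one, so whatever it held open is released when it exits:

    import multiprocessing

    def foo(i):
        return i * i

    def main():
        # Each worker process exits after completing 10 tasks and is replaced
        # by a new one; its open resources go away with it.
        pool = multiprocessing.Pool(processes=4, maxtasksperchild=10)
        print(pool.map(foo, range(40)))

    if __name__ == '__main__':
        main()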

If you have a bunch of functions that all need the same cleanup, you can avoid duplicating effort by writing a decorator. Here is an example of what I mean:

    import functools
    import multiprocessing

    def cleanup(f):
        """Decorator for shared cleanup mechanism"""
        @functools.wraps(f)
        def wrapped(arg):
            result = f(arg)
            print("Cleaning up after f({0})".format(arg))
            return result
        return wrapped

    @cleanup
    def task1(arg):
        print("Hello from task1({0})".format(arg))
        return arg * 2

    @cleanup
    def task2(arg):
        print("Bonjour from task2({0})".format(arg))
        return arg ** 2

    def main():
        p = multiprocessing.Pool(processes=3)
        print(p.map(task1, [1, 2, 3]))
        print(p.map(task2, [1, 2, 3]))

    if __name__ == "__main__":
        main()

When you run this (barring stdout getting interleaved, since I am not locking it here, for brevity), the order of the output should show that the cleanup runs at the end of each task:

    Hello from task1(1)
    Cleaning up after f(1)
    Hello from task1(2)
    Cleaning up after f(2)
    Hello from task1(3)
    Cleaning up after f(3)
    [2, 4, 6]
    Bonjour from task2(1)
    Cleaning up after f(1)
    Bonjour from task2(2)
    Cleaning up after f(2)
    Bonjour from task2(3)
    Cleaning up after f(3)
    [1, 4, 9]
+3
