Python: how to run concurrent.futures.ProcessPoolExecutor with initialization?

I plan to use concurrent.futures.ProcessPoolExecutor to parallelize the execution of functions. According to the documentation, the executor's map only accepts a plain function. My actual situation involves initialization (loading data) before the "to-be-parallelized" function runs. How should I organize this?

The to-be-parallelized function is iterated many times. I do not want it to be reinitialized every time.

In other words, there is an init function that produces some output for this to-be-parallelized function. Each child process should have its own copy of that output, because the function depends on it.

1 answer

It sounds like you are looking for an equivalent of the initializer / initargs options that multiprocessing.Pool takes. At the time of writing, this behavior does not exist for concurrent.futures.ProcessPoolExecutor, although there is a patch waiting for review that adds it.
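For reference, here is a minimal sketch of the initializer / initargs pattern with multiprocessing.Pool. The names init_worker, work, run_with_pool and the _state global are made up for illustration, and the "loading" step is only a stand-in for real data loading:

```python
import os
from multiprocessing import Pool

# Per-process global. Each worker fills in its own copy via the initializer.
_state = None


def init_worker(a, b):
    # Hypothetical stand-in for an expensive load; runs once per worker process.
    global _state
    _state = {"offset": a + b, "pid": os.getpid()}


def work(x):
    # Uses the state prepared by init_worker in this same worker process.
    return x + _state["offset"]


def run_with_pool(items, processes=4):
    # initializer/initargs are passed to Pool, not to map.
    with Pool(processes, initializer=init_worker, initargs=(5, 6)) as pool:
        return pool.map(work, items)


if __name__ == "__main__":
    print(run_with_pool(range(10)))
```

Because the state lives in a module-level global of each worker, it is never sent back to the parent and is not shared between workers.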

So you can use multiprocessing.Pool (which might be fine for your use case), wait for that patch to be merged and released (you could be waiting a while :)), or roll your own solution. It turns out that it's not too difficult to write a wrapper function for map that accepts an initializer but only calls it once per process:

    import os
    from concurrent.futures import ProcessPoolExecutor
    from functools import partial

    inited = False
    initresult = None

    def initwrapper(initfunc, initargs, f, x):
        # This will be called in the child. inited will be False
        # the first time it's called, but then remain True every
        # other time it's called in a given worker process.
        global inited, initresult
        if not inited:
            inited = True
            initresult = initfunc(*initargs)
        return f(x)

    def do_init(a, b):
        print('ran init {} {}'.format(a, b))
        return os.getpid()  # Just to demonstrate it will be unique per process

    def f(x):
        print("Hey there {}".format(x))
        print('initresult is {}'.format(initresult))
        return x + 1

    def initmap(executor, initializer, initargs, f, it):
        return executor.map(partial(initwrapper, initializer, initargs, f), it)

    if __name__ == "__main__":
        with ProcessPoolExecutor(4) as executor:
            out = initmap(executor, do_init, (5, 6), f, range(10))
        print(list(out))

Output:

    ran init 5 6
    Hey there 0
    initresult is 4568
    ran init 5 6
    Hey there 1
    initresult is 4569
    ran init 5 6
    Hey there 2
    initresult is 4570
    Hey there 3
    initresult is 4569
    Hey there 4
    initresult is 4568
    ran init 5 6
    Hey there 5
    initresult is 4571
    Hey there 6
    initresult is 4570
    Hey there 7
    initresult is 4569
    Hey there 8
    initresult is 4568
    Hey there 9
    initresult is 4570
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
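On Python 3.7 and later, ProcessPoolExecutor itself accepts initializer and initargs arguments, so on those versions the wrapper should not be needed. A minimal sketch, assuming Python 3.7+; load_data, f, run_with_executor and the _loaded global are illustrative names, not part of the original answer:

```python
import os
from concurrent.futures import ProcessPoolExecutor

# Per-process global, set once in each worker by the initializer.
_loaded = None


def load_data(a, b):
    # Hypothetical stand-in for loading data; called once per worker process.
    global _loaded
    _loaded = (a, b, os.getpid())


def f(x):
    # Reads the values that load_data stored in this worker process.
    a, b, _pid = _loaded
    return x + a + b


def run_with_executor(items, max_workers=4):
    # Requires Python 3.7+, where initializer/initargs were added.
    with ProcessPoolExecutor(
        max_workers=max_workers, initializer=load_data, initargs=(5, 6)
    ) as executor:
        return list(executor.map(f, items))


if __name__ == "__main__":
    print(run_with_executor(range(10)))
```

As with the wrapper approach, each worker keeps its own copy of the initialized state in a module-level global.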