Python Multiprocessing - Cannot Join Current Thread

I am segmenting large ctypes arrays and processing them in parallel. I get the error below, and I believe it occurs because one segment of the array finishes processing before another. I tried using process.join() to make the first set of processes wait, but that does not work. Ideas?

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored 

Using:

....
with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  #Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        jobs = []
        for i in range(0, y, step):
            process = p.Process(target=stretch, args=(shared_arr, slice(i, i + step)), kwargs=options)
            jobs.append(process)
            process.start()
        for j in jobs:
            j.join()
        del jobs
        del process

Update:

#Create a ctypes array
array = ArrayConvert.SharedMemArray(array)
#Create a global of options
init_options(options)  #options is a dict

with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  #Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        for i in range(0, y, step):
            #Package all the options into a global dictionary
            p.map_async(stretch, [slice(i, i + step)])
            #p.apply_async(stretch, args=(shared_arr, slice(i, i + step)), kwargs=options)
p.join()

def init_options(options_):
    global kwoptions
    kwoptions = options_

The function that I pass to map_async lives in another module, so I am struggling to get the global kwoptions through to it. It does not look like globals are shared between modules (which makes sense; passing them around like this feels unpythonic anyway). Is this the right way to pass kwargs through map_async?
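
For reference, one way to make kwoptions visible to a function defined in another module is to plant it on that module from the pool initializer. The following is only a sketch: stretch_module is a hypothetical stand-in for the module that actually defines stretch(), and the shared_arr/options/y/step names mirror the code above.

# --- stretch_module.py (hypothetical) --------------------------------------
shared_arr = None     # filled in by the pool initializer in each worker
kwoptions = None

def stretch(interval):
    # reads the module-level globals planted by init_worker()
    a = kwoptions['some_option']          # hypothetical option
    shared_arr[interval] = [a] * (interval.stop - interval.start)

# --- driver script ----------------------------------------------------------
import multiprocessing
from contextlib import closing

import stretch_module

def init_worker(shared_arr_, options_):
    # Runs once in every worker process.  Setting the attributes on the module
    # that defines stretch() makes them visible as globals *there*, not just
    # in __main__.
    stretch_module.shared_arr = shared_arr_
    stretch_module.kwoptions = options_

if __name__ == "__main__":
    shared_arr = multiprocessing.RawArray('i', 100)   # stand-in shared array
    options = {'some_option': 5}
    y, step = 100, 25

    with closing(multiprocessing.Pool(initializer=init_worker,
                                      initargs=(shared_arr, options))) as p:
        result = p.map_async(stretch_module.stretch,
                             [slice(i, i + step) for i in range(0, y, step)])
        result.wait()
    p.join()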

Should I rework the multiprocessing to use something else (apply or Process)?

2 answers

The initializer argument to Pool() takes a function; replace initializer=init(array) with initializer=init, initargs=(array,).
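
For illustration, here is a minimal sketch of the difference, assuming init() does nothing but stash the array in a module-level global:

import multiprocessing as mp

def init(shared_array_):
    global shared_array          # stash the array as a global inside each worker
    shared_array = shared_array_

if __name__ == "__main__":
    array = mp.RawArray('i', 10)

    # Wrong: init(array) is called right here in the parent, and its return
    # value (None) is what actually gets handed to Pool as the initializer.
    #p = mp.Pool(initializer=init(array))

    # Right: pass the function itself plus its arguments; the pool then calls
    # init(array) once inside every worker process.
    p = mp.Pool(initializer=init, initargs=(array,))
    p.close()
    p.join()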

To pass keyword arguments to the f() function used with the pool.*map* family, you can create a wrapper mp_f():

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start] * b  # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs)  # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N)  # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N // 4)  # dummy options
        step = options['b']
        args = ((slice(i, i + step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args):  # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__ == "__main__":
    mp.freeze_support()  # for py2exe and the-like on Windows
    main()

So I got this working by reworking the code and dropping the pool (per J.F. Sebastian's comment).

In pseudo code:

initialize the shared array
determine the step size
create an empty list of jobs
create each process, pass it the kwargs, and append it to the job list
start the jobs
join the jobs

Here is the code if this helps any googler:

#Initialize the ctypes array
init(array)
#Remove the reference to the array (to preserve memory on multiple iterations)
del array

step = y // cores
jobs = []
if step != 0:
    for i in range(0, y, step):
        p = multiprocessing.Process(target=stretch,
                                    args=(shared_arr, slice(i, i + step)),
                                    kwargs=options)
        jobs.append(p)

for job in jobs:
    job.start()
for job in jobs:
    job.join()
