Use numpy array in shared memory for multiprocessing

I would like to use a numpy array in shared memory with the multiprocessing module. The difficulty is using it as a numpy array, and not just as a ctypes array.

    from multiprocessing import Process, Array
    import scipy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        arr = Array('d', unshared_arr)
        print "Originally, the first two elements of arr = %s"%(arr[:2])

        # Create, start, and finish the child processes
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()

        # Printing out the changed values
        print "Now, the first two elements of arr = %s"%arr[:2]

This produces output, for example:

    Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
    Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don't believe it would be shared anymore.

It seems there should be a standard solution to what must be a common problem.

+75
python numpy multiprocessing shared
Oct 25 '11 at 19:34
5 answers

To add to @unutbu's (no longer available) and @Henry Gomersall's answers: you can use shared_arr.get_lock() to synchronize access when necessary:

    shared_arr = mp.Array(ctypes.c_double, N)
    # ...
    def f(i): # could be anything numpy accepts as an index, such as another numpy array
        with shared_arr.get_lock(): # synchronize access
            arr = np.frombuffer(shared_arr.get_obj()) # no data copying
            arr[i] = -arr[i]

Example

    import ctypes
    import logging
    import multiprocessing as mp

    from contextlib import closing

    import numpy as np

    info = mp.get_logger().info

    def main():
        logger = mp.log_to_stderr()
        logger.setLevel(logging.INFO)

        # create shared array
        N, M = 100, 11
        shared_arr = mp.Array(ctypes.c_double, N)
        arr = tonumpyarray(shared_arr)

        # fill with random values
        arr[:] = np.random.uniform(size=N)
        arr_orig = arr.copy()

        # write to arr from different processes
        with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
            # many processes access the same slice
            stop_f = N // 10
            p.map_async(f, [slice(stop_f)]*M)

            # many processes access different slices of the same array
            assert M % 2 # odd
            step = N // 10
            p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
        p.join()
        assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

    def init(shared_arr_):
        global shared_arr
        shared_arr = shared_arr_ # must be inherited, not passed as an argument

    def tonumpyarray(mp_arr):
        return np.frombuffer(mp_arr.get_obj())

    def f(i):
        """synchronized."""
        with shared_arr.get_lock(): # synchronize access
            g(i)

    def g(i):
        """no synchronization."""
        info("start %s" % (i,))
        arr = tonumpyarray(shared_arr)
        arr[i] = -1 * arr[i]
        info("end %s" % (i,))

    if __name__ == '__main__':
        mp.freeze_support()
        main()

If you do not need synchronized access, or you create your own locks, then mp.Array() is not required. You can use mp.sharedctypes.RawArray in that case.
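For illustration, here is a minimal sketch of the RawArray variant (not from the original answer; the names are made up, and it assumes no locking is needed, e.g. because each process only writes its own disjoint slice):

    import ctypes
    import multiprocessing as mp
    import numpy as np

    # RawArray has no wrapping lock object, so numpy can view it directly
    raw = mp.RawArray(ctypes.c_double, 10)
    arr = np.frombuffer(raw, dtype=np.float64)  # numpy view, no data copying
    arr[:] = np.arange(10.0)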

+66
Oct 26 '11 at 20:36

The Array object has an associated get_obj() method that returns the underlying ctypes array, which exposes a buffer interface. I think the following should work...

    from multiprocessing import Process, Array
    import scipy
    import numpy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        a = Array('d', unshared_arr)
        print "Originally, the first two elements of arr = %s"%(a[:2])

        # Create, start, and finish the child process
        p = Process(target=f, args=(a,))
        p.start()
        p.join()

        # Print out the changed values
        print "Now, the first two elements of arr = %s"%a[:2]

        b = numpy.frombuffer(a.get_obj())
        b[0] = 10.0
        print a[0]

When run, this prints out the first element of a, which is now 10.0, showing that a and b are just two views onto the same memory.

To make sure it remains multiprocess-safe, I believe you will have to use the acquire and release methods that exist on the Array object, and its built-in lock, to make sure that access is safe (though I am not an expert on the multiprocessing module).
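For illustration, a minimal sketch of that locking pattern (not from the original answer; it assumes the Array a and the numpy view b from the snippet above):

    # hold the Array's built-in lock while mutating the shared buffer
    a.acquire()
    try:
        b = numpy.frombuffer(a.get_obj())
        b[0] = -b[0]
    finally:
        a.release()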

+15
Oct 26 '11 at 19:26

Although the answers already provided are good, it’s much easier to solve this problem if two conditions are met:

  • You are in a POSIX-compatible operating system (for example, Linux, Mac OSX); and
  • Your child processes require read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created with a fork. A forked child automatically shares the parent's memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool or so.

A simple example:

    import multiprocessing
    import numpy as np

    # will hold the (implicitly mem-shared) data
    data_array = None

    # child worker function
    def job_handler(num):
        # built-in id() returns unique memory ID of a variable
        return id(data_array), np.sum(data_array)

    def launch_jobs(data, num_jobs=5, num_worker=4):
        global data_array
        data_array = data

        pool = multiprocessing.Pool(num_worker)
        return pool.map(job_handler, range(num_jobs))

    # create some random data and execute the child jobs
    mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

    # this will print 'True' on POSIX OS, since the data was shared
    print(np.all(np.asarray(mem_ids) == id(data_array)))
+12
Jun 10 '16 at 11:20

I wrote a small Python module that uses POSIX shared memory to share numpy arrays between Python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here's how it works:

    import numpy as np
    import SharedArray as sa

    # Create an array in shared memory
    a = sa.create("test1", 10)

    # Attach it as a different array. This can be done from another
    # python interpreter as long as it runs on the same computer.
    b = sa.attach("test1")

    # See how they are actually sharing the same memory block
    a[0] = 42
    print(b[0])

    # Destroying a does not affect b.
    del a
    print(b[0])

    # See how "test1" is still present in shared memory even though we
    # destroyed the array a.
    sa.list()

    # Now destroy the array "test1" from memory.
    sa.delete("test1")

    # The array b is not affected, but once you destroy it then the
    # data are lost.
    print(b[0])
+9
22 Oct '15 at 10:22

You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here is your original code again, this time using shared memory that behaves like a NumPy array (note the additional final statement calling the NumPy sum() function):

    from multiprocessing import Process
    import sharedmem
    import scipy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        arr = sharedmem.empty(N)
        arr[:] = unshared_arr.copy()
        print "Originally, the first two elements of arr = %s"%(arr[:2])

        # Create, start, and finish the child process
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()

        # Print out the changed values
        print "Now, the first two elements of arr = %s"%arr[:2]

        # Perform some NumPy operation
        print arr.sum()
+8
May 25 '13 at 19:27


