How to pass large numpy arrays between python subprocesses without saving to disk?

Is there a good way to transfer a large chunk of data between two python subprocesses without using a disk? Here is a cartoony example of what I hope to accomplish:

import sys, subprocess, numpy cmdString = """ import sys, numpy done = False while not done: cmd = raw_input() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. data = numpy.zeros(1000000, dtype=numpy.uint8) data.dump('data.pkl') sys.stdout.write('data.pkl' + '\\n') sys.stdout.flush()""" proc = subprocess.Popen( #python vs. pythonw on Windows? [sys.executable, '-c %s'%cmdString], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in range(3): proc.stdin.write('data\n') print proc.stdout.readline().rstrip() a = numpy.load('data.pkl') print a.shape proc.stdin.write('done\n') 

This creates a subprocess that generates a numpy array and saves the array to disk. Then the parent process loads the array from disk. He works!

The problem is that our equipment can generate data 10 times faster than a disk can read / write. Is there a way to transfer data from one python process to another exclusively in memory, perhaps even without making a copy of the data? Can I do something like pass by reference?

My first attempt to transfer data exclusively in memory is pretty lousy:

 import sys, subprocess, numpy cmdString = """ import sys, numpy done = False while not done: cmd = raw_input() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. data = numpy.zeros(1000000, dtype=numpy.uint8) ##Note that this is NFG if there a '10' in the array: sys.stdout.write(data.tostring() + '\\n') sys.stdout.flush()""" proc = subprocess.Popen( #python vs. pythonw on Windows? [sys.executable, '-c %s'%cmdString], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in range(3): proc.stdin.write('data\n') a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8) print a.shape proc.stdin.write('done\n') 

It is very slow (much slower than saving to disk) and very, very fragile. There must be a better way!

I am not married to the subprocess module if the data processing process does not block the parent application. I briefly tried "multiprocessing", but have not had time yet.

Background: we have a piece of equipment that generates up to ~ 2 GB / s of data in a series of ctypes buffers. The python code for handling these buffers has its own hands, completely related to the flow of information. I want to coordinate this flow of information with several other hardware running simultaneously in the "main" program, without subprocesses blocking each other. My current approach is to slightly collapse the data in the subprocess before saving to disk, but it would be nice to pass the full monty to the "master" process.

+23
python pass-by-reference numpy subprocess ctypes
Feb 17 '11 at 19:47
source share
6 answers

While googling for more information on Joe Kington's code posted, I found numpy-sharedmem . Judging by this numpy / multiprocessing tutorial , it seems to have the same intellectual heritage (maybe mostly the same authors?), I'm not sure).

Using the sharedmem module, you can create a numpy array with shared memory (awesome!) And use it with multiprocessing as follows:

 import sharedmem as shm import numpy as np import multiprocessing as mp def worker(q,arr): done = False while not done: cmd = q.get() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. rnd=np.random.randint(100) print('rnd={0}'.format(rnd)) arr[:]=rnd q.task_done() if __name__=='__main__': N=10 arr=shm.zeros(N,dtype=np.uint8) q=mp.JoinableQueue() proc = mp.Process(target=worker, args=[q,arr]) proc.daemon=True proc.start() for i in range(3): q.put('data') # Wait for the computation to finish q.join() print arr.shape print(arr) q.put('done') proc.join() 

Current yield

 rnd=53 (10,) [53 53 53 53 53 53 53 53 53 53] rnd=15 (10,) [15 15 15 15 15 15 15 15 15 15] rnd=87 (10,) [87 87 87 87 87 87 87 87 87 87] 
+23
Feb 18 '11 at 1:26
source share

Basically, you just want to split the memory block between the processes and view it as a numpy array, right?

In this case, take a look at this (Added to the numpy discussion of Nadav Khoresh some time ago, not my work). There are several similar implementations (some more flexible), but they all essentially use this principle.

 # "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing" # Modified and corrected by Nadav Horesh, Mar 2010 # No rights reserved import numpy as N import ctypes import multiprocessing as MP _ctypes_to_numpy = { ctypes.c_char : N.dtype(N.uint8), ctypes.c_wchar : N.dtype(N.int16), ctypes.c_byte : N.dtype(N.int8), ctypes.c_ubyte : N.dtype(N.uint8), ctypes.c_short : N.dtype(N.int16), ctypes.c_ushort : N.dtype(N.uint16), ctypes.c_int : N.dtype(N.int32), ctypes.c_uint : N.dtype(N.uint32), ctypes.c_long : N.dtype(N.int64), ctypes.c_ulong : N.dtype(N.uint64), ctypes.c_float : N.dtype(N.float32), ctypes.c_double : N.dtype(N.float64)} _numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys())) def shmem_as_ndarray(raw_array, shape=None ): address = raw_array._obj._wrapper.get_address() size = len(raw_array) if (shape is None) or (N.asarray(shape).prod() != size): shape = (size,) elif type(shape) is int: shape = (shape,) else: shape = tuple(shape) dtype = _ctypes_to_numpy[raw_array._obj._type_] class Dummy(object): pass d = Dummy() d.__array_interface__ = { 'data' : (address, False), 'typestr' : dtype.str, 'descr' : dtype.descr, 'shape' : shape, 'strides' : None, 'version' : 3} return N.asarray(d) def empty_shared_array(shape, dtype, lock=True): ''' Generate an empty MP shared array given ndarray parameters ''' if type(shape) is not int: shape = N.asarray(shape).prod() try: c_type = _numpy_to_ctypes[dtype] except KeyError: c_type = _numpy_to_ctypes[N.dtype(dtype)] return MP.Array(c_type, shape, lock=lock) def emptylike_shared_array(ndarray, lock=True): 'Generate a empty shared array with size and dtype of a given array' return empty_shared_array(ndarray.size, ndarray.dtype, lock) 
+8
Feb 17 '11 at 20:18
source share

From the other answers, it seems that numpy-sharedmem is the way to go.

However, if you need a clean python solution or installing extensions, cython or the like is a big problem, you can use the following code, which is a simplified version of Nadav code:

 import numpy, ctypes, multiprocessing _ctypes_to_numpy = { ctypes.c_char : numpy.dtype(numpy.uint8), ctypes.c_wchar : numpy.dtype(numpy.int16), ctypes.c_byte : numpy.dtype(numpy.int8), ctypes.c_ubyte : numpy.dtype(numpy.uint8), ctypes.c_short : numpy.dtype(numpy.int16), ctypes.c_ushort : numpy.dtype(numpy.uint16), ctypes.c_int : numpy.dtype(numpy.int32), ctypes.c_uint : numpy.dtype(numpy.uint32), ctypes.c_long : numpy.dtype(numpy.int64), ctypes.c_ulong : numpy.dtype(numpy.uint64), ctypes.c_float : numpy.dtype(numpy.float32), ctypes.c_double : numpy.dtype(numpy.float64)} _numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys())) def shm_as_ndarray(mp_array, shape = None): '''Given a multiprocessing.Array, returns an ndarray pointing to the same data.''' # support SynchronizedArray: if not hasattr(mp_array, '_type_'): mp_array = mp_array.get_obj() dtype = _ctypes_to_numpy[mp_array._type_] result = numpy.frombuffer(mp_array, dtype) if shape is not None: result = result.reshape(shape) return numpy.asarray(result) def ndarray_to_shm(array, lock = False): '''Generate an 1D multiprocessing.Array containing the data from the passed ndarray. The data will be *copied* into shared memory.''' array1d = array.ravel(order = 'A') try: c_type = _numpy_to_ctypes[array1d.dtype] except KeyError: c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)] result = multiprocessing.Array(c_type, array1d.size, lock = lock) shm_as_ndarray(result)[:] = array1d return result 

You would use it as follows:

  • Use sa = ndarray_to_shm(a) to convert ndarray a to a common multiprocessing.Array .
  • Use multiprocessing.Process(target = somefunc, args = (sa, ) (and start , possibly join ) to call somefunc in a separate process , passing in a shared array.
  • In somefunc use a = shm_as_ndarray(sa) to get an ndarray pointing to shared data. (In fact, you can do the same in the original process right after creating sa to have two ndarrays referring to the same data.)

AFAICS, you do not need to set the lock to True, since shm_as_ndarray will not use the lock anyway. If you need a lock, you must set the lock to True and call get / release on sa .

In addition, if your array is not one-dimensional, you can transfer the form along with sa (for example, use args = (sa, a.shape) ).

This solution has the advantage that it does not need additional packages or expansion modules, with the exception of multiprocessing (which is in the standard library).

+5
Mar 13 '13 at 16:26
source share

Use streams. But I think you will have problems with the GIL.

Instead: choose poison .

I know from the MPI implementation that I work with that they use shared memory for on-node communications. In this case, you will have to encode your own synchronization.

2 GB / s, it seems that you will have problems with most of the β€œsimple” methods, depending on your real-time limitations and the available main memory.

+3
Feb 17 '11 at 19:56
source share

Use streams. You probably won't have a problem with the GIL.

GIL only affects Python code, not C / Fortran / Cython libraries. Most numpy operations and a good piece of the C-backed Scientific Python stack produce GIL and can work fine on multiple cores. This blogpost discusses GIL and scientific Python in more detail.

Edit

Simple ways to use threads include the threading module and multiprocessing.pool.ThreadPool .

+1
May 03 '15 at 17:15
source share

You might consider using a RAM disk to temporarily store files that will be shared between processes . The RAM drive is a place where part of the RAM is processed as a logical hard disk, to which files can be written / read the same way as on a regular disk, but at a read / write speed of RAM.

This article describes the use of ImDisk software (for MS Win) to create such a disk and obtain a read / write speed of files of 6-10 gigabytes per second: https://www.tekrevue.com/tip/create-10-gbs-ram -disk-windows /

Ubuntu example: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Another important advantage is that files with arbitrary formats can be transferred using this method: for example. Picke, JSON, XML, CSV, HDF5, etc.

Keep in mind that everything that is stored on the RAM disk is erased upon reboot.

+1
Jan 02 '18 at 17:37
source share



All Articles