Using multiprocessing / threading to break a numpy array operation into pieces

I have a function that operates on an MxN array. The array is very large, so I want to use this function on smaller arrays (M1xN, M2xN, M3xN, ..., MixN, with M1 + M2 + M3 + ... + Mi = M) simultaneously using multiprocessing, and then reassemble the pieces into the full MxN array. Since Mr. Boardrider rightly asked for a concrete example, the following illustrates what I intend to do.

    import numpy as n

    def mult(y, x):
        r = n.empty([len(y), len(x)])
        for i in range(len(r)):
            r[i] = y[i] * x
        return r

    x = n.random.rand(10000)
    y = n.arange(0, 100000, 1)
    test = mult(y=y, x=x)

As the lengths of x and y increase, the run takes longer. For this example, I want to run the code so that, if I have 4 cores, each gets a quarter of the task: the 1st core computes rows r[0] to r[24999], the 2nd core r[25000] to r[49999], the 3rd core r[50000] to r[74999], and the 4th core r[75000] to r[99999]. Finally the results are combined into a single array covering r[0] to r[99999].

I hope this example clarifies the situation. If my problem is still not clear, let me know.

python arrays multithreading multiprocessing
1 answer

The first thing to say: if this concerns several cores on a single processor, numpy is already able to parallelize the operation better than we could do manually (see the discussion on multiplying large arrays in python).

In this case, the key is simply to make sure all the multiplication happens in a bulk array operation rather than in a Python for-loop:

    test2 = x[n.newaxis, :] * y[:, n.newaxis]
    n.abs(test - test2).max()  # verify equivalence to mult(): output should be 0.0,
                               # or very small, reflecting floating-point precision limits
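To make the speed difference concrete, here is a quick self-contained timing sketch (array sizes are smaller than the question's so it finishes quickly; the names mult_loop, t_loop, t_bcast are mine, and exact numbers will vary by machine):

```python
import timeit
import numpy as np

x = np.random.rand(2000)
y = np.arange(0, 5000, 1)

def mult_loop(y, x):
    # the question's original row-by-row loop
    r = np.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x
    return r

# time the Python loop vs. the single broadcast expression
t_loop = timeit.timeit(lambda: mult_loop(y, x), number=3)
t_bcast = timeit.timeit(lambda: x[np.newaxis, :] * y[:, np.newaxis], number=3)
print('loop: %.3fs   broadcast: %.3fs' % (t_loop, t_bcast))
```

On typical hardware the broadcast version wins because the whole outer product runs inside numpy's compiled code instead of 5000 trips through the interpreter.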

[If you really wanted to distribute this across several separate processors, that is another matter, but the question seems to concern a single (multi-core) processor.]


OK, given the above: suppose you want to parallelize an operation more complex than just mult(). Suppose you have tried hard to express your operation as bulk array operations that numpy can parallelize itself, but it simply isn't amenable to that. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, together with multiprocessing.Pool, to assign processes to non-overlapping chunks of it, divided up along the y dimension (and along x too, if you want). An example is given below. Note that this approach does not explicitly do what you specify (combine the results and concatenate them into a single array). Rather, it does something more efficient: several processes simultaneously assemble their portions of the answer in non-overlapping parts of shared memory. Once that is done, no collation or concatenation is needed: we just read out the result.

    import os, numpy, multiprocessing, itertools

    # the best way to get multiprocessing.Pool to send shared multiprocessing.Array
    # objects between processes is to attach them to something global - see
    # http://stackoverflow.com/questions/1675766/
    SHARED_VARS = {}

    def operate(slices):
        # grok the inputs
        yslice, xslice = slices
        y, x, r = get_shared_arrays('y', 'x', 'r')
        # create views of the appropriate chunks/slices of the arrays:
        y = y[yslice]
        x = x[xslice]
        r = r[yslice, xslice]
        # do the actual business
        for i in range(len(r)):
            r[i] = y[i] * x
            # If this is truly all operate() does, it can be parallelized far more
            # efficiently by numpy itself. But let's assume this is a placeholder
            # for something more complicated.
        return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (
            os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)

    def check(y, x, r):
        r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]
        # obviously this check will only be valid if operate() literally does only
        # multiplication (in which case this whole business is unnecessary)
        print('max. abs. diff. = %g' % numpy.abs(r - r2).max())
        return y, x, r

    def slicestr(s):
        return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step])

    def m2n(buf, shape, typecode, ismatrix=False):
        """
        Return a numpy.array VIEW of a multiprocessing.Array given a handle to the
        array, the shape, the data typecode, and a boolean flag indicating whether
        the result should be cast as a matrix.
        """
        a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
        if ismatrix:
            a = numpy.asmatrix(a)
        return a

    def n2m(a):
        """
        Return a multiprocessing.Array COPY of a numpy.array, together with shape,
        typecode and matrix flag.
        """
        if not isinstance(a, numpy.ndarray):
            a = numpy.array(a)
        return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

    def new_shared_array(shape, typecode='d', ismatrix=False):
        """
        Allocate a new shared array and return all the details required to
        reinterpret it as a numpy array or matrix (same order of output
        arguments as n2m)
        """
        typecode = numpy.dtype(typecode).char
        return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

    def get_shared_arrays(*names):
        return [m2n(*SHARED_VARS[name]) for name in names]

    def init(*pargs, **kwargs):
        SHARED_VARS.update(pargs, **kwargs)

    if __name__ == '__main__':
        ylen = 1000
        xlen = 2000

        init(y=n2m(range(ylen)))
        init(x=n2m(numpy.random.rand(xlen)))
        init(r=new_shared_array([ylen, xlen], float))

        print('Master process ID is %s' % os.getpid())
        #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r'))  # local test

        pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items())
        yslices = [slice(0, 333), slice(333, 666), slice(666, None)]
        xslices = [slice(0, 1000), slice(1000, None)]
        #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension
        reports = pool.map(operate, itertools.product(yslices, xslices))
        print('\n'.join(reports))
        y, x, r = check(*get_shared_arrays('y', 'x', 'r'))
