Parallel algorithm for evaluating a 1-dimensional array of functions on a 1d numpy array of the same length

The problem below involves an embarrassingly parallel for loop that I'm trying to parallelize. There is a bit of setup required to explain the problem, but despite all the verbosity, I think it should be a rather trivial problem that the multiprocessing module is designed to solve easily.

I have a length-N array of k distinct functions and a length-N abcissa array. Thanks to the clever solution provided by @senderle in Efficient algorithm for evaluating a 1-dimensional array of functions on a 1d numpy array of equal length, I have a fast numpy-based algorithm that I can use to evaluate the functions on the abcissa and return a length-N output array:

    import numpy as np

    def apply_indexed_fast(abcissa, func_indices, func_table):
        """ Returns the output of an array of functions evaluated at a set of
        input points if the indices of the table storing the required
        functions are known.

        Parameters
        ----------
        func_table : array_like
            Length k array of function objects

        abcissa : array_like
            Length Npts array of points at which to evaluate the functions.

        func_indices : array_like
            Length Npts array providing the indices to use to choose which
            function operates on each abcissa element. Thus func_indices is
            an array of integers ranging between 0 and k-1.

        Returns
        -------
        out : array_like
            Length Npts array giving the evaluation of the appropriate
            function on each abcissa element.
        """
        func_argsort = func_indices.argsort()
        func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
        func_ranges.append(None)
        out = np.zeros_like(abcissa)

        for i in range(len(func_table)):
            f = func_table[i]
            start = func_ranges[i]
            end = func_ranges[i+1]
            ix = func_argsort[start:end]
            out[ix] = f(abcissa[ix])

        return out

What I'm trying to do now is use multiprocessing to parallelize the for loop inside this function. Before describing my approach, for clarity I will briefly explain how the @senderle algorithm works. If you can read the code above and understand it right away, just skip the next paragraph.

First, we find the array of indices that sorts the input func_indices, which we use to define the length-k integer array func_ranges. The integer entries of func_ranges control which function gets applied to which sub-array of the input abcissa, as follows. Let f be the i-th function in the input func_table. Then the slice of the input abcissa to which we should apply the function f is slice(func_ranges[i], func_ranges[i+1]). So once func_ranges is computed, we can just run a simple for loop over the input func_table and successively apply each function object to the appropriate slice, filling our output array. See the code below for a minimal example of this algorithm in action.

    def trivial_functional(i):
        def f(x):
            return i*x
        return f

    k = 250
    func_table = np.array([trivial_functional(j) for j in range(k)])

    Npts = 1e6
    abcissa = np.random.random(Npts)
    func_indices = np.random.random_integers(0, len(func_table)-1, Npts)

    result = apply_indexed_fast(abcissa, func_indices, func_table)

So now my goal is to use multiprocessing to parallelize this calculation. I thought this would be easy, using my usual trick for threading embarrassingly parallel for loops. But my attempt below raises an exception that I don't understand.

    from multiprocessing import Pool, cpu_count

    def apply_indexed_parallelized(abcissa, func_indices, func_table):
        func_argsort = func_indices.argsort()
        func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
        func_ranges.append(None)
        out = np.zeros_like(abcissa)

        num_cores = cpu_count()
        pool = Pool(num_cores)

        def apply_funci(i):
            f = func_table[i]
            start = func_ranges[i]
            end = func_ranges[i+1]
            ix = func_argsort[start:end]
            out[ix] = f(abcissa[ix])

        pool.map(apply_funci, range(len(func_table)))
        pool.close()

        return out

    result = apply_indexed_parallelized(abcissa, func_indices, func_table)

    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I have seen this elsewhere on SO: Multiprocessing: How to use Pool.map on a function defined in a class?. I tried every method suggested there, one by one; in every case I either get a "too many open files" error because the threads were never closed, or the adapted algorithm simply hangs. It seems like there should be a simple solution here, since this is nothing more than threading an embarrassingly parallel for loop.

1 answer

Warning / Disclaimer:

You might not want to apply multiprocessing to this problem. For relatively simple operations on large arrays, you will find that the problem is memory-bound with numpy. The bottleneck is moving data from RAM into the CPU caches. The CPU is starved for data, so throwing more CPUs at the problem doesn't help much. Furthermore, your current approach will pickle and make a copy of the entire array for every item in your input sequence, which adds a lot of overhead.

There are plenty of cases where numpy + multiprocessing is very effective, but you need to make sure you're working with a CPU-bound problem. Ideally it's a CPU-bound problem with relatively small inputs and outputs, to alleviate the overhead of pickling the inputs and outputs. For many of the problems that numpy is most often used for, that's not the case.


Two problems with your current approach

In answer to your question:

Your immediate error is caused by passing in a function that's not accessible from the global scope (i.e. a function defined inside another function).

However, you have another issue as well. You're treating the numpy arrays as though they were shared memory that can be modified by each process. Instead, when using multiprocessing, the original array will be pickled (effectively making a copy) and passed to each process independently. The original array will never be modified.
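To see this concretely, consider a minimal sketch along these lines (using a module-level function so it pickles cleanly, and assuming the usual fork-based start method): the worker's modification never shows up in the parent's copy:

    import numpy as np
    import multiprocessing

    def zero_out(arr):
        # This modifies the worker's pickled copy, not the parent's array.
        arr[:] = 0
        return arr.sum()

    data = np.ones(5)
    pool = multiprocessing.Pool()
    print(pool.map(zero_out, [data]))   # [0.0] -- the worker saw (and zeroed) a copy
    pool.close()
    print(data)                         # [ 1.  1.  1.  1.  1.] -- parent's array untouched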


Avoiding the PicklingError

As a minimal example to reproduce your error, consider the following:

    import multiprocessing

    def apply_parallel(input_sequence):
        def func(x):
            pass

        pool = multiprocessing.Pool()
        pool.map(func, input_sequence)
        pool.close()

    foo = range(100)
    apply_parallel(foo)

This will lead to:

 PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Of course, in this simple example we could just move the function definition back into the __main__ namespace; a minimal sketch of that fix:
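    import multiprocessing

    def func(x):
        # Defined at module level, so worker processes can look it up by name.
        pass

    def apply_parallel(input_sequence):
        pool = multiprocessing.Pool()
        pool.map(func, input_sequence)
        pool.close()

    foo = range(100)
    apply_parallel(foo)

However, in your case, you need the function to refer to data that's passed in. Let's take a look at an example that's a little closer to what you're doing: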

    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window):
        data = np.pad(data, window, mode='edge')
        ind = np.arange(len(data)) + window

        def func(i):
            return data[i-window:i+window+1].mean()

        pool = multiprocessing.Pool()
        result = pool.map(func, ind)
        pool.close()
        return result

    foo = np.random.rand(20).cumsum()
    result = parallel_rolling_mean(foo, 10)

There are several ways to handle this, but the general approach is something like:

    import numpy as np
    import multiprocessing

    class RollingMean(object):
        def __init__(self, data, window):
            self.data = np.pad(data, window, mode='edge')
            self.window = window

        def __call__(self, i):
            start = i - self.window
            stop = i + self.window + 1
            return self.data[start:stop].mean()

    def parallel_rolling_mean(data, window):
        func = RollingMean(data, window)
        ind = np.arange(len(data)) + window

        pool = multiprocessing.Pool()
        result = pool.map(func, ind)
        pool.close()
        return result

    foo = np.random.rand(20).cumsum()
    result = parallel_rolling_mean(foo, 10)

Great! It works!


However, if you scale this up to large arrays, you'll soon find that it either runs very slowly (which you can speed up by increasing the chunksize in the pool.map call) or you quickly run out of RAM (once you increase the chunksize).

multiprocessing pickles the input so that it can be passed to separate, independent python processes. This means you're making a copy of the entire array for every i you operate on.

We'll come back to this point in a bit...


multiprocessing doesn't share memory between processes

The multiprocessing module works by pickling the inputs and passing them to independent processes. This means that if you modify something in one process, the other processes won't see the modification.

However, multiprocessing also provides primitives that live in shared memory and can be accessed and modified by child processes. There are a few different ways of adapting numpy arrays to use a shared-memory multiprocessing.Array. However, I'd recommend avoiding that at first (read up on false sharing if you're not familiar with it). There are cases where it's very useful, but it's typically to conserve memory, not to improve performance.
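For completeness, the usual way to do the shared-memory version is to allocate a multiprocessing.Array and wrap a numpy view around its buffer. A rough sketch, assuming a fork-based start method (e.g. Linux) so that the module-level view is inherited by the workers:

    import numpy as np
    import multiprocessing

    # Shared buffer of 1,000,000 doubles, plus a numpy view onto it.
    shared = multiprocessing.Array('d', 1000000)
    data = np.frombuffer(shared.get_obj())
    data[:] = np.random.rand(len(data))

    def chunk_mean(sl):
        # With a fork-based start method the child inherits `data`, which still
        # points at the shared buffer, so nothing large is pickled per task.
        return data[sl].mean()

    pool = multiprocessing.Pool()
    slices = [slice(i, i + 100000) for i in range(0, len(data), 100000)]
    print(pool.map(chunk_mean, slices))
    pool.close()

This mostly saves memory (no per-task copies of the big array); for the reasons above I still wouldn't expect it to make this particular problem faster.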

Therefore, it's best to do all modifications to a large array in a single process (this is also a very useful pattern for general I/O). It doesn't have to be the "main" process, but it's easiest to think about it that way.

As an example, let's say we want our parallel_rolling_mean function to take an output array to store things in. A useful pattern looks something like the following. Notice the use of iterators and that the output is modified only in the main process:

    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window, output):
        def windows(data, window):
            padded = np.pad(data, window, mode='edge')
            for i in xrange(len(data)):
                yield padded[i:i + 2*window + 1]

        pool = multiprocessing.Pool()
        results = pool.imap(np.mean, windows(data, window))
        for i, result in enumerate(results):
            output[i] = result
        pool.close()

    foo = np.random.rand(20000000).cumsum()
    output = np.zeros_like(foo)
    parallel_rolling_mean(foo, 10, output)
    print output

Hope this example helps clarify things a bit.
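For what it's worth, applying this same pattern back to your apply_indexed problem might look roughly like the sketch below. It's untested, and it assumes the function objects in func_table are themselves picklable (module-level functions or callable class instances; the closures returned by trivial_functional would not be). The memory-bound caveat at the top still applies, since the callable (including the full abcissa) gets pickled for the workers:

    import numpy as np
    import multiprocessing

    class ApplyFunc(object):
        """Picklable callable: evaluates the i-th function on its slice of abcissa."""
        def __init__(self, abcissa, func_indices, func_table):
            self.abcissa = abcissa
            self.func_table = func_table
            self.argsort = func_indices.argsort()
            ranges = list(np.searchsorted(func_indices[self.argsort],
                                          range(len(func_table))))
            ranges.append(None)
            self.ranges = ranges

        def __call__(self, i):
            ix = self.argsort[self.ranges[i]:self.ranges[i+1]]
            # Return the indices and values; the parent does the writing.
            return ix, self.func_table[i](self.abcissa[ix])

    def apply_indexed_parallel(abcissa, func_indices, func_table):
        func = ApplyFunc(abcissa, func_indices, func_table)
        out = np.zeros_like(abcissa)
        pool = multiprocessing.Pool()
        # `func` (and therefore the full abcissa) is pickled for the workers.
        for ix, values in pool.imap(func, range(len(func_table))):
            out[ix] = values
        pool.close()
        return out

    # usage, mirroring the question's setup:
    # result = apply_indexed_parallel(abcissa, func_indices, func_table)

Only the parent process ever writes into out, so there's no illusion of shared memory to trip over.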


chunksize and performance

One quick note on performance: if we scale this up, it will get very slow very quickly. If you look at a system monitor (e.g. top/htop), you may notice that your cores are sitting idle most of the time.

By default, the master process pickles each input for each process and passes it in immediately, then waits until the workers are finished before pickling the next input. In many cases, this means that the master process works, then sits idle while the worker processes are busy, and then the worker processes sit idle while the master process is pickling the next input.

The key is to increase the chunksize parameter. This causes pool.imap to "pre-pickle" the specified number of inputs for each process. Basically, the main thread can stay busy pickling inputs and the worker processes can stay busy processing. The downside is that you use more memory. If each input uses up a large amount of RAM, this can be a bad idea. If it doesn't, it can dramatically speed things up.

As a quick example:

    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window, output):
        def windows(data, window):
            padded = np.pad(data, window, mode='edge')
            for i in xrange(len(data)):
                yield padded[i:i + 2*window + 1]

        pool = multiprocessing.Pool()
        results = pool.imap(np.mean, windows(data, window), chunksize=1000)
        for i, result in enumerate(results):
            output[i] = result
        pool.close()

    foo = np.random.rand(2000000).cumsum()
    output = np.zeros_like(foo)
    parallel_rolling_mean(foo, 10, output)
    print output

Using chunksize=1000, it takes 21 seconds to process a 2-million-element array:

 python ~/parallel_rolling_mean.py 83.53s user 1.12s system 401% cpu 21.087 total 

But with chunksize=1 (the default), it takes about eight times longer (2 minutes, 41 seconds):

 python ~/parallel_rolling_mean.py 358.26s user 53.40s system 246% cpu 2:47.09 total 

In fact, with the default chunksize this is actually much worse than a single-process implementation of the same thing, which takes only 45 seconds:

 python ~/sequential_rolling_mean.py 45.11s user 0.06s system 99% cpu 45.187 total 
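For reference, the single-process version being compared against isn't shown above; a plausible equivalent sketch (not the exact script used for that timing) would be something like:

    import numpy as np

    def sequential_rolling_mean(data, window, output):
        padded = np.pad(data, window, mode='edge')
        for i in range(len(data)):
            output[i] = padded[i:i + 2*window + 1].mean()

    foo = np.random.rand(2000000).cumsum()
    output = np.zeros_like(foo)
    sequential_rolling_mean(foo, 10, output)
    print(output)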