Warning / Disclaimer:
You probably don't want to apply multiprocessing to this problem. With relatively simple operations on large arrays, numpy problems are usually memory-bound: the bottleneck is moving data from RAM into the processor's caches. The CPU is starved for data, so throwing more processors at the problem doesn't help much. Furthermore, your current approach will pickle and make a copy of the entire array for every element in your input sequence, which adds a lot of overhead.
There are plenty of cases where numpy + multiprocessing is very effective, but you need to make sure you're working with a CPU-bound problem. Ideally, it's a CPU-bound problem with relatively small inputs and outputs, so that the overhead of pickling the input and output is amortized. For many of the problems numpy is most often used for, that's not the case.
Two problems with your current approach
In answer to your question:
Your immediate error comes from passing a function that isn't accessible from the global scope (i.e. a function defined inside another function), which can't be pickled.
However, you have another problem. You're treating the numpy arrays as though they were shared memory that can be modified by each process. Instead, when you use multiprocessing, the original array will be pickled (effectively making a copy) and passed to each process independently. The original array will never be modified.
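To make that concrete, here's a minimal sketch (not from your code) showing that a worker's in-place changes never reach the parent's array:

    import numpy as np
    import multiprocessing

    def zero_out(arr):
        # The worker receives a pickled copy, so this only modifies the copy.
        arr[:] = 0
        return arr.sum()

    data = np.arange(10, dtype=float)
    pool = multiprocessing.Pool()
    print pool.map(zero_out, [data])  # [0.0] -- the worker's copy was zeroed
    pool.close()

    print data  # still [ 0.  1.  2. ... 9.] -- the original is untouched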
Avoiding the PicklingError
As a minimal example to reproduce your error, consider the following:
    import multiprocessing

    def apply_parallel(input_sequence):
        def func(x):
            pass
        pool = multiprocessing.Pool()
        pool.map(func, input_sequence)
        pool.close()

    foo = range(100)
    apply_parallel(foo)
This will lead to:
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Of course, in this simple example we could just move the function definition back into the __main__ namespace. However, in your case, the function needs to refer to the data that's passed in. Let's look at an example that's a bit closer to what you're doing:
    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window):
        data = np.pad(data, window, mode='edge')
        ind = np.arange(len(data)) + window

        def func(i):
            return data[i-window:i+window+1].mean()

        pool = multiprocessing.Pool()
        result = pool.map(func, ind)
        pool.close()
        return result

    foo = np.random.rand(20).cumsum()
    result = parallel_rolling_mean(foo, 10)
There are several ways to handle this, but the general approach is something like:
    import numpy as np
    import multiprocessing

    class RollingMean(object):
        def __init__(self, data, window):
            self.data = np.pad(data, window, mode='edge')
            self.window = window

        def __call__(self, i):
            start = i - self.window
            stop = i + self.window + 1
            return self.data[start:stop].mean()

    def parallel_rolling_mean(data, window):
        func = RollingMean(data, window)
        ind = np.arange(len(data)) + window

        pool = multiprocessing.Pool()
        result = pool.map(func, ind)
        pool.close()
        return result

    foo = np.random.rand(20).cumsum()
    result = parallel_rolling_mean(foo, 10)
Great! It works!
However, if you scale this up to large arrays, you'll soon find that it either runs very slowly (which you can speed up by increasing the chunksize in the pool.map call) or you quickly run out of RAM (once you do increase the chunksize).
multiprocessing pickles the input so that it can be passed to separate, independent Python processes. This means you're making a copy of the entire array for every i you operate on.
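You can see the cost directly: pickling the RollingMean callable from the example above drags the whole padded array along with it (a rough illustration; exact sizes will vary):

    import pickle
    import numpy as np

    # Uses the RollingMean class defined above.
    small = RollingMean(np.random.rand(100), 10)
    large = RollingMean(np.random.rand(1000000), 10)

    # The pickled callable contains the entire padded array, so its size
    # scales with the input -- and it gets re-sent to the workers with the tasks.
    print len(pickle.dumps(small))   # roughly a kilobyte
    print len(pickle.dumps(large))   # several megabytes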
We'll come back to this in a bit...
multiprocessing doesn't share memory between processes
The multiprocessing module works by pickling the inputs and passing them to independent processes. This means that if you modify something in one process, the other processes won't see the modification.
However, multiprocessing also provides primitives that live in shared memory and can be accessed and modified by child processes. There are a few different ways of adapting numpy arrays to use a shared-memory multiprocessing.Array. However, I'd recommend avoiding them at first (read up on false sharing if you're not familiar with it). There are cases where they're very useful, but typically to conserve memory, not to improve performance.
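If you do eventually need it, the usual pattern is to allocate a multiprocessing.Array and view it as a numpy array. A rough sketch (it relies on fork() semantics on Unix, and none of the names here come from your code):

    import ctypes
    import numpy as np
    import multiprocessing

    # Shared, lock-free buffer viewed as a numpy array.
    shared = multiprocessing.Array(ctypes.c_double, 100, lock=False)
    arr = np.frombuffer(shared, dtype=np.float64)

    def fill(i):
        # Workers inherit the shared buffer when the pool forks, so writes
        # here are visible to the parent process.
        arr[i] = i

    pool = multiprocessing.Pool()
    pool.map(fill, range(100))
    pool.close()

    print arr[:5]  # [ 0.  1.  2.  3.  4.]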
Therefore, it's best to do all modifications to a large array in a single process (this is also a very useful pattern for general I/O). It doesn't have to be the "main" process, but it's easiest to think about it that way.
As an example, suppose we want our parallel_rolling_mean function to accept an output array to store things in. A useful pattern looks something like the following. Note the use of iterators and the fact that we modify the output only in the main process:
    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window, output):
        def windows(data, window):
            padded = np.pad(data, window, mode='edge')
            for i in xrange(len(data)):
                yield padded[i:i + 2*window + 1]

        pool = multiprocessing.Pool()
        results = pool.imap(np.mean, windows(data, window))
        for i, result in enumerate(results):
            output[i] = result
        pool.close()

    foo = np.random.rand(20000000).cumsum()
    output = np.zeros_like(foo)
    parallel_rolling_mean(foo, 10, output)
    print output
Hope this example helps clarify things a bit.
chunksize and performance
One quick note on performance: as we scale this up, it gets very slow very quickly. If you look at a system monitor (e.g. top/htop), you may notice that your cores are sitting idle most of the time.
By default, the master process pickles each input, passes it to a worker right away, and then waits until the workers finish before pickling the next input. In many cases, this means the master process works, then sits idle while the worker processes are busy, and then the worker processes sit idle while the master process pickles the next input.
The key is to increase the chunksize parameter. This causes pool.imap to "pre-pickle" the specified number of inputs for each process. Essentially, the master process can stay busy pickling inputs while the worker processes stay busy processing. The downside is that you use more memory. If each input uses a large amount of RAM, this can be a bad idea. If it doesn't, it can speed things up dramatically.
As a quick example:
    import numpy as np
    import multiprocessing

    def parallel_rolling_mean(data, window, output):
        def windows(data, window):
            padded = np.pad(data, window, mode='edge')
            for i in xrange(len(data)):
                yield padded[i:i + 2*window + 1]

        pool = multiprocessing.Pool()
        results = pool.imap(np.mean, windows(data, window), chunksize=1000)
        for i, result in enumerate(results):
            output[i] = result
        pool.close()

    foo = np.random.rand(2000000).cumsum()
    output = np.zeros_like(foo)
    parallel_rolling_mean(foo, 10, output)
    print output
Using chunksize=1000 it takes 21 seconds to process an array of 2 million elements:
    python ~/parallel_rolling_mean.py  83.53s user 1.12s system 401% cpu 21.087 total
But with chunksize=1 (the default), it takes about eight times longer (2 minutes, 41 seconds):
    python ~/parallel_rolling_mean.py  358.26s user 53.40s system 246% cpu 2:47.09 total
In fact, with the default chunksize it's actually much worse than a single-process implementation of the same thing, which takes only 45 seconds:
    python ~/sequential_rolling_mean.py  45.11s user 0.06s system 99% cpu 45.187 total
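For reference, the single-process version being timed probably looks something like this (a sketch; the exact sequential_rolling_mean.py script isn't shown here):

    import numpy as np

    def sequential_rolling_mean(data, window, output):
        # Same windowing logic as the parallel version, in a single process.
        padded = np.pad(data, window, mode='edge')
        for i in xrange(len(data)):
            output[i] = padded[i:i + 2*window + 1].mean()

    foo = np.random.rand(2000000).cumsum()
    output = np.zeros_like(foo)
    sequential_rolling_mean(foo, 10, output)
    print output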