I have spent several hours on various attempts to parallelize my number-crunching code, but it only gets slower when I do. Unfortunately, the problem disappears when I try to reduce it to the example below, and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this kind of program?
(Note: continued after Unutbu's answer below.)
Here are the circumstances:
- It is about a module that defines a class BigData with a lot of internal data. The example has a single list ff of interpolation functions; in the actual program there are more, e.g. ffA[k], ffB[k], ffC[k].
- The calculation is "embarrassingly parallel": the work can be done on small chunks of data at a time. In the example, that is do_chunk().
- The approach shown in the example would, in my actual program, give the worst performance: about 1 second per chunk (on top of the 0.1 second or so of actual calculation time when done in a single thread). So, for n = 50, do_single() would run in 5 seconds and do_multi() would take 55 seconds.
- I also tried to split up the work by slicing the xi and yi arrays into contiguous blocks and iterating over all k values within each chunk. That worked a bit better: there was now no difference in total run time whether I used 1, 2, 3, or 4 threads. But of course, I want to see an actual speedup!
- It may be related to: Multiprocessing.Pool makes Numpy matrix multiplication slower. However, elsewhere in the program I used a multiprocessing pool for calculations that were much more isolated: a module-level function that looks like def do_chunk(array1, array2, array3) and does numpy-only calculations on those arrays (see the sketch after this list). There, there was a significant speed boost.
- CPU usage scales with the number of parallel processes as expected (300% CPU usage for three processes).
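For illustration, here is a minimal sketch of that "isolated function" pattern; the function body, array sizes and names are placeholders, not the real calculation:

import numpy as np
from multiprocessing import Pool

def do_chunk(array1, array2, array3):
    # numpy-only work on the arrays that were passed in; no class instance is involved
    return np.sum(array1 * array2 + array3)

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    chunks = [tuple(rng.uniform(size=100_000) for _ in range(3)) for _ in range(50)]
    with Pool(3) as pool:
        # each chunk's arguments are pickled once; the function itself carries no big state
        results = pool.starmap(do_chunk, chunks)
    print(sum(results))

Here is the minimal example that reproduces the problem: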
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm = 0

def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0:
        _tm = tm
        return
    print("%s: %.2f seconds" % (msg, tm - _tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            # one bivariate interpolation function per layer of z
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        # evaluate the k-th interpolator on all (xi, yi) points and sum the result
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            sum += np.sum(procs[k].get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0:
                stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi):
    # module-level wrapper so that Pool can pickle the call; note that bd (= self) gets pickled too
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
The output:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
These timings are on an Intel Core i3-3227 CPU with 2 cores and 4 threads, running 64-bit Linux. In the actual program, the multiprocessing version (the pool mechanism, even when only one core was used) was a factor of 10 slower than the single-process version.
Follow-up, after Unutbu's answer, which put me on the right track: in the actual program, self gets pickled into an object of 37 to 140 MB that has to be passed to the worker processes, and this happens for every chunk of work, so the pickling and transfer dominate the runtime. Apart from passing big data objects, the overhead of apply_async under Linux is very small: for a trivial function (adding a few integer arguments) it takes only about 0.2 ms per apply_async/get pair. So, dividing the work into very small chunks is not a problem in itself; passing large objects as arguments is.
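To check this, it helps to measure how large and how slow the pickled self actually is. A minimal sketch, assuming the BigData class from the first listing is defined in (or imported into) the same module:

import pickle, time

bd = BigData(50)
t0 = time.time()
blob = pickle.dumps(bd, protocol=pickle.HIGHEST_PROTOCOL)
t1 = time.time()
print("pickled size: %.1f MB, pickling time: %.2f s" % (len(blob) / 1e6, t1 - t0))

This is roughly the cost that apply_async pays for every single chunk when self is passed as an argument.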
The solution, in essence, is to store the big data objects in a global dict rather than passing them as arguments. The dict is filled before the Pool is created, so the worker processes (which are forked from the parent on Linux) inherit its contents, and the entries are deleted in the parent right afterwards. Only small objects (a key string and array indices) are passed via pickling/IPC, which keeps the per-task overhead negligible. The code below demonstrates the approach.
import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for the big objects during multiprocessing

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array with the same shape as xi, yi
        zi = k * np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd' + mp_key] = self  # BigData
        _mproc_data['xi' + mp_key] = xi
        _mproc_data['yi' + mp_key] = yi
        pool = Pool(processes=num_proc)
        # worker processes have now inherited the global dict; clean up in the parent
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v + mp_key]
        # set up chunk indices for the worker processes
        n_chunks = 45
        n = len(xi)
        chunk_len = n // n_chunks
        i1list = np.arange(0, n, chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks)
        procs = []
        for i in range(n_chunks):
            p = pool.apply_async(_do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]))
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")
        # now wait for the results
        zi = np.zeros_like(xi)
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30)  # timeout allows ctrl-C handling
        pool.close()
        pool.join()
        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects; the big ones are looked up in the global dict."""
    global _mproc_data
    bd = _mproc_data['bd' + key]
    xi = _mproc_data['xi' + key][i1:i2]
    yi = _mproc_data['yi' + key][i1:i2]
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)
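Note that this relies on the worker processes being created with fork (the default start method on Linux), so that they inherit _mproc_data from the parent. A variation that would also work with the "spawn" start method is to fill the dict inside each worker via the Pool initializer; this is a sketch of my own, not part of the program above:

def _init_worker(key, bd, xi, yi):
    # Runs once in each worker process; with 'spawn', the arguments are pickled
    # once per worker instead of once per chunk.
    _mproc_data['bd' + key] = bd
    _mproc_data['xi' + key] = xi
    _mproc_data['yi' + key] = yi

# In do_all_work, the Pool would then be created as
#   pool = Pool(processes=num_proc, initializer=_init_worker,
#               initargs=(mp_key, self, xi, yi))
# and the explicit fill/delete of _mproc_data in the parent process is no longer needed.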
Here is the result of a speed test (again on a CPU with 2 cores and 4 threads), varying the number of worker processes and the amount of memory per chunk (the total number of bytes of the xi, yi and zi array slices). The numbers are in "million result values per second", but that does not matter much for the comparison. The row for "1 process" is a direct call of do_chunk with the full input data, without any subprocesses.
#Proc   125K    250K    500K    1000K   unlimited
1                                       0.82
2       4.28    1.96    1.3     1.31
3       2.69    1.06    1.06    1.07
4       2.17    1.27    1.23    1.28
The impact of the chunk memory size is quite significant. The CPU has 3 MB of shared L3 cache, plus 256 KB of L2 cache per core; note that the calculation also needs access to several MB of internal data of the BigData object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, with 3 being the slowest.
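For reference, the rough arithmetic behind the column headers: each chunk touches three float64 slices (xi, yi and the result zi), i.e. 24 bytes per point, so the chunk sizes translate into point counts roughly as follows (taking 1 K = 1024 bytes):

for chunk_kbytes in (125, 250, 500, 1000):
    points_per_chunk = chunk_kbytes * 1024 // (3 * 8)  # 3 arrays x 8 bytes per float64
    print("%5d K per chunk -> about %6d points per chunk" % (chunk_kbytes, points_per_chunk))

Only the smallest chunks fit within the 256 KB per-core L2 cache; the larger ones still fit in the shared 3 MB L3, which they have to share with the internal data of BigData.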