How to summarize a loop in parallel using multiprocessing in Python

I find it difficult to understand how to use the Python multiprocessing module.

I have a sum from 1 to n, where n = 10^10. That n is far too large to fit into a list, which seems to be the approach of many multiprocessing examples on the Internet.

Is there a way to "split" a range into segments of a certain size and then do the sum for each segment?

For instance:

    def sum_nums(low, high):
        result = 0
        for i in range(low, high+1):
            result += i
        return result

And I want to calculate sum_nums(1, 10**10) by breaking it into many pieces: sum_nums(1,1000) + sum_nums(1001,2000) + sum_nums(2001,3000)... and so on. I know there is a closed form, n(n+1)/2, but pretend we don't know that.
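To make the splitting concrete, here is a minimal sketch of what I have in mind (the helper name make_segments and the segment size are only illustrative, not part of my actual code):

    # Illustrative only: produce (low, high) pairs covering 1..n in fixed-size
    # segments, so that summing sum_nums over all pairs equals sum_nums(1, n).
    def make_segments(n, size):
        segments = []
        low = 1
        while low <= n:
            high = min(low + size - 1, n)
            segments.append((low, high))
            low = high + 1
        return segments

    # make_segments(10**4, 1000) -> [(1, 1000), (1001, 2000), ..., (9001, 10000)]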

Here is what I tried:

    import multiprocessing

    def sum_nums(low, high):
        result = 0
        for i in range(low, high+1):
            result += i
        return result

    if __name__ == "__main__":
        n = 1000
        procs = 2
        sizeSegment = n/procs

        jobs = []
        for i in range(0, procs):
            process = multiprocessing.Process(target=sum_nums,
                                              args=(i*sizeSegment+1, (i+1)*sizeSegment))
            jobs.append(process)

        for j in jobs:
            j.start()
        for j in jobs:
            j.join()

        # where is the result?
3 answers

First, the best way to get around the memory issue is to use an iterator/generator instead of a list:

    def sum_nums(low, high):
        result = 0
        for i in xrange(low, high+1):
            result += i
        return result

In Python 3, range() is already lazy, so this change is only needed in Python 2.
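On Python 3 the same function could be written as a minimal sketch like this (range() never builds a list, and the loop can be replaced by the built-in sum):

    def sum_nums(low, high):
        # range() in Python 3 is lazy, so memory use stays constant
        return sum(range(low, high + 1))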

Now, on to the multiprocessing: you want to split the work across different processes or CPU cores. If you don't need to control the individual workers, the easiest way is to use a process pool. This lets you map a function over the pool and get the results back. You can also use apply_async to submit tasks to the pool one at a time and get a deferred result, which you can retrieve with .get():

    import multiprocessing
    from multiprocessing import Pool
    from time import time

    def sum_nums(low, high):
        result = 0
        for i in xrange(low, high+1):
            result += i
        return result

    # map requires a function that handles a single argument
    def sn((low, high)):
        return sum_nums(low, high)

    if __name__ == '__main__':
        #t = time()
        # takes forever
        #print sum_nums(1, 10**10)
        #print '{} s'.format(time() - t)

        p = Pool(4)
        n = int(1e8)
        r = range(0, 10**10+1, n)
        results = []

        # using apply_async
        t = time()
        for arg in zip([x+1 for x in r], r[1:]):
            results.append(p.apply_async(sum_nums, arg))

        # wait for results
        print sum(res.get() for res in results)
        print '{} s'.format(time() - t)

        # using the process pool's map
        t = time()
        print sum(p.map(sn, zip([x+1 for x in r], r[1:])))
        print '{} s'.format(time() - t)

On my machine, just calling sum_nums with 10 ** 10 takes almost 9 minutes, but using Pool(8) and n=int(1e8) reduces this to just over a minute.
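For readers on Python 3, a rough sketch of the same Pool approach might look like the following (the segment size, worker count, and variable names are my own choices here, not measured settings):

    from multiprocessing import Pool
    from time import time

    def sum_nums(low, high):
        return sum(range(low, high + 1))

    def sn(bounds):
        # Pool.map passes a single argument, so unpack the (low, high) tuple here
        low, high = bounds
        return sum_nums(low, high)

    if __name__ == '__main__':
        n = int(1e8)                            # segment size (assumed, as above)
        edges = list(range(0, 10**10 + 1, n))
        segments = list(zip([x + 1 for x in edges], edges[1:]))

        t = time()
        with Pool(4) as p:
            total = sum(p.map(sn, segments))
        print(total)                            # 50000000005000000000
        print('{} s'.format(time() - t))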


I find using multiprocessing.Pool and map() much easier.

Using your code:

    from multiprocessing import Pool

    def sum_nums(args):
        low = int(args[0])
        high = int(args[1])
        return sum(range(low, high+1))

    if __name__ == "__main__":
        n = 1000
        procs = 2
        sizeSegment = n/procs

        # Create the list of (low, high) segments
        jobs = []
        for i in range(0, procs):
            jobs.append((i*sizeSegment+1, (i+1)*sizeSegment))

        pool = Pool(procs).map(sum_nums, jobs)
        result = sum(pool)

    >>> print result
    500500

You can compute this sum without multiprocessing at all, and it's probably simpler, if not faster, to just use generators.

    # prepare a generator of generators, each covering a 1000-point interval
    >>> xr = (xrange(1000*i+1, i*1000+1001) for i in xrange(10000000))
    >>> list(xr)[:3]
    [xrange(1, 1001), xrange(1001, 2001), xrange(2001, 3001)]

    # sum, using two map functions
    >>> xr = (xrange(1000*i+1, i*1000+1001) for i in xrange(10000000))
    >>> sum(map(sum, map(lambda x:x, xr)))
    50000000005000000000L
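On Python 3 a similar sketch would use range instead of xrange and lose the trailing L; the pass-through map is dropped here since it only mirrors the pathos version below:

    >>> xr = (range(1000*i + 1, i*1000 + 1001) for i in range(10000000))
    >>> sum(map(sum, xr))
    50000000005000000000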

However, if you want to use multiprocessing, you can do that too. I use the pathos fork of multiprocessing, which is better at serialization (but is otherwise not much different).

    >>> xr = (xrange(1000*i+1, i*1000+1001) for i in xrange(10000000))
    >>> import pathos
    >>> mmap = pathos.multiprocessing.ProcessingPool().map
    >>> tmap = pathos.multiprocessing.ThreadingPool().map
    >>> sum(tmap(sum, mmap(lambda x:x, xr)))
    50000000005000000000L

The version without multiprocessing is faster, taking about a minute on my laptop. The multiprocessing version takes a few minutes due to the overhead of spawning many Python processes.

If you're interested, get pathos here: https://github.com/uqfoundation

