How to parallelize a sum calculation in Python numpy?

I have a sum that I am trying to calculate, and I am having difficulty parallelizing the code. The calculation I am trying to parallelize is fairly complicated (it uses both numpy arrays and scipy sparse matrices). It spits out a numpy array, and I want to sum the output arrays from about 1000 of these calculations. Ideally, I would keep a running sum over all iterations, but I have not been able to figure out how to do this.

So far, I have tried joblib's Parallel function and the pool.map function from the Python multiprocessing package. For both, I use an inner function that returns a numpy array. These approaches return a list, which I convert to a numpy array and then sum over.
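For reference, the joblib version looks roughly like this (a sketch, not my exact code; computation is the array-returning function shown in the edit below, and n_jobs=8 is just an example):

    from joblib import Parallel, delayed
    import numpy as np

    def summers(num_iters):
        # farm out computation(0) ... computation(num_iters - 1) to the workers,
        # gather the 1 x 512^2 arrays into a list, then sum them at the end
        results = Parallel(n_jobs=8)(delayed(computation)(i) for i in range(num_iters))
        return np.array(results).sum(0)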

However, once the joblib Parallel function completes all iterations, the main program never continues past it (the parent process seems to sit idle at 0% CPU). When I use pool.map, I get memory errors after all the iterations are complete.

Is there a way to parallelize just the running sum of arrays?

Edit: The goal is to do something like the following, except in parallel.

    def summers(num_iters):
        sumArr = np.zeros((1, 512*512))  # initialize sum
        for index in range(num_iters):
            sumArr = sumArr + computation(index)  # computation returns a 1 x 512^2 numpy array
        return sumArr
2 answers

I figured out how to parallelize the sum of arrays with multiprocessing, apply_async, and callbacks, so I am posting this here for other people. I used the example from the Parallel Python page for the Sum callback class, although I did not actually use that package for the implementation; it just pointed me toward using callbacks. Here is simplified code for what I used, and it does what I wanted it to do.

    import multiprocessing
    import numpy as np
    import thread

    class Sum:  # again, this class is from the Parallel Python example code (I modified it for an array and added comments)
        def __init__(self):
            self.value = np.zeros((1, 512*512))  # this is the initialization of the sum
            self.lock = thread.allocate_lock()
            self.count = 0

        def add(self, value):
            self.count += 1
            self.lock.acquire()  # lock so sum is correct if two processes return at same time
            self.value += value  # the actual summation
            self.lock.release()

    def computation(index):
        array1 = np.ones((1, 512*512)) * index  # this is where the array-returning computation goes
        return array1

    def summers(num_iters):
        pool = multiprocessing.Pool(processes=8)
        sumArr = Sum()  # create an instance of the callback class and zero the sum
        for index in range(num_iters):
            singlepoolresult = pool.apply_async(computation, (index,), callback=sumArr.add)
        pool.close()
        pool.join()  # waits for all the processes to finish
        return sumArr.value
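One note: import thread is Python 2 only. On Python 3 the same callback class can use a lock from the threading module instead; here is a minimal, untested sketch of that variant:

    import threading
    import numpy as np

    class Sum:
        def __init__(self):
            self.value = np.zeros((1, 512*512))  # initialize the sum
            self.lock = threading.Lock()
            self.count = 0

        def add(self, value):
            self.count += 1
            with self.lock:  # guard the sum in case two results arrive at the same time
                self.value += value  # the actual summation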

I also managed to get this to work using the parallel map that was suggested in the other answer. I had tried this before, but I was not implementing it correctly. Both ways work, and I think this answer explains the question of which method to use (map or apply_async) pretty well. For the map version, you do not need to define the Sum class, and the summers function becomes

    def summers(num_iters):
        pool = multiprocessing.Pool(processes=8)
        outputArr = np.zeros((num_iters, 1, 512*512))  # you wouldn't have to initialize these
        sumArr = np.zeros((1, 512*512))                # but I do to make sure I have the memory
        outputArr = np.array(pool.map(computation, range(num_iters)))
        sumArr = outputArr.sum(0)
        pool.close()  # not sure if this is still needed since map waits for all iterations
        return sumArr
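If memory is a problem (I originally saw memory errors with pool.map), one possible variation, a sketch I have not benchmarked, is to accumulate the results as they arrive with imap_unordered instead of building the full output array:

    def summers(num_iters):
        pool = multiprocessing.Pool(processes=8)
        sumArr = np.zeros((1, 512*512))
        # imap_unordered yields each result as soon as a worker finishes, so only
        # one 1 x 512^2 array is held at a time instead of all num_iters of them
        for arr in pool.imap_unordered(computation, range(num_iters)):
            sumArr += arr
        pool.close()
        pool.join()
        return sumArr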

I'm not sure I understand the problem. Are you just trying to partition a list across a pool of workers, have each worker keep a running sum of its computations, and then sum the results?

    #!/bin/env python

    import sys
    import random
    import time
    import multiprocessing
    import numpy as np

    numpows = 5
    numitems = 25
    nprocs = 4

    def expensiveComputation(i):
        time.sleep(random.random() * 10)
        return np.array([i**j for j in range(numpows)])

    def listsum(l):
        sum = np.zeros_like(l[0])
        for item in l:
            sum = sum + item
        return sum

    def partition(lst, n):
        division = len(lst) / float(n)
        return [lst[int(round(division * i)): int(round(division * (i + 1)))]
                for i in xrange(n)]

    def myRunningSum(l):
        sum = np.zeros(numpows)
        for item in l:
            sum = sum + expensiveComputation(item)
        return sum

    if __name__ == '__main__':
        random.seed(1)
        data = range(numitems)

        pool = multiprocessing.Pool(processes=4,)
        calculations = pool.map(myRunningSum, partition(data, nprocs))

        print 'Answer is:', listsum(calculations)
        print 'Expected answer: ', np.array([25., 300., 4900., 90000., 1763020.])

(the partition function comes from Python: slicing a list into n nearly-equal-length partitions)
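For example (just an illustrative check of the chunking, run under Python 2 to match the code above):

    >>> partition(range(10), 3)
    [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]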

