Sharing numpy arrays in python multiprocessing pool

I am working on some code that does pretty heavy numerical work on a lot of problems (from tens to hundreds of thousands of numerical integrations). Fortunately, these integrations are embarrassingly parallel, so it's easy to use Pool.map() to split the work across multiple cores.

Now I have a program that has this basic workflow:

    #!/usr/bin/env python
    from multiprocessing import Pool
    from scipy import *
    from my_parser import parse_numpy_array
    from my_project import heavy_computation

    # X is a global multidimensional numpy array
    X = parse_numpy_array("input.dat")
    param_1 = 0.0168
    param_2 = 1.505

    def do_work(arg):
        return heavy_computation(X, param_1, param_2, arg)

    if __name__ == '__main__':
        pool = Pool()
        arglist = linspace(0.0, 1.0, 100)
        results = pool.map(do_work, arglist)
        # save results in a .npy file for analysis
        save("Results", [X, results])

Since X, param_1, and param_2 are hard-coded and initialized in exactly the same way for each process in the pool, this all works great. Now that the code is working, I would like the file name, param_1, and param_2 to be entered by the user at runtime rather than hard-coded.

It should be noted that X, param_1 and param_2 do not change as work progresses. Since I do not change them, I could do something like this at the beginning of the program:

    import sys

    X = parse_numpy_array(sys.argv[1])
    param_1 = float(sys.argv[2])
    param_2 = float(sys.argv[3])

That would do the trick, but since most users of this code run it from Windows machines, I would rather not go down the command-line-argument path.

What I would like to do is something like this:

    X, param_1, param_2 = None, None, None

    def init(x, p1, p2):
        X = x
        param_1 = p1
        param_2 = p2

    if __name__ == '__main__':
        filename = raw_input("Filename> ")
        param_1 = float(raw_input("Parameter 1: "))
        param_2 = float(raw_input("Parameter 2: "))
        X = parse_numpy_array(filename)
        pool = Pool(initializer=init, initargs=(X, param_1, param_2))
        arglist = linspace(0.0, 1.0, 100)
        results = pool.map(do_work, arglist)
        # save results in a .npy file for analysis
        save("Results", [X, results])

But of course this fails, and X, param_1, and param_2 are all still None when the call to pool.map occurs. I'm new to multiprocessing, so I'm not sure why the initializer call fails. Is there a way to do what I want? Is there a better way to do this? I also looked at using shared data, but from my understanding of the documentation, that only works with ctypes, which don't include numpy arrays. Any help with this would be greatly appreciated.
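For reference, this is the kind of shared-ctypes pattern from the docs that I mean (just a rough sketch with made-up array dimensions, not something I have working in my Pool-based setup):

    from multiprocessing import RawArray
    import numpy as np

    # a flat shared ctypes buffer of doubles, sized for a 10x100 array
    shared_buf = RawArray('d', 10 * 100)
    # numpy can apparently view this buffer without copying, but I don't
    # see how to fit it into the Pool.map workflow above
    X_shared = np.frombuffer(shared_buf).reshape(10, 100)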

2 answers

I had a similar problem. If you just want to read my solution, skip a few lines ahead :) I had to:

  • share a numpy.array between processes, each working on a different part of it, and ...
  • pass Pool.map a function taking more than one argument.

I noticed that:

  • the numpy.array data was read correctly, but ...
  • changes made to the numpy.array were not persistent, and
  • Pool.map had problems handling lambda functions, or so it seemed to me (if this point is not clear to you, just ignore it; there is a small example right after this list).
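Here is the sort of thing I mean by the lambda problem (my guess at the cause: lambdas cannot be pickled, so they cannot be shipped to the worker processes; the example itself is made up):

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool()
        # this raises a PicklingError, because the lambda cannot be
        # pickled and sent to the worker processes
        results = pool.map(lambda x: x * 2, range(10))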

My solution was:

  • make the target function take a single list argument, and
  • make the target function return the changed data instead of trying to write directly to the numpy.array.

I understand that your do_work function already returns the computed data, so you just need to change do_work to take a list (containing X, param_1, param_2 and arg) as its argument, and to pack the inputs into that list before passing it to Pool.map.

Here is an example implementation:

    def do_work2(args):
        X, param_1, param_2, arg = args
        return heavy_computation(X, param_1, param_2, arg)

Now you need to pack the inputs before calling do_work2. Your main block becomes:

    if __name__ == '__main__':
        filename = raw_input("Filename> ")
        param_1 = float(raw_input("Parameter 1: "))
        param_2 = float(raw_input("Parameter 2: "))
        X = parse_numpy_array(filename)
        pool = Pool()
        # now you pack the input arguments
        arglist = [[X, param_1, param_2, n] for n in linspace(0.0, 1.0, 100)]
        # note that this does not make 100 copies of X: the list just
        # holds 100 references to the same array
        results = pool.map(do_work2, arglist)
        # save results in a .npy file for analysis
        save("Results", [X, results])

For your last idea to work, I think you just need to make X, param_1, and param_2 global variables, using the global keyword, before changing them inside the if statement. So add the following:

    global X
    global param_1
    global param_2

immediately after the if __name__ == '__main__': line.
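For context, a tiny illustration of what the global keyword changes (the names here are just hypothetical):

    X = None

    def set_without_global(value):
        X = value   # binds a new local X; the module-level X stays None

    def set_with_global(value):
        global X
        X = value   # rebinds the module-level X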

