Changing different Python objects in parallel processes, respectively

In a nutshell

I want to change complex Python objects concurrently, with each object being processed by a single process only. How can I do this (most efficiently)? Would implementing some kind of pickling support help? Would that be efficient?

Complete problem

I have a Python data structure, ArrayDict, that basically consists of a numpy array and a dictionary and maps arbitrary indices to rows in the array. In my case, all keys are integers.

    a = ArrayDict()
    a[1234] = 12.5
    a[10] = 3
    print(a[1234])  # 12.5
    print(a[10])    # 3.0
    print(a[1234] == a.array[a.indexDict[1234]])  # True
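For readers without the class at hand, a minimal sketch of such a structure could look like the following. The attribute names `array` and `indexDict` come from the snippet above; the constructor argument, the initial capacity, and the doubling strategy are assumptions made for illustration, not the original implementation.

```python
import numpy as np


class ArrayDict:
    """Hypothetical sketch: maps arbitrary keys to slots of a numpy array."""

    def __init__(self, capacity=4):
        # capacity is assumed to be at least 1
        self.array = np.empty(capacity, dtype=float)  # value storage
        self.indexDict = {}                           # key -> position in self.array

    def __setitem__(self, key, value):
        if key not in self.indexDict:
            if len(self.indexDict) == len(self.array):
                # Storage is full: replace it with an array twice as large.
                bigger = np.empty(2 * len(self.array), dtype=float)
                bigger[:len(self.array)] = self.array
                self.array = bigger
            self.indexDict[key] = len(self.indexDict)
        self.array[self.indexDict[key]] = value

    def __getitem__(self, key):
        return self.array[self.indexDict[key]]

    def __len__(self):
        return len(self.indexDict)


a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234])  # 12.5
print(a[10])    # 3.0
```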

Now I have several such ArrayDicts and want to fill them in myMethod(arrayDict, params). Since myMethod is expensive, I want to run it in parallel. Note that myMethod may add many rows to an ArrayDict. Each process changes its own ArrayDict. I do not need concurrent access to the ArrayDicts.

In myMethod, I change entries in the ArrayDict (that is, I change the internal numpy array) and add entries to the ArrayDict (that is, I add another index to the dictionary and write a new value into the internal array). Eventually, I would like to be able to replace an ArrayDict's internal numpy array when it becomes too small. This does not happen often, and I could perform this action in the non-parallel part of my program if no better solution exists. My own attempts were not successful even without replacing arrays.

I have spent several days researching shared memory and Python's multiprocessing module. Since I will ultimately be working on Linux, the task seemed rather simple: the fork() system call allows working efficiently with copies of the arguments. My idea was to change each ArrayDict in its own process, return the changed version of the object, and overwrite the original object. To save memory and avoid the work of copying, I additionally used sharedmem arrays to store the data in ArrayDict. I am aware that the dictionary must still be copied.

    from sharedmem import sharedmem
    import numpy as np

    n = ...                    # length of the data array
    myData = np.empty(n, dtype=object)
    myData[:] = [ArrayDict() for _ in range(n)]
    done = False

    while not done:
        considered = ...       # numpy boolean array of length
                               # n with True at the index of
                               # considered data
        args = ...             # numpy array containing arguments
                               # for myMethod

        with sharedmem.MapReduce() as pool:
            results = pool.map(myMethod,
                               list(zip(myData[considered],
                                        args[considered])),
                               star=True)
            myData[considered] = results

        done = ...             # depends on what happens in
                               # myMethod

What I get is a segmentation fault. I was able to circumvent this error by creating deep copies of the ArrayDicts in myMethod and saving those into myData. I do not really understand why this is necessary, and frequently copying my (potentially very large) arrays (the while loop runs for a long time) does not seem efficient to me. However, at least it worked to a certain extent. Nevertheless, my program shows some buggy behaviour in the 3rd iteration due to the shared memory. Therefore, I think that my approach is not optimal.

I read here and here that it is possible to store arbitrary numpy arrays in shared memory using multiprocessing.Array. However, I would still need to share the whole ArrayDict, which includes in particular a dictionary, which in turn is not picklable.

How could I achieve my goals in an efficient way? Would it be possible (and efficient) to make my object picklable somehow?
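As a point of reference for the pickling question, here is a minimal sketch of custom pickling via `__getstate__` and `__setstate__`. It assumes a simplified, hypothetical ArrayDict with a fixed preallocated capacity; the state layout (`values`, `indexDict`, `capacity`) is made up for illustration. The idea shown is that only the used part of the array needs to be serialized, not the spare capacity.

```python
import pickle
import numpy as np


class ArrayDict:
    """Hypothetical sketch with custom pickling support."""

    def __init__(self, capacity=1024):
        self.array = np.zeros(capacity, dtype=float)
        self.indexDict = {}

    def __setitem__(self, key, value):
        # Growth of the array is omitted to keep the sketch short.
        position = self.indexDict.setdefault(key, len(self.indexDict))
        self.array[position] = value

    def __getitem__(self, key):
        return self.array[self.indexDict[key]]

    def __getstate__(self):
        used = len(self.indexDict)
        # Ship only the used slots, not the unused spare capacity.
        return {"values": self.array[:used],
                "indexDict": self.indexDict,
                "capacity": len(self.array)}

    def __setstate__(self, state):
        # Rebuild the full-size array on the receiving side.
        self.array = np.zeros(state["capacity"], dtype=float)
        self.array[:len(state["values"])] = state["values"]
        self.indexDict = state["indexDict"]


original = ArrayDict()
original[1234] = 12.5
original[10] = 3
payload = pickle.dumps(original)
restored = pickle.loads(payload)
print(restored[1234], restored[10], len(payload))
```

Whether this pays off depends on how much spare capacity the arrays carry; the cost of pickling the dictionary itself stays the same.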

All solutions should work with Python 3 and full numpy/scipy support on 64-bit Linux.

Edit

I found here that it is somehow possible to share arbitrary objects using multiprocessing "Manager" classes and user-defined proxy classes. Would this be efficient? I would like to exploit the fact that I do not need concurrent access to the objects, even though they are not handled in the main process. Would it be an option to create a manager for each object that I want to process? (I might still have some misconceptions about how managers work.)

1 answer

This seems like a fairly complex class, and I cannot fully anticipate whether this solution will work in your case. A simple compromise for such a complex class is to use ProcessPoolExecutor.

If this does not answer your question, a minimal working example would help.

    from concurrent.futures import ProcessPoolExecutor
    import numpy as np


    class ArrayDict:
        keys = None
        vals = None

        def __init__(self):
            self.keys = dict()
            self.vals = np.random.rand(1000)

        def __str__(self):
            return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())


    def myMethod(ad, args):
        print("starting:", ad)


    if __name__ == '__main__':
        l = [ArrayDict() for _ in range(5)]
        args = [2, 3, 4, 1, 3]

        with ProcessPoolExecutor(max_workers=2) as ex:
            d = ex.map(myMethod, l, args)

Objects are cloned when sent to the child process, so you need to return the result (changes to the object will not propagate back to the main process) and decide how you want to store them.
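Returning the result and storing it back could be sketched as follows. `Record`, `my_method`, and `fill_all` are hypothetical names used only for this illustration; `Record` stands in for any picklable object such as ArrayDict.

```python
from concurrent.futures import ProcessPoolExecutor


class Record:
    """Hypothetical stand-in for ArrayDict; any picklable object works."""

    def __init__(self):
        self.values = {}


def my_method(record, arg):
    # Runs in a worker process on a *copy* of the object.
    record.values[arg] = arg * arg
    return record  # the modified copy is pickled and sent back


def fill_all(records, args, max_workers=2):
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        # map() yields the results in the order of the inputs.
        return list(ex.map(my_method, records, args))


if __name__ == "__main__":
    records = [Record() for _ in range(5)]
    args = [2, 3, 4, 1, 3]
    # Overwrite the originals with the copies returned by the workers.
    records = fill_all(records, args)
    print([r.values for r in records])
```

Each object is pickled twice this way (once on the way to the worker and once on the way back), which is the cost to weigh against the runtime of the method itself.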

Note that changes to class variables will propagate to other objects in the same process. For example, if you have more tasks than processes, changes to class variables will be shared among the instances running in the same process. This is usually undesired behaviour.
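The difference between class-level and instance-level state can be seen even without any worker processes. The two classes below are hypothetical and exist only to show the effect; in a process pool the same thing happens inside each worker, because one worker handles several tasks in turn.

```python
class Shared:
    keys = {}  # class attribute: a single dict for all instances

    def add(self, key, value):
        self.keys[key] = value  # mutates the class-level dict


class Separate:
    def __init__(self):
        self.keys = {}  # instance attribute: one dict per instance

    def add(self, key, value):
        self.keys[key] = value


a, b = Shared(), Shared()
a.add(1, "x")
print(b.keys)  # {1: 'x'}

c, d = Separate(), Separate()
c.add(1, "x")
print(d.keys)  # {}
```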

This is a high-level interface for parallelization. ProcessPoolExecutor uses the multiprocessing module and can only be used with picklable objects. I suspect that ProcessPoolExecutor has performance similar to "sharing state between processes". Under the hood, ProcessPoolExecutor uses multiprocessing.Process and should exhibit similar performance to Pool (except when using very long iterables with map). ProcessPoolExecutor does seem to be the intended future API for concurrent tasks in Python.

If you can, it is usually faster to use ThreadPoolExecutor (which can simply be swapped in for ProcessPoolExecutor), provided myMethod spends most of its time in code that releases the GIL, such as large numpy operations. In this case the object is shared between the threads, and an update made in one thread is visible in the main thread.
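A minimal sketch of the thread-based variant, assuming a hypothetical `my_method` that modifies a numpy array in place. Nothing is copied or returned; the main thread sees the changes directly.

```python
from concurrent.futures import ThreadPoolExecutor
import numpy as np


def my_method(values, factor):
    values *= factor  # in-place change of the caller's array


arrays = [np.ones(3) for _ in range(4)]
factors = [2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=2) as ex:
    # list() waits for completion and re-raises any exception from a worker.
    list(ex.map(my_method, arrays, factors))

print([float(a[0]) for a in arrays])  # [2.0, 3.0, 4.0, 5.0]
```

This is safe here because each task touches a different array; concurrent writes to the same object would need a lock.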

As mentioned, the fastest option is probably to restructure ArrayDict so that it only uses objects that can be represented by multiprocessing.Value or Array.
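For the array part, a sketch of this idea could look as follows. It wraps a `multiprocessing.Array` in a numpy view with `np.frombuffer`; the helper `as_numpy` and the worker `my_method` are hypothetical names, and the dictionary part of ArrayDict is not covered here.

```python
import multiprocessing as mp
import numpy as np


def as_numpy(shared):
    # View the shared buffer as a numpy array without copying.
    return np.frombuffer(shared.get_obj(), dtype=np.float64)


def my_method(shared, factor):
    values = as_numpy(shared)
    with shared.get_lock():
        values *= factor  # written directly into shared memory


if __name__ == "__main__":
    shared = mp.Array("d", 5)  # 5 doubles, zero-initialised, with a lock
    as_numpy(shared)[:] = np.arange(5)

    worker = mp.Process(target=my_method, args=(shared, 10.0))
    worker.start()
    worker.join()

    # The parent sees the values written by the child process.
    print(as_numpy(shared))
```

The size of a `multiprocessing.Array` is fixed at creation, so replacing the array with a larger one would still have to happen in the non-parallel part of the program.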

If ProcessPoolExecutor does not work and you cannot optimize ArrayDict, you may be stuck with using a Manager. There are good examples of how to do that here.
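As a rough sketch of the Manager route, a custom class can be registered with a `BaseManager` subclass. `KeyedValues` and `ObjectManager` are hypothetical names; the default proxy only exposes public methods, which is why the sketch uses `set` and `get` instead of item access.

```python
from multiprocessing.managers import BaseManager


class KeyedValues:
    """Hypothetical stand-in for ArrayDict with plain public methods."""

    def __init__(self):
        self._values = {}

    def set(self, key, value):
        self._values[key] = value

    def get(self, key):
        return self._values[key]


class ObjectManager(BaseManager):
    pass


# Make the class available as manager.KeyedValues(); calls go through a proxy.
ObjectManager.register("KeyedValues", KeyedValues)


if __name__ == "__main__":
    with ObjectManager() as manager:    # starts the server process
        shared = manager.KeyedValues()  # the object lives in the server process
        shared.set(10, 3.0)             # each call is a round trip to the server
        print(shared.get(10))
```

Every method call on the proxy is sent to the server process and back, so this tends to suit coarse-grained operations better than many small element accesses.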

The greatest performance gain is often to be found in myMethod itself. And, as I mentioned, the overhead of using threads is lower than that of processes.
