Unable to pickle static method - Multiprocessing - Python

I am parallelizing some code that uses classes. I knew that it is not possible to pickle a class method without extra machinery, and I found a solution here. In my code, the parts that need to be parallelized are class methods. Below is a very simple program that reproduces the structure of mine (the same layout, but with the method bodies removed; they contained a lot of mathematical computation that is irrelevant to the output I get). The problem is that I can pickle one method (shepard_interpolation), but with another (calculate_orientation_uncertainty) I get a pickle error. I do not understand why this happens, or why it works only partially.

```python
import copy_reg
import types
from itertools import product
from multiprocessing import Pool

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'):
        # deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"

class VariabilityOfGradients(object):
    def __init__(self):
        pass

    @staticmethod
    def aux():
        return "VoG - Sucess"

    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux)
            results.append(result.get())
        pool.close()
        pool.join()

if __name__ == '__main__':
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()
    VariabilityOfGradients.calculate_orientation_uncertainty()
```

When I run it, I get "PicklingError: Can't pickle &lt;type 'function'&gt;: attribute lookup __builtin__.function failed". This is almost the same problem as found here. The only difference I can see is that my methods are static.
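As a side note for anyone reading this later: the copy_reg workaround is only needed on Python 2. On Python 3, bound methods pickle by qualified name out of the box, as this small sketch of my own (not part of the original code) shows:

```python
import pickle

class ImageData(object):
    def shepard_interpolation(self, seeds=20):
        return "ImD - Sucess"

# On Python 3 a bound method pickles without any copy_reg/copyreg
# registration: the instance and the method name are serialized together.
blob = pickle.dumps(ImageData().shepard_interpolation)
print(pickle.loads(blob)())  # -> ImD - Sucess
```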

EDIT:

I noticed that in calculate_orientation_uncertainty, when I call the function as result = pool.apply_async(VariabilityOfGradients.aux()), i.e. with parentheses (which I have never seen in the documentation examples), it appears to work. But when I then try to get the result, I get "TypeError: 'int' object is not callable".

Any help would be greatly appreciated. Thank you in advance.

python class multiprocessing pickle pool
3 answers

You can define a plain function at the module level and, in addition, a staticmethod. This preserves the calling syntax, introspection, and inheritance properties of a static method, while avoiding the pickling problem:

```python
def aux():
    return "VoG - Sucess"

class VariabilityOfGradients(object):
    aux = staticmethod(aux)
```
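A quick sketch of my own (reusing the answer's names) to show that the calling syntax is preserved while the object that actually gets pickled is the plain module-level function:

```python
import pickle

def aux():
    return "VoG - Sucess"

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

# The staticmethod calling syntax still works...
print(VariabilityOfGradients.aux())  # -> VoG - Sucess
# ...and pickling the module-level function succeeds, since it is
# looked up by name rather than serialized as a method object.
print(pickle.loads(pickle.dumps(aux))())  # -> VoG - Sucess
```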

For example,

```python
import copy_reg
import types
from itertools import product
import multiprocessing as mp

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Success"

def aux():
    return "VoG - Sucess"

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

    @staticmethod
    def calculate_orientation_uncertainty():
        pool = mp.Pool()
        results = []
        for x, y in product(range(1, 5), range(1, 5)):
            # result = pool.apply_async(aux)  # this works too
            result = pool.apply_async(VariabilityOfGradients.aux,
                                      callback=results.append)
        pool.close()
        pool.join()
        print(results)

if __name__ == '__main__':
    results = []
    pool = mp.Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation,
                                  args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()
    VariabilityOfGradients.calculate_orientation_uncertainty()
```

gives

```
ImD - Success
ImD - Success
ImD - Success
['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']
```

By the way, result.get() blocks the calling process until the function passed to pool.apply_async (e.g. ImageData.shepard_interpolation) completes. So

```python
for _ in range(3):
    result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
    results.append(result.get())
```

calls ImageData.shepard_interpolation sequentially, defeating the purpose of the pool.

Instead, you could use

```python
for _ in range(3):
    pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()],
                     callback=results.append)
```

The callback function (e.g. results.append) is called in a thread of the calling process when the worker function completes. It is passed one argument: the function's return value. This way nothing blocks the three calls to pool.apply_async from being submitted immediately, and the work done by the three calls to ImageData.shepard_interpolation can proceed concurrently.
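Here is a minimal, self-contained Python 3 sketch of that callback pattern (my example, not from the original post; the function name square is made up for illustration):

```python
import multiprocessing as mp

def square(x):
    return x * x

def run_squares(n):
    results = []
    pool = mp.Pool(2)
    for i in range(n):
        # No result.get() inside the loop: all n tasks are submitted
        # immediately, and the callback appends each return value in
        # the parent process as it completes.
        pool.apply_async(square, args=(i,), callback=results.append)
    pool.close()
    pool.join()             # after join(), every callback has run
    return sorted(results)  # completion order is not deterministic

if __name__ == '__main__':
    print(run_squares(4))  # -> [0, 1, 4, 9]
```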

Alternatively, it may be simplest to use pool.map here:

```python
results = pool.map(ImageData.shepard_interpolation, [ImageData()] * 3)
```

If you use a fork of multiprocessing called pathos.multiprocessing, you can use classes and class methods directly in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function, and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

```python
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x, y):
...   return x + y
...
>>> x = [0, 1, 2, 3]
>>> y = [4, 5, 6, 7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...   def plus(self, x, y):
...     return x + y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
```
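For comparison (my addition, not part of this answer): with Python 3's standard library, the multi-argument map above can be approximated with Pool.starmap by zipping the argument lists:

```python
from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == '__main__':
    with Pool(2) as p:
        # zip pairs the argument lists, mirroring pathos's p.map(add, x, y)
        print(p.starmap(add, zip([0, 1, 2, 3], [4, 5, 6, 7])))  # -> [4, 6, 8, 10]
```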

Get the code here: https://github.com/uqfoundation/pathos

pathos also has an asynchronous map (amap), as well as imap.


I'm not sure if this is what you are looking for, but my use case was slightly different: I wanted to use a method of a class from within that same class, running across multiple processes.

Here's how I implemented it:

```python
from multiprocessing import Pool

class Product(object):
    def __init__(self):
        self.logger = "test"

    def f(self, x):
        print(self.logger)
        return x * x

    def multi(self):
        p = Pool(5)
        print(p.starmap(Product.f, [(Product(), 1), (Product(), 2), (Product(), 3)]))

if __name__ == '__main__':
    obj = Product()
    obj.multi()
```
