Adding state to a function called through pool.map - how to avoid pickling errors

I ran into the general problem of getting a pickling error when using the multiprocessing module.

My specific problem is that I need to give the function I call some state before I pass it to pool.map, but when I do I get the attribute lookup __builtin__.function failed error.

Based on the linked SO answer, it seems like the only way to use a function with pool.map is to define it at the top level of the module, so that pool.map can pickle it; anything defined inside another function is out.
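
For example, here is a quick sanity check of that constraint (not part of my real code, just an illustration; the error message shown is what I get on Python 2):

    import pickle

    def top_level(x):
        # Defined at module level, so pickle can find it by name.
        return x * 2

    def make_nested():
        def nested(x):
            # Defined inside another function, so it cannot be looked up by name.
            return x * 2
        return nested

    pickle.dumps(top_level)      # works
    pickle.dumps(make_nested())  # PicklingError: Can't pickle <type 'function'>:
                                 # attribute lookup __builtin__.function failed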

It seems to me that I explained this poorly, so here is the problem in the code. :)

Testing without a pool

    # Function to be called by the multiprocessing pool
    def my_func(x):
        massive_list, medium_list, index1, index2 = x
        result = [massive_list[index1 + x][index2:] for x in xrange(10)]
        return result in medium_list

    if __name__ == '__main__':
        data = [comprehension which loads a ton of state]
        source = [comprehension which also loads a medium amount of state]

        for num in range(100):
            to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
            result = map(my_func, to_crunch)

This works A-OK and exactly as expected. The only thing "wrong" with it is that it is slow.

Pool Attempt 1

    # (Note: my_func() remains the same)
    if __name__ == '__main__':
        data = [comprehension which loads a ton of state]
        source = [comprehension which also loads a medium amount of state]

        pool = multiprocessing.Pool(2)

        for num in range(100):
            to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
            result = pool.map(my_func, to_crunch)

It technically works, but it is a whopping 18x slower! The slowdown must come not only from copying the two massive data structures on every call, but also from pickling/unpickling them as they get passed around. The non-pool version only has to pass a reference to the massive list, not the actual list.
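
To get a feel for that cost, here is a rough timing of how long it takes just to pickle a large structure (the sizes below are invented; substitute something comparable to the real massive_list):

    import pickle
    import time

    # Stand-in for the real massive_list; sizes are made up for illustration.
    massive_list = [range(100) for _ in xrange(10000)]

    start = time.time()
    blob = pickle.dumps(massive_list, pickle.HIGHEST_PROTOCOL)
    print "pickled %d bytes in %.2f seconds" % (len(blob), time.time() - start)
    # pool.map pays roughly this price (plus unpickling in the worker)
    # for every task whose argument tuple carries the list.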

So, having tracked down the bottleneck, I am trying to store the two massive lists as state inside my_func. That way, if I understand it correctly, they would only need to be copied once per worker (in my case, 4).
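
Conceptually, the copy-once-per-worker behaviour I am after looks something like the sketch below (hypothetical names, using a Pool initializer to plant the lists as module-level globals in each worker; I have not verified this, it is only to show the idea, and whether this is the right mechanism is part of my question):

    def init_worker(m, s):
        # Runs once in each worker process; stash the big lists as globals there.
        global massive_list, small_list
        massive_list = m
        small_list = s

    def my_func(x):
        index1, index2 = x
        result = [massive_list[index1 + x][index2:] for x in xrange(10)]
        return result in small_list

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2, initializer=init_worker, initargs=(data, source))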

Pool Attempt 2:

I wrap my_func in a closure, passing in the two lists as saved state.

    def build_myfunc(m, s):
        def my_func(x):
            massive_list = m  # close over the state
            small_list = s
            index1, index2 = x
            result = [massive_list[index1 + x][index2:] for x in xrange(10)]
            return result in medium_list
        return my_func

    if __name__ == '__main__':
        data = [comprehension which loads a ton of state]
        source = [comprehension which also loads a medium amount of state]

        modified_func = build_myfunc(data, source)

        pool = multiprocessing.Pool(2)

        for num in range(100):
            to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
            result = pool.map(modified_func, to_crunch)

However, this returns a pickling error, because (based on the SO question above) multiprocessing cannot pickle a function that is not defined at the top level of a module.

Error:

 PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

So, is there a way to solve this problem?

1 answer

map is a way of distributing a workload. If you store the data inside the function, I think you lose its original purpose.

Try to find out why it is slower. This is not normal, and there must be something else going on.

First, the number of processes has to suit the machine they run on. In your example you use a pool of two processes, so a total of 3 processes are involved. How many cores does the system you are using have? What else is running? What is the system load while the data is being crunched? What does the function do with the data? Does it access the disk? Or maybe it uses a database, which would mean there is yet another process competing for the disk and the cores. What about memory? Is there enough to hold the source lists?
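
As a quick check (my own illustration, not from the question), you can ask Python how many cores are available before sizing the pool:

    import multiprocessing

    # Number of CPU cores visible to Python; a reasonable upper bound for the
    # pool size, though the right size also depends on what else the machine is doing.
    print multiprocessing.cpu_count()

    # e.g. pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())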

The correct implementation is your Attempt 1.

Try profiling the execution with iostat , for example. That way you can spot the bottlenecks.

If it turns out to be CPU-bound, you can try some changes to the code.

From another answer on Stackoverflow (I have nothing against a copy and paste here :P):

You use .map() , which collects the results and then returns them. So, for a very large data set, you are probably stuck in the collecting phase.

You could try using .imap() , which is the iterator version of .map() , or even .imap_unordered() if the order of the results is not important (as it seems to be from your example).

Here is the relevant documentation. It is worth noting the line:

For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
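
Here is a sketch of how that suggestion could look applied to your Attempt 1 (the names my_func, massive_list and small_list come from the question; the chunksize of 50 is just a starting point to tune):

    pool = multiprocessing.Pool(2)

    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000))
        # imap_unordered yields results as workers finish them instead of
        # collecting everything before returning; a larger chunksize sends
        # the work out in bigger batches, cutting per-task overhead.
        result = list(pool.imap_unordered(my_func, to_crunch, chunksize=50))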
