Using multiprocessing Pool.map with lambdas

Setup: I have a function preprocess(data, predicate) and a list of predicates that might look like this:

preds = [lambda x: x < 1,
         lambda x: x < 2,
         lambda x: x < 3,
         lambda x: x < 42]

EDIT: I probably should have been more precise. I thought that 1, 2, 3, 42 were obviously recognizable as examples, but apparently that was too implicit. I am actually doing some NLP: data is a list of words, and one predicate looks like lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower())). I want to try out different predicates to evaluate which one works best.

Here is what I actually want to do: call preprocess with each predicate in parallel.

EDIT: Since this is a preprocessing step, I need the value returned by preprocess to continue working with it.

What I was hoping I could do, but unfortunately I can't:

pool = Pool(processes=4)
pool.map(lambda p: preprocess(data, p), preds)

As I understand it, this is because everything passed to pool.map must be picklable. There are two solutions in this question, of which the first (the accepted answer) seems impractical and the second does not seem to work in Python 2.7, which I am using, although a comment there suggests that it does.

My question is: is pool.map the right way to go, and if so, how do I do it? Or should I use a different approach?

I know that there are many questions about pool.map, but although I spent some time searching, I did not find an answer. Also, if my coding style is awkward, feel free to point it out. I have read that lambda looks strange to some people and that I should probably use functools.partial.

Thanks in advance.

+4
3

You could change preprocess so that it accepts a threshold instead of a predicate. Something like this:

def preprocess(data, threshold):
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)

The preds list can then simply hold the integers, which are picklable:

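from functools import partial

preds = [1, 2, 3, 42]
pool = Pool(processes=4)
# partial binds the whole data set; each worker call receives one threshold
pool.map(partial(preprocess, data), preds)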

You can extend this to choose the comparison as well, using the operator module:

def preprocess(data, pred):
    threshold, op = pred
    def predicate(x):
        return op(x, threshold)
    return old_preprocess(data, predicate)

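from functools import partial
import operator as op

preds = [(1, op.lt), (2, op.gt), (3, op.ge), (42, op.lt)]
pool = Pool(processes=4)
# partial binds the whole data set; each worker call receives one (threshold, operator) pair
pool.map(partial(preprocess, data), preds)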
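The threshold-and-operator idea above can be put together into a small self-contained script. This is only a sketch under assumptions: old_preprocess is a hypothetical stand-in that filters a list, run_all and the sample data are names invented for illustration, and functools.partial is used to bind the data so that pool.map only has to ship picklable objects.

```python
import operator
from functools import partial
from multiprocessing import Pool


def old_preprocess(data, predicate):
    # Hypothetical stand-in for the real preprocessing: keep matching items.
    return [x for x in data if predicate(x)]


def preprocess(data, pred):
    # pred is a picklable (threshold, comparison) pair, not a function object.
    threshold, compare = pred

    def predicate(x):
        return compare(x, threshold)

    return old_preprocess(data, predicate)


def run_all(data, preds, processes=2):
    # partial(preprocess, data) is picklable because preprocess is a
    # module-level function and data is plain data.
    pool = Pool(processes=processes)
    try:
        return pool.map(partial(preprocess, data), preds)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    sample = [0, 1, 2, 3, 50]  # assumed sample data
    preds = [(1, operator.lt), (2, operator.gt), (42, operator.lt)]
    print(run_all(sample, preds))
```

The predicate closure is built inside the worker process, so it never needs to be pickled; only the numbers and the operator functions cross the process boundary.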

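If the predicates are arbitrary functions, an alternative is the marshal module, which can serialize a function's code object to bytes and load it back.

Something like this: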

real_preds = [marshal.dumps(pred.__code__) for pred in preds]

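Then preprocess rebuilds the predicate:

import marshal
import types

def preprocess(data, pred):
    # Rebuild the predicate from its marshalled code object
    pred = types.FunctionType(marshal.loads(pred), globals())
    return old_preprocess(data, pred)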
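One detail worth seeing in isolation is the globals() argument: marshal only carries the code object, so any global name the predicate uses is looked up in whatever dictionary is passed to types.FunctionType. The sketch below runs in a single process; LIMIT and the replacement value 10 are made-up names and values for illustration.

```python
import marshal
import types

LIMIT = 3  # hypothetical global used by the predicate

pred = lambda x: x < LIMIT

# Only the code object is serialized; the value of LIMIT is not included.
payload = marshal.dumps(pred.__code__)

# Rebuild the function with a different globals dictionary.
rebuilt = types.FunctionType(marshal.loads(payload), {"LIMIT": 10})

print(pred(5))     # uses the module-level LIMIT
print(rebuilt(5))  # uses the LIMIT supplied at rebuild time
```

For a predicate such as the stopwords one from the question, this suggests that names like re and stopwords would have to be available in the globals of the module where the function is rebuilt.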

Here is an MWE of this last approach:

>>> from multiprocessing import Pool
>>> import marshal
>>> import types
>>> def preprocess(pred):
...     pred = types.FunctionType(marshal.loads(pred), globals())
...     return pred(2)
... 
>>> preds = [lambda x: x < 1,
...          lambda x: x <2,
...          lambda x: x < 3,
...          lambda x: x < 42]
>>> real_preds = [marshal.dumps(pred.__code__) for pred in preds]
>>> pool = Pool(processes=4)
>>> pool.map(preprocess, real_preds)
[False, False, True, True]

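Note that the function passed to pool.map must itself be picklable. This means you cannot pass a lambda as the first argument to pool.map: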

>>> pool.map(lambda x: preprocess(x), real_preds)
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 351, in _handle_tasks
    put(task)
  File "/usr/lib/python3.3/multiprocessing/connection.py", line 206, in send
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
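The traceback above comes down to how pickle handles functions: they are pickled by reference to their name, and a lambda has no importable name. A minimal check, with can_pickle and less_than_three as hypothetical helper names:

```python
import pickle


def can_pickle(obj):
    # Return True if pickle can serialize obj, False otherwise.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


def less_than_three(x):
    # Module-level function: pickled by reference to its qualified name.
    return x < 3


if __name__ == "__main__":
    print(can_pickle(less_than_three))
    print(can_pickle(lambda x: x < 3))  # a lambda cannot be looked up by name
```

This is why replacing the lambda with a module-level def (or with picklable data plus a module-level function) is usually enough to make pool.map accept it.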

"is pool.map ? , . , " ", , , , " " :

lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower()))

, , pool.map. , w .

, , pool.map , w 35000 . w 1000, Pool 15 map ( 256 . 60000, Pool ).

, w , lambda def w.lower(). map pool.map.

+3

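You can do this with Pool.map; you just need to organize what you are mapping over. A map basically works like this:

result = map(function, things)

is roughly equivalent to

result = []
for thing in things:
    result.append(function(thing))

or, more concisely,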

result = [function(thing) for thing in things]

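So you can structure your code around a function that takes a picklable value (the upper bound) and builds the predicate itself: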

def mapme(bound):
    p = lambda x : x < bound
    return preprocess(data, p)

, . preprocess , .
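A runnable version of the mapme idea might look like the following. It is a sketch, not a tested recipe for the original code: preprocess here is a hypothetical stand-in that filters a list, and data is assumed to be a module-level value that each worker can see.

```python
from multiprocessing import Pool

# Assumed sample data; module-level so worker processes can see it.
data = [0, 1, 2, 3, 50]


def preprocess(data, predicate):
    # Hypothetical stand-in: keep the items that satisfy the predicate.
    return [x for x in data if predicate(x)]


def mapme(bound):
    # The lambda is created inside the worker, so it is never pickled.
    p = lambda x: x < bound
    return preprocess(data, p)


if __name__ == "__main__":
    pool = Pool(processes=2)
    try:
        # Only the integers are sent to the workers.
        print(pool.map(mapme, [1, 2, 3, 42]))
    finally:
        pool.close()
        pool.join()
```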

+2

If you are using your functions for their side effects and do not need the unified output of pool.map(), you can simply mimic it with os.fork() (at least on Unix-like systems).

You can try something like this:

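import os
import numpy as np

nprocs = 4
# Split the predicates into nprocs roughly equal chunks
funcs = np.array_split(np.array(preds), nprocs)

# Fork the program into nprocs processes, each with a procid from 0 to nprocs-1
procid = 0
for x in range(1, nprocs):
    if os.fork() == 0:  # in the child: take chunk x and stop forking
        procid = x
        break

# Explicit loop so the calls also run on Python 3, where map() is lazy
for p in funcs[procid]:
    preprocess(data, p)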
0