Basic multiprocessing with while loop

I'm new to Python's multiprocessing package, and my confusion will probably be easy for someone who knows more to clear up. I've read about concurrency and searched for other questions like this and found nothing. (FYI, I do not want to use multithreading because the GIL will slow my application down a lot.)

I am thinking in terms of events. I want to start several processes that wait for an event. When an event occurs, it gets assigned to one particular running process, which handles it and then returns to an idle state. There may be a better way to do this, but my reasoning is that I should spawn all the processes once and keep them open indefinitely, rather than creating and closing a process every time an event occurs. Speed is a concern for me, and my events can occur many thousands of times per second.

I came up with the following toy example, which is meant to send even numbers to one process and odd numbers to another. Both processes do the same thing; they just append the number to a list.

    from multiprocessing import Process, Queue, Pipe

    slist = ['even', 'odd']

    Q = {}
    Q['even'] = Queue()
    Q['odd'] = Queue()

    ev, od = [], []
    Q['even'].put(ev)
    Q['odd'].put(od)

    P = {}
    P['even'] = Pipe()
    P['odd'] = Pipe()


    def add_num(s):
        """The worker function, invoked in a process. The results are
        placed in a list that is pushed to a queue."""
        # while True:
        if not P[s][1].recv():
            print s, '- do nothing'
        else:
            d = Q[s].get()
            print d
            d.append(P[s][1].recv())
            Q[s].put(d)
            print Q[s].get()
            P[s][0].send(False)
            print 'ya'


    def piper(s, n):
        P[s][0].send(n)
        for k in [S for S in slist if S != s]:
            P[k][0].send(False)
        add_num(s)


    procs = [Process(target=add_num, args=(i,)) for i in ['even', 'odd']]

    for s in slist:
        P[s][0].send(False)

    for p in procs:
        p.start()
        p.join()

    for i in range(10):
        print i
        if i % 2 == 0:
            s = 'even'
        else:
            s = 'odd'
        piper(s, i)

    print 'results:', Q['odd'].get(), Q['even'].get()

This code produces the following output:

even - do nothing

Any insight into this problem, where my code or my reasoning falls short, and so on, would be greatly appreciated.

1 answer

Here is an approach I have used a couple of times with good success:

  • Start a multiprocessing Pool.

  • Use a multiprocessing SyncManager to create multiple queues (one for each type of data that needs to be handled differently).

  • Use apply_async to launch the functions that process the data. Just like the queues, there should be one function for each type of data that needs to be handled differently. Each launched function gets the queue that corresponds to its data as an input argument. The functions do their work in an infinite loop that starts by getting data from the queue.

  • Begin processing. During processing, the main process sorts the data and decides which function should handle it. Once the decision is made, the data is placed on the queue that corresponds to that function.

  • After all the data has been handed out, the main process puts a value called a "poison pill" into each queue. A poison pill is a value that the workers recognize as a signal to exit. Since queues are first-in-first-out (FIFO), the workers are guaranteed to pull the poison pill off as the last item in their queue.

  • Close and join the multiprocessing pool.

The code

The following is an example of this algorithm. The example code uses the algorithm described above to divide odd numbers by 2 and even numbers by -2. All of the results are placed in a shared list that is accessible to the main process.

    import multiprocessing

    POISON_PILL = "STOP"

    def process_odds(in_queue, shared_list):
        while True:
            # block until something is placed on the queue
            new_value = in_queue.get()

            # check to see if we just got the poison pill
            if new_value == POISON_PILL:
                break

            # we didn't, so do the processing and put the result in the
            # shared data structure
            shared_list.append(new_value / 2)

        return

    def process_evens(in_queue, shared_list):
        while True:
            new_value = in_queue.get()
            if new_value == POISON_PILL:
                break

            shared_list.append(new_value / -2)

        return

    def main():
        # create a manager - it lets us share native Python object types like
        # lists and dictionaries without worrying about synchronization -
        # the manager will take care of it
        manager = multiprocessing.Manager()

        # now using the manager, create our shared data structures
        odd_queue = manager.Queue()
        even_queue = manager.Queue()
        shared_list = manager.list()

        # lastly, create our pool of workers - this spawns the processes,
        # but they don't start actually doing anything yet
        pool = multiprocessing.Pool()

        # now we'll assign two functions to the pool for them to run -
        # one to handle even numbers, one to handle odd numbers
        odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
        even_result = pool.apply_async(process_evens, (even_queue, shared_list))

        # this code doesn't do anything with the odd_result and even_result
        # variables, but you have the flexibility to check exit codes
        # and other such things if you want - see docs for AsyncResult objects

        # now that the processes are running and waiting for their queues
        # to have something, lets give them some work to do by iterating
        # over our data, deciding who should process it, and putting it in
        # their queue
        for i in range(6):
            if (i % 2) == 0:  # use mod operator to see if "i" is even
                even_queue.put(i)
            else:
                odd_queue.put(i)

        # now we've finished giving the processes their work, so send the
        # poison pill to tell them to exit
        even_queue.put(POISON_PILL)
        odd_queue.put(POISON_PILL)

        # wait for them to exit
        pool.close()
        pool.join()

        # now we can check the results
        print(shared_list)

        # ...and exit!
        return

    if __name__ == "__main__":
        main()

Output

This code produces this output:

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]

Note that the order of the results is unpredictable, because there is no guarantee about the order in which the functions get items off their queues and put the results into the list. But you can certainly do whatever post-processing you need afterwards, which could include sorting.
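
For example, here is a tiny sketch of such post-processing, using the output shown above written out as a literal list (in the real program you would copy the manager's list proxy with list(shared_list) first):

    # sort the collected results numerically after the pool has been joined
    results = [0.5, -0.0, 1.5, -1.0, 2.5, -2.0]   # i.e. list(shared_list)
    print(sorted(results))                        # [-2.0, -1.0, -0.0, 0.5, 1.5, 2.5]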

Justification

I think this will be a good solution to your problem, because:

  • You are right that there is huge overhead in spawning processes. This single-producer/multiple-consumer approach eliminates that by using the pool to keep the workers alive for the duration of the program.

  • It addresses your concern about being able to process data differently depending on its attributes. In your comments, you expressed concern about being able to send data to specific processes. In this approach, you can choose which process to give data to, because you choose which queue to put it on. (By the way, I think you are thinking of the pool.map function, which, as you correctly assume, does not let you perform different operations in the same job. apply_async does.)

  • I have found it to be very extensible and flexible. Need to add another kind of data processing? Just write a handler function, add another queue, and add the logic in main to route the data to the new function. Finding that one queue is getting backed up and becoming a bottleneck? You can call apply_async with the same target function and queue multiple times to get several workers pulling from the same queue, as sketched below. Just make sure you give that queue enough poison pills that every worker gets one.
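
Here is a minimal sketch of that scaling pattern, reusing the names from the example above (process_odds, POISON_PILL, the odd queue and the shared list); the worker count and the sample data are arbitrary choices for illustration:

    import multiprocessing

    POISON_PILL = "STOP"

    def process_odds(in_queue, shared_list):
        # same worker as before: loop until we see the poison pill
        while True:
            new_value = in_queue.get()
            if new_value == POISON_PILL:
                break
            shared_list.append(new_value / 2)

    def main():
        manager = multiprocessing.Manager()
        odd_queue = manager.Queue()
        shared_list = manager.list()

        # several workers, all pulling from the same queue
        num_workers = 3
        pool = multiprocessing.Pool(processes=num_workers)
        for _ in range(num_workers):
            pool.apply_async(process_odds, (odd_queue, shared_list))

        # hand out the work
        for i in range(1, 20, 2):
            odd_queue.put(i)

        # one poison pill per worker, so every worker gets one and exits
        for _ in range(num_workers):
            odd_queue.put(POISON_PILL)

        pool.close()
        pool.join()
        print(sorted(shared_list))

    if __name__ == "__main__":
        main()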

Limitations

Any data that you want to put on a queue must be picklable (serializable) by the pickle module. Look at the pickle documentation to find out what can and cannot be pickled.
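
As a quick illustrative sketch of that boundary (not an exhaustive rule): plain data structures built from built-in types pickle fine, while something like a lambda does not, because functions are pickled by reference to an importable name:

    import pickle

    # plain built-in data structures pickle without trouble
    pickle.dumps({"numbers": [1, 2, 3], "label": "odds"})

    # a lambda has no importable name, so pickling it fails
    try:
        pickle.dumps(lambda x: x / 2)
    except Exception as exc:          # typically pickle.PicklingError
        print("not picklable:", exc)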

There are probably other limitations, but I can't think of any off the top of my head.
