Python 2.6 multiprocessing.Queue compatible with threads?

I am experimenting with the new multiprocessing module in Python 2.6. I create several processes, each with its own multiprocessing.JoinableQueue instance. Each process spawns one or more worker threads (subclasses of threading.Thread) that share that JoinableQueue instance (passed in through each Thread's __init__ method). It usually works, but sometimes it fails unpredictably with the following error:

   File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
     self.queue.task_done()
   File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
     raise ValueError('task_done() called too many times')
 ValueError: task_done() called too many times

My get() and task_done() calls immediately follow each other, so their counts should be equal. Anecdotally, this happens only when the work performed between get() and task_done() is VERY fast. Inserting a small time.sleep(0.01) seems to alleviate the problem.
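For reference, the pattern being described looks roughly like this (a hedged reconstruction, not the original script; the names worker and run_demo are invented, and it is written in Python 3 syntax, where the race appears to have long since been fixed):

```python
import multiprocessing
import threading

def worker(q, results, lock):
    # Pull items off the shared JoinableQueue; every get() is paired
    # with exactly one task_done(), so the counts should always match.
    while True:
        item = q.get()
        if item is None:          # sentinel: shut this worker down
            q.task_done()
            return
        with lock:
            results.append(item * 2)   # stand-in for the real work
        q.task_done()

def run_demo(num_threads=4, num_items=20):
    q = multiprocessing.JoinableQueue()
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, results, lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for i in range(num_items):
        q.put(i)
    for _ in threads:             # one sentinel per worker thread
        q.put(None)
    q.join()                      # blocks until task_done() balances every put()
    for t in threads:
        t.join()
    return sorted(results)
```

Even with no sleep between get() and task_done(), this pairing is exactly balanced, which is why the intermittent ValueError pointed at a problem inside the queue itself rather than in the calling code.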

Any ideas what is going on? Can I use a multiprocessing queue with threads instead of the more traditional Queue.Queue?

Thanks!

-Brian

+7
python multiprocessing
4 answers

I have not experimented with multiprocessing in 2.6 yet, but I played a lot with pyprocessing (as it was called in 2.5).

I see that you are launching several processes, each of which spawns its own set of threads.

Since you are already using the multiprocessing module, I suggest a multi-process rather than a multi-threaded approach; you will run into fewer problems, such as deadlocks.

Create a queue object: http://pyprocessing.berlios.de/doc/queue-objects.html

To create a multiprocess environment, use a pool: http://pyprocessing.berlios.de/doc/pool-objects.html , which will manage the worker processes for you. You can then dispatch work to the workers asynchronously or synchronously, and you can also attach a callback to each job if necessary. But remember that a callback is a shared block of code and should return immediately (as noted in the documentation).

Additional information: if necessary, create a manager ( http://pyprocessing.berlios.de/doc/manager-objects.html ) to control access to the queue object. For that you have to make the queue object shared, but the advantage is that, once shared and managed, you can access this shared queue across the network by creating proxy objects. This lets you call methods of a centralized shared queue object as (apparently) local methods from any node on the network.

Here is a code sample from the documentation:

You can run the manager server on one machine and have clients use it from other machines (provided the firewalls involved allow it). Running the following commands creates a server for a shared queue that remote clients can access:

 >>> from processing.managers import BaseManager, CreatorMethod
 >>> import Queue
 >>> queue = Queue.Queue()
 >>> class QueueManager(BaseManager):
 ...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
 ...
 >>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
 >>> m.serve_forever()

One client can access the server as follows:

 >>> from processing.managers import BaseManager, CreatorMethod
 >>> class QueueManager(BaseManager):
 ...     get_proxy = CreatorMethod(typeid='get_proxy')
 ...
 >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
 >>> queue = m.get_proxy()
 >>> queue.put('hello')

If you insist on thread-safety proper, PEP 371 (multiprocessing) mentions python-safethread: http://code.google.com/p/python-safethread/

+4

You should pass Queue objects as arguments to the target function.

An example from the multiprocessing documentation:

 from multiprocessing import Process, Queue

 def f(q):
     q.put([42, None, 'hello'])

 if __name__ == '__main__':
     q = Queue()
     p = Process(target=f, args=(q,))
     p.start()
     print q.get()    # prints "[42, None, 'hello']"
     p.join()

Queues are thread and process safe.

+2

You may have hit this bug:

http://bugs.python.org/issue4660

+1

Thanks for the quick responses. I do pass multiprocessing.Queue instances as arguments to each Process, as you illustrate. The error happens in the threads. I create them by subclassing threading.Thread and passing the queue to the __init__ method of each thread instance; this seems to be the accepted way of handing Queues to Thread subclasses. My only thought was that multiprocessing queues might not be compatible with threads (even though they are supposedly thread-safe).

-1
