Is there a way to use asyncio.Queue in multiple threads?

Suppose I have the following code:

import asyncio import threading queue = asyncio.Queue() def threaded(): import time while True: time.sleep(2) queue.put_nowait(time.time()) print(queue.qsize()) @asyncio.coroutine def async(): while True: time = yield from queue.get() print(time) loop = asyncio.get_event_loop() asyncio.Task(async()) threading.Thread(target=threaded).start() loop.run_forever() 

The problem with this code is that the loop inside async coroutine never finishes the first iteration, and the queue size increases.

Why is this happening and what can I do to fix it?

I cannot get rid of a separate stream, because in my real code I use a separate stream to communicate with a serial device, and I did not find a way to do this using asyncio .

+7
python python-asyncio
source share
3 answers

asyncio.Queue not thread safe , so you cannot use it directly from multiple threads. Instead, you can use janus , which is a third-party library that provides an asyncio chain that supports the stream:

 import asyncio import threading import janus def threaded(squeue): import time while True: time.sleep(2) squeue.put_nowait(time.time()) print(squeue.qsize()) @asyncio.coroutine def async(aqueue): while True: time = yield from aqueue.get() print(time) loop = asyncio.get_event_loop() queue = janus.Queue(loop=loop) asyncio.Task(asyncio.ensure_future(queue.async_q)) threading.Thread(target=threaded, args=(queue.sync_q,)).start() loop.run_forever() 

There is also aioprocessing (full disclosure: I wrote it) that provides a safe process (and, effect, thread safe) for the queue, but it is overkill if you are not trying to use multiprocessing .

+15
source share

BaseEventLoop.call_soon_threadsafe . See asyncio doc for asyncio .

Just modify threaded() as follows:

 def threaded(): import time while True: time.sleep(1) loop.call_soon_threadsafe(queue.put_nowait, time.time()) loop.call_soon_threadsafe(lambda: print(queue.qsize())) 

Here's an example output:

 0 1443857763.3355968 0 1443857764.3368602 0 1443857765.338082 0 1443857766.3392274 0 1443857767.3403943 
+2
source share

If you do not want to use another library, you can schedule a coroutine from the stream. Replacing queue.put_nowait as follows is fine.

 asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop) 

The loop variable represents the event loop in the main thread.

EDIT:

The reason your async coroutine is not doing anything is because the event loop never gives it the ability to do this. The queue object is not thread safe, and if you dig up cpython code, you will find that this means that put_nowait wakes up the queue consumers by using the future using the call_soon method of the call_soon loop. If we could use his call_soon_threadsafe , it should work. The main difference between call_soon and call_soon_threadsafe , however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self() . Therefore, allow yourself:

 import asyncio import threading queue = asyncio.Queue() def threaded(): import time while True: time.sleep(2) queue.put_nowait(time.time()) queue._loop._write_to_self() print(queue.qsize()) @asyncio.coroutine def async(): while True: time = yield from queue.get() print(time) loop = asyncio.get_event_loop() asyncio.Task(async()) threading.Thread(target=threaded).start() loop.run_forever() 

Then everything works as expected.

Regarding the thread-safe aspect, access to shared objects, asyncio.queue uses collections.deque , which has append and popleft streams, under the hood. Perhaps the queue check is not empty, and popleft is not atomic, but if you consume the queue in only one thread (one from the event loop) this may be good.

Other suggested solutions, loop.call_soon_threadsafe by Huazuo Answer Gao and my asyncio.run_coroutine_threadsafe just do this, waking up the event loop.

+1
source share

All Articles