If you do not want to use another library, you can schedule a coroutine from the thread. Replacing queue.put_nowait with the following works fine:
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
The loop variable represents the event loop in the main thread.
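
For context, here is a minimal, self-contained sketch of that replacement. The names producer, consume and run_demo are made up for illustration and are not from the question; the short sleep and the item count are arbitrary. It assumes Python 3.7 or later.

```python
import asyncio
import threading
import time


def producer(queue, loop, count):
    """Runs in a worker thread and hands timestamps to the event loop."""
    for _ in range(count):
        time.sleep(0.01)
        # Schedule queue.put() on the loop's own thread. The call returns a
        # concurrent.futures.Future that this thread may wait on.
        future = asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
        future.result()  # block this thread until the put has completed


async def consume(count):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=producer, args=(queue, loop, count))
    worker.start()
    received = [await queue.get() for _ in range(count)]
    # Join in an executor so the event loop is never blocked while the
    # worker is still waiting on future.result().
    await loop.run_in_executor(None, worker.join)
    return received


def run_demo(count=3):
    return asyncio.run(consume(count))


if __name__ == "__main__":
    print(run_demo())
```

Waiting on future.result() in the worker is optional; it only makes the thread pause until the loop has actually stored the item.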
EDIT:
The reason your async coroutine is not doing anything is that the event loop never gives it a chance to run. The queue object is not thread-safe, and if you dig through the CPython code, you will find that put_nowait wakes up the queue's consumers through a future, whose callbacks are scheduled with the loop's call_soon method. If it used call_soon_threadsafe instead, it would work. The main difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves:
import asyncio
import threading
import time

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        # Private API: wake the event loop so it notices the new item.
        loop._write_to_self()
        print(queue.qsize())

# "async" is a reserved word since Python 3.7, so the coroutine is renamed.
async def consumer():
    while True:
        timestamp = await queue.get()
        print(timestamp)

loop.create_task(consumer())
threading.Thread(target=threaded).start()
loop.run_forever()
Then everything works as expected.
Regarding thread safety when accessing the shared object: asyncio.Queue uses collections.deque under the hood, whose append and popleft are thread-safe. Checking that the queue is not empty and then calling popleft may not be atomic, but if you consume the queue from only one thread (the event loop's), this should be fine.
The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, do just that: they wake up the event loop.
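
For comparison, a minimal sketch of the loop.call_soon_threadsafe variant, which avoids touching any private API. As before, the names producer, consume and run_call_soon_demo are hypothetical, and the timing values are arbitrary.

```python
import asyncio
import threading
import time


def producer(queue, loop, count):
    """Runs in a worker thread."""
    for _ in range(count):
        time.sleep(0.01)
        # Ask the loop to call queue.put_nowait in its own thread.
        # call_soon_threadsafe also wakes the loop if it is idle.
        loop.call_soon_threadsafe(queue.put_nowait, time.time())


async def consume(count):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(
        target=producer, args=(queue, loop, count), daemon=True
    ).start()
    # Each get() is woken by the put_nowait scheduled from the thread.
    return [await queue.get() for _ in range(count)]


def run_call_soon_demo(count=3):
    return asyncio.run(consume(count))


if __name__ == "__main__":
    print(run_call_soon_demo())
```

Because put_nowait runs inside the event loop thread here, the queue is only ever touched from one thread, so the atomicity concern above does not arise.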
cronos