I am trying to write an asynchronous user using asyncio / aioamqp. My problem is that the callback coprocessor (below) is blocking. I set the channel to execute basic_consume () and assigned the callback as callback (). The callback has the instruction "exit asyncio.sleep" (to simulate "work"), which takes an integer from the publisher and sleeps during this time before printing the message.
If I published two messages, one with a time of "10", immediately followed by one with a time of "1", I expected the second message to be printed first, since it has a shorter sleep time. Instead, the callback block within 10 seconds prints the first message and then prints the second.
Either basic_consume or a callback appears, is blocked somewhere. Is there any other way to handle this?
@asyncio.coroutine def callback(body, envelope, properties): yield from asyncio.sleep(int(body)) print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) @asyncio.coroutine def receive_log(): try: transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") except: print("closed connections") return channel = yield from protocol.channel() exchange_name = 'cloudstack-events' exchange_name = 'test-async-exchange' queue_name = 'async-queue-%s' % random.randint(0, 10000) yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) binding_keys = ['mykey'] for binding_key in binding_keys: print("binding", binding_key) yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, queue_name=queue_name, routing_key=binding_key), timeout=10) print(' [*] Waiting for logs. To exit press CTRL+C') yield from channel.basic_consume(queue_name, callback=callback) loop = asyncio.get_event_loop() loop.create_task(receive_log()) loop.run_forever()
python asynchronous python-asyncio rabbitmq
blindsnowmobile
source share