RabbitMQ asynchronous consumer with aioamqp

I am trying to write an asynchronous user using asyncio / aioamqp. My problem is that the callback coprocessor (below) is blocking. I set the channel to execute basic_consume () and assigned the callback as callback (). The callback has the instruction "exit asyncio.sleep" (to simulate "work"), which takes an integer from the publisher and sleeps during this time before printing the message.

If I published two messages, one with a time of "10", immediately followed by one with a time of "1", I expected the second message to be printed first, since it has a shorter sleep time. Instead, the callback block within 10 seconds prints the first message and then prints the second.

Either basic_consume or a callback appears, is blocked somewhere. Is there any other way to handle this?

@asyncio.coroutine def callback(body, envelope, properties): yield from asyncio.sleep(int(body)) print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) @asyncio.coroutine def receive_log(): try: transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") except: print("closed connections") return channel = yield from protocol.channel() exchange_name = 'cloudstack-events' exchange_name = 'test-async-exchange' queue_name = 'async-queue-%s' % random.randint(0, 10000) yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) binding_keys = ['mykey'] for binding_key in binding_keys: print("binding", binding_key) yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, queue_name=queue_name, routing_key=binding_key), timeout=10) print(' [*] Waiting for logs. To exit press CTRL+C') yield from channel.basic_consume(queue_name, callback=callback) loop = asyncio.get_event_loop() loop.create_task(receive_log()) loop.run_forever() 
+7
python asynchronous python-asyncio rabbitmq
source share
1 answer

For those who are interested, I figured out how to do it. I'm not sure if this is the best practice, but it does what I need.

Instead of doing the β€œwork” (in this case async.sleep) inside the callback, I create a new task in a loop and plan a separate routine to run do_work (). Presumably this works because it frees callback () to return immediately.

I loaded several hundred events into Rabbit with different sleep timers, and they were interleaved when they were printed using the code below. It seems to work. Hope this helps someone!

 @asyncio.coroutine def do_work(envelope, body): yield from asyncio.sleep(int(body)) print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) @asyncio.coroutine def callback(body, envelope, properties): loop = asyncio.get_event_loop() loop.create_task(do_work(envelope, body)) @asyncio.coroutine def receive_log(): try: transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") except: print("closed connections") return channel = yield from protocol.channel() exchange_name = 'cloudstack-events' exchange_name = 'test-async-exchange' queue_name = 'async-queue-%s' % random.randint(0, 10000) yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) binding_keys = ['mykey'] for binding_key in binding_keys: print("binding", binding_key) yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, queue_name=queue_name, routing_key=binding_key), timeout=10) print(' [*] Waiting for logs. To exit press CTRL+C') yield from channel.basic_consume(queue_name, callback=callback) loop = asyncio.get_event_loop() loop.create_task(receive_log()) loop.run_forever() 
+4
source share

All Articles