Using Tornado with Pika for Asynchronous Queue Monitoring

I have an AMQP server (RabbitMQ) that I would like to both publish to and read from in a Tornado web server. To do this, I decided to use an asynchronous AMQP library for Python; in particular Pika (a variant of which supposedly supports Tornado).

I have written code that appears to read from the queue successfully, except that at the end of the request I get an exception (the browser returns fine):

    [E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
        HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
        Traceback (most recent call last):
          File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
            yield
          File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
            yield
          File "/usr/lib/python2.6/contextlib.py", line 113, in nested
            yield vars
          File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
            callback(*args, **kwargs)
          File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
            self._handle_read()
          File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
            self.on_data_available(chunk)
          File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
            self.channels[frame.channel_number].frame_handler(frame)
        KeyError: 1

I'm not entirely sure I am using this library correctly, so I might be doing something egregiously wrong. The basic flow of my code is:

  • Request comes in
  • Create a connection to RabbitMQ using TornadoConnection; specify callback
  • In the connection callback, create a channel, declare / bind my queue, and call basic_consume; specify callback
  • When the consume callback is called, close the channel and call Tornado's finish function.
  • See exception.
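One plausible reading of the traceback above, which nothing in this thread confirms, is that a frame for channel 1 arrives after the channel has already been closed and dropped from the connection's channel table. The toy model below uses only stand-in classes (ToyChannel and ToyConnection are hypothetical names, not Pika classes) to show how that ordering produces a KeyError with the channel number as its key:

```python
class ToyChannel:
    """Stand-in for an AMQP channel; records the frames it handles."""

    def __init__(self, number):
        self.number = number
        self.handled = []

    def frame_handler(self, frame):
        self.handled.append(frame)


class ToyConnection:
    """Stand-in for a connection that routes frames by channel number."""

    def __init__(self):
        self.channels = {}

    def open_channel(self, number):
        self.channels[number] = ToyChannel(number)
        return self.channels[number]

    def close_channel(self, number):
        # Forget the channel immediately, without waiting for any
        # frames that may still be in flight for it.
        del self.channels[number]

    def on_data_available(self, channel_number, frame):
        # Same lookup shape as the failing line in the traceback.
        self.channels[channel_number].frame_handler(frame)


conn = ToyConnection()
conn.open_channel(1)
conn.on_data_available(1, "delivery")      # routed normally
conn.close_channel(1)                      # as in the consume callback

try:
    conn.on_data_available(1, "late frame")  # data arriving after the close
    late_frame_error = None
except KeyError as exc:
    late_frame_error = exc

print(repr(late_frame_error))
```

If this is what is happening, keeping one long-lived channel open instead of closing it inside the consume callback would sidestep the problem.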

I have a few questions:

  • Is this flow even correct? I am not sure what the purpose of the connection callback is, except that it does not work if I do not use it.
  • Should I create one AMQP connection per web request? The RabbitMQ documentation suggests that no, I should not, and that I should stick to creating only channels. But where would I create the connection, and how do I attempt to reconnect if it goes down briefly?
  • If I am creating one AMQP connection per web request, where should I close it? Calling amqp.close() in my callback seems to make things even worse.

I will try to post some sample code a bit later, but the steps described above lay out the consuming side fairly completely. I am also having problems with the publishing side, but consuming from queues is more pressing.

python asynchronous tornado rabbitmq amqp
2 answers

It would help to see some source code, but I use this same Tornado-supporting pika module without problems in more than one project.

You do not want to create a connection per request. Create a class that wraps all of your AMQP operations, and instantiate it as a singleton at the Tornado application level so that it can be used across requests (and across request handlers). I do this in a "runapp()" function, which does a few things like this and then starts the main Tornado ioloop.

Here is a class called "Event". It is a partial implementation (in particular, I do not define "self.handle_event" here; that is left to you).

    # Imports inferred from the names used below (assumed, not in the original post).
    import logging

    import pika
    from pika import tornado_adapter


    class Event(object):

        def __init__(self, config):
            self.host = 'localhost'
            self.port = 5672  # integer, because the log line below formats it with %i
            self.vhost = '/'
            self.user = 'foo'
            self.exchange = 'myx'
            self.queue = 'myq'
            self.recv_routing_key = 'msgs4me'
            self.passwd = 'bar'

            self.connected = False
            self.connect()

        def connect(self):
            credentials = pika.PlainCredentials(self.user, self.passwd)
            parameters = pika.ConnectionParameters(host=self.host,
                                                   port=self.port,
                                                   virtual_host=self.vhost,
                                                   credentials=credentials)
            srs = pika.connection.SimpleReconnectionStrategy()

            logging.debug('Events: Connecting to AMQP Broker: %s:%i' %
                          (self.host, self.port))
            self.connection = tornado_adapter.TornadoConnection(
                parameters,
                wait_for_open=False,
                reconnection_strategy=srs,
                callback=self.on_connected)

        def on_connected(self):
            # Open the channel
            logging.debug("Events: Opening a channel")
            self.channel = self.connection.channel()

            # Declare our exchange
            logging.debug("Events: Declaring the %s exchange" % self.exchange)
            self.channel.exchange_declare(exchange=self.exchange,
                                          type="fanout",
                                          auto_delete=False,
                                          durable=True)

            # Declare our queue for this process
            logging.debug("Events: Declaring the %s queue" % self.queue)
            self.channel.queue_declare(queue=self.queue,
                                       auto_delete=False,
                                       exclusive=False,
                                       durable=True)

            # Bind to the exchange
            self.channel.queue_bind(exchange=self.exchange,
                                    queue=self.queue,
                                    routing_key=self.recv_routing_key)

            # Start consuming; self.handle_event is not defined in this excerpt
            self.channel.basic_consume(consumer=self.handle_event,
                                       queue=self.queue,
                                       no_ack=True)

            # We should be connected if we made it this far
            self.connected = True

I put the Event class in a file called "events.py". My RequestHandlers and any back-end code all use a "common.py" module that wraps code useful to both (my RequestHandlers do not call any amqp module methods directly, and the same goes for db, cache, etc.), so I define "events = None" at the module level in common.py, and I instantiate the Event object like this:

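    import logging

    import tornado.httpserver
    import tornado.ioloop

    # The original post shows only "import events"; the myapp package
    # imports are assumed from the names used in the function body.
    import myapp.common
    import myapp.events


    def runapp(config):
        # "app" (the Tornado application) and "port" are defined elsewhere.
        if myapp.common.events is None:
            myapp.common.events = myapp.events.Event(config)
        logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
        http_server = tornado.httpserver.HTTPServer(
            app,
            xheaders=config['HTTPServer']['xheaders'],
            no_keep_alive=config['HTTPServer']['no_keep_alive'])
        http_server.listen(port)
        main_loop = tornado.ioloop.IOLoop.instance()
        logging.debug("MAIN IOLOOP: %s", main_loop)
        main_loop.start()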
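The Event class above leaves self.handle_event undefined. One possible shape for it, offered only as a sketch, is a small dispatcher that fans each message body out to callbacks registered by request handlers, so that a handler can finish its request when a message arrives without touching the channel. EventDispatcher, add_listener and remove_listener are hypothetical names, and the four-argument callback signature (channel, method, header, body) is an assumption that should be checked against the Pika version in use.

```python
class EventDispatcher(object):
    """Hypothetical helper supplying a handle_event-style callback."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, callback):
        # A request handler registers a callable that accepts the body.
        self._listeners.append(callback)

    def remove_listener(self, callback):
        if callback in self._listeners:
            self._listeners.remove(callback)

    def handle_event(self, channel, method, header, body):
        # Assumed consumer signature; only the body is passed along.
        # Iterate over a copy so listeners may unregister themselves.
        for callback in list(self._listeners):
            callback(body)


dispatcher = EventDispatcher()
received = []


def on_message(body):
    # One-shot listener, as a single web request would be: take the
    # first message, then unregister instead of closing the channel.
    received.append(body)
    dispatcher.remove_listener(on_message)


dispatcher.add_listener(on_message)
dispatcher.handle_event(None, None, None, "first")
dispatcher.handle_event(None, None, None, "second")
print(received)
```

In a real handler, the listener would write the body to the response and call the handler's finish method; the shared channel stays open for the next request.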

Happy New Year :-D


Someone reported success in combining Tornado and Pika here. From what I can tell, it is not as simple as just calling Pika from Tornado, since both libraries want to own their own event loops.


Source: https://habr.com/ru/post/650286/

