How to connect to RabbitMQ?

My Python script has to send a message to RabbitMQ each time it receives one from another data source. The interval between messages can vary from, say, 1 minute to 30 minutes.

Here I establish a connection with RabbitMQ:

    import pika

    rbt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
    channel = rbt_conn.channel()

I just got an exception

 pika.exceptions.ConnectionClosed 

How can I connect to it? What is the best way? Is there a "strategy"? Is it possible to send pings to keep the network connection alive, or to set a timeout?

Any pointers would be appreciated.

2 answers

RabbitMQ uses heartbeats to detect and close dead connections and to keep network devices (firewalls, etc.) from terminating idle connections. Since version 3.5.5, the default timeout is 60 seconds (previously it was about 10 minutes). From the docs:

Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.

The problem with Pika's BlockingConnection is that it cannot respond to heartbeats until some API call is made (for example channel.basic_publish(), connection.sleep(), etc.).
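One way to act on this, as a minimal sketch: while the script is idle between messages, hand control back to pika in short slices so it gets a chance to service heartbeats. The helper name `wait_and_service_heartbeats`, the `should_stop` callback, and the one-second slice are illustrative assumptions and not part of pika; only `connection.sleep()` is a pika call.

```python
def wait_and_service_heartbeats(connection, should_stop, slice_seconds=1.0):
    """Idle until should_stop() returns True, yielding to pika in slices.

    `connection` is expected to be a pika.BlockingConnection; only its
    sleep(duration) method is used. Hypothetical helper, not part of pika.
    Returns the number of slices waited.
    """
    slices = 0
    while not should_stop():
        # connection.sleep() lets pika process I/O (including heartbeats)
        # while waiting; time.sleep() would leave the connection unattended.
        connection.sleep(slice_seconds)
        slices += 1
    return slices
```

In a real script, `should_stop` could return True as soon as the other data source has a new message ready, at which point the caller publishes and goes back to waiting.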

The approaches I have found so far are:

Increase or deactivate the timeout

RabbitMQ negotiates the timeout with the client when the connection is established. In theory, it should be possible to override the server default with a larger value using the heartbeat_interval argument, but the current version of Pika (0.10.0) uses the minimum of the values offered by the server and the client. This issue has been fixed on the current master.

On the other hand, you can disable the heartbeat functionality completely by setting the heartbeat_interval argument to 0, which may well lead to new problems (firewalls dropping connections, etc.).
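As a rough sketch of how the timeout could be requested from the client side: the answer uses `heartbeat_interval`, which matches Pika 0.x; Pika 1.x names the same setting `heartbeat`. The helper names below are hypothetical, and whether a larger client value actually wins the negotiation depends on the Pika version, as described above.

```python
def heartbeat_kwargs(seconds, pika_major_version=1):
    """Return the keyword argument that requests a heartbeat timeout.

    Assumption: Pika 0.x takes `heartbeat_interval` (as in the answer),
    Pika 1.x takes `heartbeat`. A value of 0 requests no heartbeats.
    """
    if seconds < 0:
        raise ValueError("heartbeat timeout must be >= 0")
    name = "heartbeat" if pika_major_version >= 1 else "heartbeat_interval"
    return {name: seconds}


def make_parameters(host, seconds, pika_major_version=1):
    """Build ConnectionParameters with the requested heartbeat timeout."""
    # Imported lazily so heartbeat_kwargs() is usable without pika installed.
    import pika
    return pika.ConnectionParameters(
        host=host, **heartbeat_kwargs(seconds, pika_major_version))
```

For example, `make_parameters("some_host", 600)` asks for a 10-minute timeout, and `make_parameters("some_host", 0)` asks to disable heartbeats entirely.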

Reconnecting

Expanding on @itsafire's answer, you can write your own publisher class that reconnects when necessary. An example of a naive implementation:

    import logging
    import json
    import pika


    class Publisher:
        EXCHANGE = 'my_exchange'
        TYPE = 'topic'
        ROUTING_KEY = 'some_routing_key'

        def __init__(self, host, virtual_host, username, password):
            self._params = pika.connection.ConnectionParameters(
                host=host,
                virtual_host=virtual_host,
                credentials=pika.credentials.PlainCredentials(username, password))
            self._conn = None
            self._channel = None

        def connect(self):
            # Call this once before the first publish().
            if not self._conn or self._conn.is_closed:
                self._conn = pika.BlockingConnection(self._params)
                self._channel = self._conn.channel()
                # Pika 0.10 keyword; Pika 1.x renamed it to exchange_type.
                self._channel.exchange_declare(exchange=self.EXCHANGE,
                                               type=self.TYPE)

        def _publish(self, msg):
            self._channel.basic_publish(exchange=self.EXCHANGE,
                                        routing_key=self.ROUTING_KEY,
                                        body=json.dumps(msg).encode())
            logging.debug('message sent: %s', msg)

        def publish(self, msg):
            """Publish msg, reconnecting if necessary."""
            try:
                self._publish(msg)
            except pika.exceptions.ConnectionClosed:
                # The broker dropped the connection; reconnect and retry once.
                logging.debug('reconnecting to queue')
                self.connect()
                self._publish(msg)

        def close(self):
            if self._conn and self._conn.is_open:
                logging.debug('closing queue connection')
                self._conn.close()

Other possibilities

Other possibilities that I have not explored yet:


Dead simple: use a template like this.

    import time

    import pika

    while True:
        try:
            # connect_pika() and do_your_stuff() are placeholders for your own code
            communication_handles = connect_pika()
            do_your_stuff(communication_handles)
        except pika.exceptions.ConnectionClosed:
            print('oops. lost connection. trying to reconnect.')
            # avoid rapid reconnection on longer RMQ server outage
            time.sleep(0.5)

You may have to refactor your code, but the main thing is to catch the exception, mitigate the problem, and keep doing your stuff. communication_handles contains all the pika objects, such as channels and queues, that you need to communicate with RabbitMQ through pika.
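A variation on the same idea, as a sketch only: instead of a fixed half-second pause, the delay can grow after each failed attempt so a long broker outage does not cause a tight reconnect loop. The function `run_with_reconnect` and all of its parameters are hypothetical names; with pika you would pass `pika.exceptions.ConnectionClosed` as `retry_on`. Unlike the endless loop above, this version returns once the work finishes and gives up after `max_attempts`.

```python
import time


def run_with_reconnect(connect, work, retry_on, max_attempts=5,
                       base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call work(connect()), reconnecting when `retry_on` is raised.

    The pause doubles after every failure, capped at max_delay seconds.
    Re-raises the last exception once max_attempts is exhausted.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            # A fresh connection is opened on every attempt.
            return work(connect())
        except retry_on:
            if attempt == max_attempts:
                raise
            sleep(delay)
            delay = min(delay * 2, max_delay)
```

The `sleep` parameter exists only so the pause can be replaced in tests; in normal use the default `time.sleep` applies.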
