RabbitMQ uses heartbeats to detect and close dead connections, and to keep network devices (firewalls, etc.) from terminating idle connections. Starting with version 3.5.5, the default timeout is 60 seconds (previously it was about 10 minutes). From the docs:
Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.
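As a quick check of the arithmetic in that quote, here is a minimal sketch. The function name heartbeat_schedule is made up for illustration, and the exact moment a real broker or client gives up depends on its implementation, so treat the numbers as a rough reading of the docs rather than a guarantee.

```python
def heartbeat_schedule(timeout_s, missed_allowed=2):
    """Return (send_interval, silence_before_unreachable) in seconds.

    Hypothetical helper: frames go out every timeout / 2 seconds, and the
    peer is given up on after `missed_allowed` frames fail to arrive.
    """
    send_interval = timeout_s / 2
    silence_before_unreachable = send_interval * missed_allowed
    return send_interval, silence_before_unreachable


# With the 60 second default, a frame is expected every 30 seconds and
# roughly 60 seconds of silence marks the peer as unreachable.
print(heartbeat_schedule(60))
```

So with the default settings, a client that stays silent for about a minute should expect to be dropped.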
The problem with Pika's BlockingConnection is that it cannot respond to heartbeats until some API call is made (e.g. channel.basic_publish(), connection.sleep(), etc.).
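One way to live with this, sketched under assumptions, is to break long-running work into small pieces and call connection.sleep() between them so the connection gets a chance to do its I/O. The helper name run_in_slices and its parameters are hypothetical; the only Pika call used is BlockingConnection.sleep(), and each individual piece of work still has to finish well within the heartbeat timeout.

```python
def run_in_slices(connection, work_items, handle, pause=0.01):
    """Process work_items one by one, yielding to the connection in between.

    `connection` is expected to be a pika.BlockingConnection (anything with
    a compatible sleep(duration) method works); `handle` is the caller's
    own function for a single item.
    """
    results = []
    for item in work_items:
        results.append(handle(item))
        # sleep() services the socket while it waits, so pending
        # heartbeat frames can be answered here.
        connection.sleep(pause)
    return results
```

This does not help if a single handle() call blocks for longer than the timeout; in that case one of the approaches below is needed.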
The approaches I have found so far are:
Increase or Deactivate Timeout
RabbitMQ negotiates the timeout with the client when the connection is established. Theoretically, it should be possible to override the server's default with a larger value using the heartbeat_interval argument, but the current version of Pika (0.10.0) uses the smaller of the values offered by the server and the client. This issue has been fixed on current master.
On the other hand, you can disable heartbeats completely by setting the heartbeat_interval argument to 0, which may well lead to new problems (firewalls dropping connections, etc.).
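To make the effect of that negotiation concrete, here is a simplified model of the behaviour described above. It is not Pika's actual code: the function name negotiate_heartbeat is invented for illustration, and it only encodes the two rules stated in the text (the smaller value wins, and a client value of 0 disables heartbeats).

```python
def negotiate_heartbeat(server_timeout, client_timeout):
    """Model of the timeout Pika 0.10.0 ends up with, per the text above."""
    if client_timeout == 0:
        # The client asked for heartbeats to be switched off.
        return 0
    # Otherwise the smaller of the two offers is used.
    return min(server_timeout, client_timeout)


print(negotiate_heartbeat(60, 600))  # 60: asking for more than the server offers has no effect
print(negotiate_heartbeat(60, 20))   # 20: asking for less works
print(negotiate_heartbeat(60, 0))    # 0: heartbeats disabled
```

In Pika 0.10.0 the client-side value is the heartbeat_interval argument of pika.ConnectionParameters, so under this model passing heartbeat_interval=600 against a server default of 60 still leaves you with 60.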
Reconnecting
Expanding on @itsafire's answer, you can write your own publisher class that reconnects when necessary. An example of a naive implementation:
    import logging
    import json
    import pika


    class Publisher:
        EXCHANGE = 'my_exchange'
        TYPE = 'topic'
        ROUTING_KEY = 'some_routing_key'

        def __init__(self, host, virtual_host, username, password):
            self._params = pika.connection.ConnectionParameters(
                host=host,
                virtual_host=virtual_host,
                credentials=pika.credentials.PlainCredentials(username, password))
            self._conn = None
            self._channel = None

        def connect(self):
            # Call this once before the first publish(); it is a no-op
            # while the connection is still open.
            if not self._conn or self._conn.is_closed:
                self._conn = pika.BlockingConnection(self._params)
                self._channel = self._conn.channel()
                # `type` is the keyword used with Pika 0.10.0; newer
                # releases name this argument `exchange_type`.
                self._channel.exchange_declare(exchange=self.EXCHANGE,
                                               type=self.TYPE)

        def _publish(self, msg):
            self._channel.basic_publish(exchange=self.EXCHANGE,
                                        routing_key=self.ROUTING_KEY,
                                        body=json.dumps(msg).encode())
            logging.debug('message sent: %s', msg)

        def publish(self, msg):
            """Publish msg, reconnecting if necessary."""
            try:
                self._publish(msg)
            except pika.exceptions.ConnectionClosed:
                # The broker dropped us (e.g. missed heartbeats): reconnect
                # and retry once.
                logging.debug('reconnecting to queue')
                self.connect()
                self._publish(msg)

        def close(self):
            if self._conn and self._conn.is_open:
                logging.debug('closing queue connection')
                self._conn.close()
Other possibilities
Other possibilities that I have not explored yet: