Simple (less reliable) way
Turn on trusted queues first:
channel.queue_declare(queue='your_queue', durable=True)
both for the publisher and for the consumer (before publication / consumption).
Then you can be sure that your queue will not be lost, even if the RabbitMQ server dies and restarts.
Publisher
At the publisher, add properties=pika.BasicProperties(delivery_mode=2) to your basic_publish call to make sure your posts are persistent.
channel.basic_publish(exchange=self.output_exchange_name, routing_key=routing_key, body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2))
This should do the trick so as not to lose _published messages.
Consumer
From a consumer perspective, the official RabbitMQ official python tutorial reads:
To ensure that a message is never lost, RabbitMQ supports message acknowledgment. Ack (nowledgement) is sent back from the consumer to inform RabbitMQ that a specific message has been received, processed and that RabbitMQ can delete it. [...] By default, messages are acknowledged.
When you build a consumer, make sure you send ack correctly so that RabbitMQ removes it from the queue.
def callback(ch, method, properties, body): print "Received %r" % (body,) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='your_queue')
Truly Safe Way
If you need a stronger and more reliable method to be completely confident in publishing relay confirmation to RabbitMQ, you should use the plublish confirm function of the AMQP protocol.
From the pika documentation :
import pika
So, according to your code, I will use something similar to:
count=0 for json_string in open(json_file, 'r'): result_json = json.loads(json_string) message_body = json.dumps(result_json['body']) routing_key = result_json['RoutingKey'] if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent count += 1 else: # Do something with your undelivered message self.logger.info('Sent %d messages' % count) connection.close()
Or as a brute force method, you can use the while instead of if to ensure all your messages are sent:
count = 0 for json_string in open(json_file, 'r'): result_json = json.loads(json_string) message_body = json.dumps(result_json['body']) routing_key = result_json['RoutingKey'] while not channel.basic_publish(exchange=self.output_exchange_name, routing_key=routing_key, body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2)): pass