RabbitMq - pika - python - Delete posts when posting

def get_connection_and_channel(self, connection_parameters): connection = pika.BlockingConnection(connection_parameters) channel = connection.channel() return (connection, channel) connection_parameters = pika.ConnectionParameters( server, port, virtual_host, credentials=pika.PlainCredentials(user_name, password)) connection,channel = self.get_connection_and_channel(connection_parameters) channel.confirm_delivery() count=0 for json_string in open(json_file, 'r'): result_json = json.loads(json_string) message_body = json.dumps(result_json['body']) routing_key = result_json['RoutingKey'] channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip()) count += 1 self.logger.info('Sent %d messages' % count) connection.close() 

I use this code to send messages to the RabbitMQ server. But from time to time it does not send all messages to the appropriate queue. It skips a random number of messages each time it starts.

I can’t understand what the problem is.

+5
source share
3 answers

Your messages are likely to be returned because they cannot redirect the message to an existing queue. Try adding a callback to channel.confirm_delivery :

 channel.confirm_delivery(on_delivery_confirmation) def on_delivery_confirmation(self, method_frame): confirmation_type = method_frame.method.NAME.split('.')[1].lower() if confirmation_type == 'ack': self.logger.info('message published') elif confirmation_type == 'nack': self.logger.info('message not routed') 

If so, try to first bind the consumer queue with the exchange and routing key before posting the message.

+3
source

Simple (less reliable) way

Turn on trusted queues first:

 channel.queue_declare(queue='your_queue', durable=True) 

both for the publisher and for the consumer (before publication / consumption).

Then you can be sure that your queue will not be lost, even if the RabbitMQ server dies and restarts.

Publisher

At the publisher, add properties=pika.BasicProperties(delivery_mode=2) to your basic_publish call to make sure your posts are persistent.

 channel.basic_publish(exchange=self.output_exchange_name, routing_key=routing_key, body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2)) 

This should do the trick so as not to lose _published messages.

Consumer

From a consumer perspective, the official RabbitMQ official python tutorial reads:

To ensure that a message is never lost, RabbitMQ supports message acknowledgment. Ack (nowledgement) is sent back from the consumer to inform RabbitMQ that a specific message has been received, processed and that RabbitMQ can delete it. [...] By default, messages are acknowledged.

When you build a consumer, make sure you send ack correctly so that RabbitMQ removes it from the queue.

 def callback(ch, method, properties, body): print "Received %r" % (body,) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='your_queue') 

Truly Safe Way

If you need a stronger and more reliable method to be completely confident in publishing relay confirmation to RabbitMQ, you should use the plublish confirm function of the AMQP protocol.

From the pika documentation :

 import pika # Open a connection to RabbitMQ on localhost using all default parameters connection = pika.BlockingConnection() # Open the channel channel = connection.channel() # Declare the queue channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False) # Turn on delivery confirmations channel.confirm_delivery() # Send a message if channel.basic_publish(exchange='test', routing_key='test', body='Hello World!', properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)): print 'Message publish was confirmed' else: print 'Message could not be confirmed' 

So, according to your code, I will use something similar to:

 count=0 for json_string in open(json_file, 'r'): result_json = json.loads(json_string) message_body = json.dumps(result_json['body']) routing_key = result_json['RoutingKey'] if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent count += 1 else: # Do something with your undelivered message self.logger.info('Sent %d messages' % count) connection.close() 

Or as a brute force method, you can use the while instead of if to ensure all your messages are sent:

 count = 0 for json_string in open(json_file, 'r'): result_json = json.loads(json_string) message_body = json.dumps(result_json['body']) routing_key = result_json['RoutingKey'] while not channel.basic_publish(exchange=self.output_exchange_name, routing_key=routing_key, body=message_body.strip(), properties=pika.BasicProperties(delivery_mode=2)): pass # Do nothing or even you can count retries count += 1 self.logger.info('Sent %d messages' % count) 
+1
source

try this to get only one message with your command:

 #!/usr/bin/env python import pika import ujson as json def receive(): parameters = pika.ConnectionParameters(host='localhost') connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='raw_post', durable=True) method_frame, header_frame, body = channel.basic_get(queue='raw_post') if method_frame.NAME == 'Basic.GetEmpty': connection.close() return '' else: channel.basic_ack(delivery_tag=method_frame.delivery_tag) connection.close() return json.loads(body), method_frame.message_count a = '' while a not in ['quit', 'sair', 'exit', 'bye']: a = input("whats up?") print(receive()) 

just a sender with 5000 messages for the queue:

 #!/usr/bin/env python import pika import ujson as json connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='raw_post', durable=True) for i in range(5000): info = {"info": "test", "value": i} channel.basic_publish(exchange='', routing_key='raw_post', body=json.dumps(info), properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!' {}".format(i)) connection.close() 
0
source

All Articles