RabbitMQ: Persisting Messages Published to a Topic Exchange

I am very new to RabbitMQ.

I have set up a topic exchange. Consumers may be started after the publisher. I would like consumers to be able to receive messages that were published before they started and that have not yet been consumed.

The exchange is configured with the following parameters:

 exchange_type => 'topic'
 durable => 1
 auto_delete => 0
 passive => 0

Messages are published using this parameter:

 delivery_mode => 2 

Consumers use get() to retrieve messages from the exchange.

Unfortunately, any message published before a consumer exists is lost. I have tried various combinations of these options.

I think my problem is that the exchange does not hold messages. Maybe I need a queue between the publisher and the consumer. But that does not seem to fit a "topic" exchange, where messages are routed by key.

Any idea how I should proceed? I am using the Net::RabbitMQ Perl binding (though that shouldn't matter) and RabbitMQ 2.2.0.

+51
rabbitmq amqp
May 27 '11 at 5:57 a.m.
2 answers

You need a durable queue to store messages when no connected consumer is available to process them at the time they are published.

An exchange doesn't store messages, but a queue can. The confusing part is that an exchange can be marked "durable", but all that really means is that the exchange itself will still be there if you restart your broker; it does not mean that messages sent through that exchange are automatically persisted.

Given this, here are two options:

  • Perform an administrative step before your publishers start and create the queue(s) yourself. You could use the web UI or the command-line tools for this. Make sure you create them as durable queues so that they store any messages routed to them even when there are no active consumers.
  • Assuming your consumers are coded to always declare (and thereby auto-create) their exchanges and queues on startup (and to declare them durable), just run all of your consumers at least once before starting any publishers. That will make sure all of your queues get created correctly. You can then shut the consumers down until they are really needed, because the queues will persistently store any future messages routed to them.

I would go with #1. It shouldn't be many steps to perform, and you can always script the required steps so that they are repeatable. Plus, if all of your consumers will pull from the same queue (rather than each having a dedicated queue), it is really a minimal amount of administrative overhead.
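That admin step can itself be scripted. A minimal sketch with the pika Python client, where the exchange, queue, and routing-key names are invented for illustration (not from the question):

```python
#!/usr/bin/env python
"""One-off admin sketch: create the durable queue ahead of any
publisher so messages routed to it are stored even while no
consumer is connected. All names here are hypothetical."""

def declare_topology(channel):
    # Durable exchange: survives a broker restart, but does NOT by
    # itself store messages.
    channel.exchange_declare(exchange='events', exchange_type='topic',
                             durable=True, auto_delete=False)
    # Durable queue: this is what actually holds messages while no
    # consumer is attached.
    channel.queue_declare(queue='audit_log', durable=True)
    # Route every message whose routing key ends in '.audit' here.
    channel.queue_bind(queue='audit_log', exchange='events',
                       routing_key='#.audit')

if __name__ == '__main__':
    import pika  # imported here so the helper is testable without a broker
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    declare_topology(conn.channel())
    conn.close()
```

Run it once (or after any topology change) before starting publishers; re-declaring with identical arguments is harmless, since RabbitMQ treats a matching re-declaration as a no-op.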

Queues are something you will want to monitor and manage. Otherwise you can end up with rogue consumers declaring durable queues, using them for a few minutes, and then never again. Soon after, you'll have a perpetually growing queue, nothing reducing its size, and a broker apocalypse looming.
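One way to keep an eye on that is to poll the broker's management API (assuming the management plugin is enabled) and flag queues that are accumulating messages with nobody consuming them. A sketch: the filtering logic is pure, and the actual HTTP poll against the plugin's default port and credentials is only outlined under the main guard:

```python
def suspicious_queues(queues, backlog_threshold=1000):
    """Return names of queues piling up messages with no consumers.
    `queues` is a list of dicts shaped like the objects returned by
    the RabbitMQ management API's /api/queues endpoint."""
    return sorted(q['name'] for q in queues
                  if q.get('consumers', 0) == 0
                  and q.get('messages', 0) > backlog_threshold)

if __name__ == '__main__':
    # Hypothetical poll of a local broker with the management plugin
    # enabled (default port 15672, default guest:guest credentials).
    import json, urllib.request
    req = urllib.request.Request('http://localhost:15672/api/queues')
    req.add_header('Authorization', 'Basic Z3Vlc3Q6Z3Vlc3Q=')  # guest:guest
    print(suspicious_queues(json.load(urllib.request.urlopen(req))))
```

Anything this flags is a candidate for deletion, a TTL policy, or a conversation with whoever declared it.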

+50
May 27 '11 at 17:29

As Brian mentioned, an exchange does not store messages; it is mainly responsible for routing messages on to other exchange(s) or queue(s). If the exchange is not bound to a queue, any messages sent to that exchange will be "lost".

You do not need to declare the durable consumer queues in your publisher scripts, as that may not scale well. Queues can be created dynamically by your publishers and routed internally using exchange-to-exchange bindings.

RabbitMQ supports exchange-to-exchange bindings, which allow for topology flexibility, decoupling, and other benefits. You can read more here: RabbitMQ Exchange to Exchange Bindings [AMQP]


Example topology

Sample Python code that creates an exchange-to-exchange binding, with durable queues so that messages persist when no consumer is present:

    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare the entry exchange used by all producers to send messages.
    # Could be external producers as well.
    channel.exchange_declare(exchange='data_gateway', exchange_type='fanout',
                             durable=True, auto_delete=False)

    # Declare the processing exchange, which routes messages to the various
    # queues. For internal use only.
    channel.exchange_declare(exchange='data_distributor', exchange_type='topic',
                             durable=True, auto_delete=False)

    # Bind the external/producer-facing exchange to the internal exchange.
    channel.exchange_bind(destination='data_distributor', source='data_gateway')

    # Create durable queues bound to the data_distributor exchange.
    channel.queue_declare(queue='trade_db', durable=True)
    channel.queue_declare(queue='trade_stream_service', durable=True)
    channel.queue_declare(queue='ticker_db', durable=True)
    channel.queue_declare(queue='ticker_stream_service', durable=True)
    channel.queue_declare(queue='orderbook_db', durable=True)
    channel.queue_declare(queue='orderbook_stream_service', durable=True)

    # Bind each queue to the exchange with the correct routing key, so that
    # messages are stored even when no consumer is present.
    channel.queue_bind(queue='orderbook_db', exchange='data_distributor',
                       routing_key='*.*.orderbook')
    channel.queue_bind(queue='orderbook_stream_service', exchange='data_distributor',
                       routing_key='*.*.orderbook')
    channel.queue_bind(queue='ticker_db', exchange='data_distributor',
                       routing_key='*.*.ticker')
    channel.queue_bind(queue='ticker_stream_service', exchange='data_distributor',
                       routing_key='*.*.ticker')
    channel.queue_bind(queue='trade_db', exchange='data_distributor',
                       routing_key='*.*.trade')
    channel.queue_bind(queue='trade_stream_service', exchange='data_distributor',
                       routing_key='*.*.trade')
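To see why binding keys like *.*.orderbook above still deliver messages into durable queues, it helps to recall how topic matching works: * matches exactly one dot-separated word, and # matches zero or more. A simplified, illustrative matcher (the broker's real implementation is a trie, not this recursion):

```python
def topic_matches(pattern, routing_key):
    """Simplified AMQP topic matching: '*' matches exactly one
    dot-separated word, '#' matches zero or more words."""
    def match(p, k):
        if not p:
            return not k          # pattern exhausted: key must be too
        if p[0] == '#':
            # '#' may swallow any number of words, including none.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False          # words left in pattern but key is empty
        return (p[0] == '*' or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split('.'), routing_key.split('.'))

# e.g. topic_matches('*.*.orderbook', 'btc.bitstamp.orderbook')  -> True
# e.g. topic_matches('*.*.orderbook', 'btc.orderbook')           -> False
```

So a key such as btc.bitstamp.orderbook lands in both orderbook queues, and any queue bound with a matching pattern accumulates those messages whether or not a consumer is attached.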
+12
Jun 27 '14 at 5:37


