RabbitMQ: pause queue consumption

What is the best way to keep a durable queue and its bindings, but suspend its consumers?

Use case: I would like to "let it crash" and stop processing messages if we keep receiving a bunch of messages that we cannot deal with (for example, the database is down or there is a schema problem), but I would like messages to keep accumulating in the queue. That is, allow publishing but pause consuming.

I can think of three solutions:

  1. I could have all the consumers bound to the queue continuously reject the messages and requeue them, but that is a waste of resources, not to mention that I would have to implement that logic programmatically.
  2. I could call basic.cancel on all the consumers (see below).
  3. Or, in Spring AMQP terms, I assume I could call shutdown on all the SimpleMessageListenerContainers that are bound to the queue.

We already do #1, since the messages are being rejected. The problem is that it turns into an endless loop of failures, and if you log each failure, even more resources are wasted.

#3 seems ideal, but I would need to know about all the message listeners and then notify them to shut down. I suppose I could use a fanout exchange to announce that a queue should be suspended. I feel that RabbitMQ should have something built in for this. Another problem is that a single message listener container can be bound to multiple queues (and not all of those queues may need to be suspended).

For #2, I know I can cancel a consumer using its consumerTag, but the question (assuming this is the right way to do the above) is: where can I get the list of consumerTags for a queue?

+6
2 answers

If you need to stop a consumer, just call basic.cancel; that is the way to do it.

Whether a queue is durable is something you decide when declaring it: durable=true auto_delete=false.

Whether messages are persistent is decided when they are published: delivery_mode=2.

Each consumer has its own consumer tag. Whenever you receive a message from the queue, the envelope should carry the consumer tag, and you can call basic.cancel with that tag.

AFAIK, you cannot have consumer A on connection 1 and call basic.cancel for that consumer from another connection. I may be wrong.
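Since the cancel has to come from the side that owns the consumer, one option is for each process to remember the tags it registered itself. Below is a minimal sketch of that idea in plain Java. The ConsumerTagRegistry class and its method names are hypothetical, and the cancel callback is only a stand-in for the real client call (in the RabbitMQ Java client that would be Channel.basicCancel on the channel that created the consumer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: remembers which consumer tags this process registered
// on each queue, so that they can all be cancelled later.
class ConsumerTagRegistry {
    private final Map<String, Set<String>> tagsByQueue = new ConcurrentHashMap<>();

    // Call this with the tag obtained when the consumer is registered.
    void register(String queue, String consumerTag) {
        tagsByQueue.computeIfAbsent(queue, q -> ConcurrentHashMap.newKeySet())
                   .add(consumerTag);
    }

    // Cancels every known consumer on the queue and returns the cancelled tags.
    // `cancel` is an assumption standing in for the real basic.cancel call.
    List<String> cancelAll(String queue, Consumer<String> cancel) {
        Set<String> tags = tagsByQueue.remove(queue);
        List<String> cancelled = new ArrayList<>();
        if (tags == null) {
            return cancelled; // nothing registered for this queue
        }
        for (String tag : tags) {
            cancel.accept(tag);
            cancelled.add(tag);
        }
        return cancelled;
    }
}
```

With the Java client, basicConsume returns the consumer tag, so it can be passed straight to register. basicCancel throws a checked IOException, so a real callback would need to wrap it. Publishing to the queue is unaffected; only deliveries to the cancelled consumers stop.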

+1

Here is what I do. It's basically #3 from my question.

I maintain a service that holds a Map<String,CustomSimpleMessageListenerContainer> from queue name to a custom subclass of SimpleMessageListenerContainer.

After a certain number of exceptions, I publish a "panic" message to a special queue; the service consumes it and shuts down the consumer for the affected queue.
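The approach above could be sketched roughly as follows. This is not the actual service, only an illustration under stated assumptions: PausableContainer is a hypothetical interface standing in for the custom SimpleMessageListenerContainer subclass (Spring's container exposes start() and stop() of this shape), QueuePauseService and the failure threshold are invented names, and each container is assumed to listen on exactly one queue so that pausing it does not affect other queues.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a listener container that can be stopped and restarted.
interface PausableContainer {
    void start();
    void stop();
}

// Hypothetical service: maps queue name to its container and pauses on demand.
class QueuePauseService {
    private final Map<String, PausableContainer> containers = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> failures = new ConcurrentHashMap<>();
    private final int maxFailures;

    QueuePauseService(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    void register(String queue, PausableContainer container) {
        containers.put(queue, container);
    }

    // Called by a listener on each processing failure. Returns true once the
    // threshold is reached, i.e. when a "panic" message should be published.
    boolean recordFailure(String queue) {
        int count = failures.computeIfAbsent(queue, q -> new AtomicInteger())
                            .incrementAndGet();
        return count >= maxFailures;
    }

    // Handler for the "panic" message: stops consuming, the queue keeps filling.
    boolean pause(String queue) {
        PausableContainer container = containers.get(queue);
        if (container == null) {
            return false; // unknown queue, nothing to pause
        }
        container.stop();
        return true;
    }

    // Restarts consumption and resets the failure counter for the queue.
    boolean resume(String queue) {
        PausableContainer container = containers.get(queue);
        if (container == null) {
            return false;
        }
        failures.remove(queue);
        container.start();
        return true;
    }
}
```

Keeping one container per queue is a design choice that sidesteps the concern raised in the question about a container bound to several queues, at the cost of more containers to manage.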

+1
