Is there an AMQP design pattern for handling commit / rollback of messages?

I have a simple AMQP producer / consumer setup, arranged as follows:

producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler 

The producer sends a number of jobs. The consumer pulls jobs one at a time and processes them, pushing each result onto another queue. The results are then pulled by the result_handler, which writes them to the database.

Sometimes a consumer fails: it can be killed by the operating system, or an exception is thrown. If this happens while a message is being processed, that message is lost, no corresponding result is produced, and I am sad. I would be happy again if the failed job were requeued.

What I'm looking for is a design pattern that ensures a consumer either processes a job to completion and puts the corresponding result in results_queue, or, if it fails, the job returns to jobs_queue. Since the consumer is the thing that fails, the consumer should not be responsible for managing any messages related to its own monitoring.

We know that the consumer has failed to process a job if:

  • it took a job from jobs_queue, and no result was received after some timeout
  • it took a job from jobs_queue and then died

For my application, we can probably catch the second case as well, simply by waiting for the job to finish until a timeout expires. In production there will be many consumers, all pulling jobs from a common jobs queue and publishing their results to a single results exchange / queue.

1 answer

The easiest way to achieve what you want is to acknowledge received messages manually. In node-amqp this is as simple as adding the { ack: true } option to the queue.subscribe call. You then acknowledge each message by calling a function on the queue; in node-amqp this is queue.shift().

You can also limit the number of unacknowledged messages a consumer may hold at once by using prefetchCount.

Any unacknowledged messages will now be requeued (and redelivered to any connected consumer) if the consumer disconnects.
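To make the acknowledgement behaviour concrete, here is a minimal in-memory sketch of the bookkeeping a broker does for manual acks, a prefetch limit, and requeue on disconnect. It is a simplified model, not node-amqp code: SimQueue and all of its methods are hypothetical names made up for illustration, and a real broker does considerably more.

```javascript
// In-memory model of broker-side acknowledgement bookkeeping.
// SimQueue and its methods are hypothetical, not part of node-amqp.
class SimQueue {
  constructor() {
    this.ready = [];           // messages waiting to be delivered
    this.unacked = new Map();  // consumerId -> delivered but unacknowledged messages
    this.prefetch = new Map(); // consumerId -> prefetch limit
  }

  publish(message) {
    this.ready.push(message);
  }

  connect(consumerId, prefetchCount) {
    this.unacked.set(consumerId, []);
    this.prefetch.set(consumerId, prefetchCount);
  }

  // Deliver the next ready message, unless the consumer already holds
  // prefetchCount unacknowledged messages or nothing is waiting.
  deliver(consumerId) {
    const held = this.unacked.get(consumerId);
    if (held.length >= this.prefetch.get(consumerId) || this.ready.length === 0) {
      return null;
    }
    const message = this.ready.shift();
    held.push(message);
    return message;
  }

  // Acknowledge the oldest unacknowledged message of this consumer
  // (the role queue.shift() plays in the answer).
  ack(consumerId) {
    return this.unacked.get(consumerId).shift();
  }

  // The consumer died: in this model, everything it had not
  // acknowledged is put back at the front of the ready list.
  disconnect(consumerId) {
    this.ready.unshift(...this.unacked.get(consumerId));
    this.unacked.delete(consumerId);
    this.prefetch.delete(consumerId);
  }
}

const q = new SimQueue();
["job-1", "job-2", "job-3"].forEach((job) => q.publish(job));
q.connect("worker-a", 1);
q.connect("worker-b", 1);

const first = q.deliver("worker-a");       // worker-a takes job-1
const blocked = q.deliver("worker-a");     // null: prefetch limit of 1 reached
q.ack("worker-a");                         // job-1 is done and acknowledged

const second = q.deliver("worker-a");      // worker-a takes job-2 ...
q.disconnect("worker-a");                  // ... and dies before acknowledging it

const redelivered = q.deliver("worker-b"); // job-2 goes to worker-b instead
console.log(first, blocked, second, redelivered);
```

The point of the model is that the job is never "owned" by the consumer: it stays on the broker's books until it is acknowledged, so a crashed consumer does not need to do anything for its job to be retried.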

Also, by declaring the queue with durable: true and autoDelete: false, you can additionally make sure that the queue will not be deleted when you restart your MQ server or when the last consumer disconnects. Note that the messages on it survive a broker restart only if they are also published as persistent.
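Putting the pieces together, a worker for the pipeline in the question might look roughly like the sketch below. It assumes `connection` behaves like a ready node-amqp connection (with `queue` and `publish` methods) and that `queue.shift(reject, requeue)` rejects and requeues the current message when both arguments are true. `startWorker` and `processJob` are hypothetical names, and `deliveryMode: 2` (persistent) is an added assumption so that results are not lost on a broker restart.

```javascript
// Sketch of a worker that acknowledges a job only after its result
// has been published. startWorker and processJob are hypothetical names;
// `connection` is assumed to behave like a ready node-amqp connection.
function startWorker(connection, processJob) {
  // durable + autoDelete: false keep the queue across broker restarts
  // and after the last consumer disconnects.
  connection.queue("jobs_queue", { durable: true, autoDelete: false }, function (queue) {
    // ack: true turns on manual acknowledgement; prefetchCount: 1 means
    // this worker holds at most one unacknowledged job at a time.
    queue.subscribe({ ack: true, prefetchCount: 1 }, function (job) {
      try {
        const result = processJob(job);
        // Publish via the default exchange, routed by queue name.
        connection.publish("results_queue", result, { deliveryMode: 2 });
      } catch (err) {
        // Processing threw: reject the job and ask the broker to requeue it.
        queue.shift(true, true);
        return;
      }
      // Acknowledge only now. If the process is killed before this line,
      // the broker requeues the job once the connection drops.
      queue.shift();
    });
  });
}
```

With the real library, this would be called from the connection's 'ready' handler, for example after `require('amqp').createConnection({ host: 'localhost' })` (the host is a placeholder). Be aware that a job which always throws will be requeued indefinitely, so a retry limit or a dead-letter queue is worth considering.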
