Multithreaded RabbitMQ Consumer

We have a Windows service that listens on a single RabbitMQ queue and processes its messages.

We would like to extend the same Windows service so that it can listen to several RabbitMQ queues and process their messages.

I am not sure whether this is possible with multithreading, since each thread would have to listen on (and block on) its queue.

Since I am very new to multithreading, I need high-level guidance on the following points, which will help me get started on a prototype.

  • Is it possible to listen to multiple queues in one application using threading?
  • How do I handle the situation where a thread shuts down (due to an exception, etc.), and how do I bring it back without restarting the whole Windows service?
  • Is there any design pattern or open source implementation that can help me deal with this situation?
2 answers

I like the way you wrote your question: it starts very broad and narrows down to specifics. I have successfully implemented something very similar, and I'm currently working on an open source project to take the lessons I learned and give them back to the community. Unfortunately, I have yet to package the code up neatly, which doesn't help you right now. In any case, to answer your questions:

1. Is it possible to use threading for multiple queues?

A: Yes, but it can be full of pitfalls. Namely, the RabbitMQ .NET library is not the best-written piece of code, and I found it to be a rather cumbersome implementation of the AMQP protocol. One of the most pernicious caveats is how it handles "receive" or "consume" behavior, which can lead to deadlocks if you are not careful. Fortunately, this is well illustrated in the API documentation. Tip: if you can, use a single shared connection object. Then, in each thread, use the connection to create a new IModel and its associated consumers.
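The tip above can be sketched roughly as follows. This is only an illustration, not the answerer's own code. It assumes the RabbitMQ.Client 6.x package (where `IModel` and `EventingBasicConsumer` are available and `ea.Body` is a `ReadOnlyMemory<byte>`), a broker on `localhost`, and two hypothetical, already-declared queues named `orders` and `invoices`.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class MultiQueueListener
{
    static void Main()
    {
        // Assumptions: broker on localhost; the queue names are hypothetical
        // and the queues are expected to exist already.
        var factory = new ConnectionFactory { HostName = "localhost" };
        var queueNames = new[] { "orders", "invoices" };

        using (var connection = factory.CreateConnection()) // one shared connection
        using (var stop = new ManualResetEventSlim())
        {
            Console.CancelKeyPress += (s, e) => { e.Cancel = true; stop.Set(); };

            var threads = new List<Thread>();
            foreach (var name in queueNames)
            {
                var thread = new Thread(() => Listen(connection, name, stop));
                thread.Start();
                threads.Add(thread);
            }

            stop.Wait();                              // run until Ctrl+C
            foreach (var thread in threads) thread.Join();
        }
    }

    // Each thread owns its own channel (IModel) and consumer.
    static void Listen(IConnection connection, string queueName, ManualResetEventSlim stop)
    {
        using (var channel = connection.CreateModel())
        {
            // At most one unacknowledged message per consumer at a time.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                var text = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(queueName + ": " + text);
                channel.BasicAck(ea.DeliveryTag, false); // ack after processing
            };

            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

            // Keep the channel open until shutdown is requested.
            stop.Wait();
        }
    }
}
```

Note that with `EventingBasicConsumer` the `Received` handler is invoked by the client library's own dispatch thread, not by the thread that created the channel, so the per-queue threads here mainly own the channel's lifetime.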

2. How to gracefully handle exceptions in threads? I believe this is a separate topic, and I will not address it here, as there are several approaches you can use.

3. Any open source projects? I liked the thinking behind EasyNetQ, although I ended up rolling my own anyway. I will hopefully remember to follow up when my open source project is completed, as I believe it is an even better option than EasyNetQ.


You may find this answer very helpful. I have only a very general idea of how RabbitMQ works, but I would probably go with one subscriber per channel per thread, as suggested there.

There is, of course, more than one way to organize the threading model. The actual implementation will depend on how you need to process messages from the several queues: either in parallel, or by merging them and serializing the processing. The following code is a console application that simulates the latter case. It uses the Task Parallel Library and the BlockingCollection class (which comes in very handy for this kind of task).

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Console_21842880
    {
        class Program
        {
            BlockingCollection<object> _commonQueue;

            // process an individual queue
            void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
            {
                while (true)
                {
                    // observe cancellation
                    token.ThrowIfCancellationRequested();
                    // get a message, this blocks and waits
                    var message = queue.Take(token);
                    // process this message:
                    // just place it in the common queue
                    var wrapperMessage = "queue " + id + ", message: " + message;
                    _commonQueue.Add(wrapperMessage);
                }
            }

            // process the common aggregated queue
            void ProcessCommonQueue(CancellationToken token)
            {
                while (true)
                {
                    // observe cancellation
                    token.ThrowIfCancellationRequested();
                    // get a message, this blocks and waits
                    var message = _commonQueue.Take(token);
                    // process this message
                    Console.WriteLine(message.ToString());
                }
            }

            // run the whole process
            async Task RunAsync(CancellationToken token)
            {
                var queues = new List<BlockingCollection<object>>();
                _commonQueue = new BlockingCollection<object>();

                // start individual queue processors
                var tasks = Enumerable.Range(0, 4).Select((i) =>
                {
                    var queue = new BlockingCollection<object>();
                    queues.Add(queue);

                    return Task.Factory.StartNew(
                        () => ProcessQueue(i, queue, token),
                        TaskCreationOptions.LongRunning);
                }).ToList();

                // start the common queue processor
                tasks.Add(Task.Factory.StartNew(
                    () => ProcessCommonQueue(token),
                    TaskCreationOptions.LongRunning));

                // start the simulators
                tasks.AddRange(Enumerable.Range(0, 4).Select((i) =>
                    SimulateMessagesAsync(queues, token)));

                // wait for all started tasks to complete
                await Task.WhenAll(tasks);
            }

            // simulate a message source
            async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
            {
                var random = new Random(Environment.TickCount);
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(random.Next(100, 1000));
                    // pick a random queue and post a message to it
                    var queue = queues[random.Next(0, queues.Count)];
                    var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString();
                    queue.Add(message);
                }
            }

            // entry point
            static void Main(string[] args)
            {
                Console.WriteLine("Ctrl+C to stop...");

                var cts = new CancellationTokenSource();
                Console.CancelKeyPress += (s, e) =>
                {
                    // cancel upon Ctrl+C
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    new Program().RunAsync(cts.Token).Wait();
                }
                catch (Exception ex)
                {
                    if (ex is AggregateException)
                        ex = ex.InnerException;
                    Console.WriteLine(ex.Message);
                }

                Console.WriteLine("Press Enter to exit");
                Console.ReadLine();
            }
        }
    }

Another idea might be to use Reactive Extensions (Rx). If you can think of incoming messages as events, Rx can combine them into a single stream.
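A minimal sketch of the Rx idea, assuming the System.Reactive package is referenced. The two `Subject<string>` instances are hypothetical stand-ins for per-queue message sources; in a real service, each queue's consumer callback would call `OnNext` instead.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class RxMergeSketch
{
    static void Main()
    {
        // Hypothetical sources, one per queue (not real RabbitMQ consumers).
        var queueA = new Subject<string>();
        var queueB = new Subject<string>();

        // Tag each message with its origin, then merge both into one stream.
        var merged = Observable.Merge(
            queueA.Select(m => "queue A: " + m),
            queueB.Select(m => "queue B: " + m));

        // A single subscriber handles messages from both sources.
        using (merged.Subscribe(m => Console.WriteLine(m)))
        {
            queueA.OnNext("first");
            queueB.OnNext("second");
            queueA.OnNext("third");
        }
    }
}
```

The subscriber plays the same role as the common-queue processor in the BlockingCollection example: one place where messages from all sources are handled in turn.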
