Is there an easy way to subscribe to the default error queue in EasyNetQ?

In my test application, I can see that messages whose processing threw an exception are automatically inserted into the default EasyNetQ_Default_Error_Queue, which is excellent. I can then successfully dump or requeue these messages using Hosepipe, which also works fine, but it requires dropping to the command line and calling both Hosepipe and the RabbitMQ API to purge the queue of retried messages.

So I think the easiest approach for my application is to simply subscribe to the error queue, so that I can re-process the messages using the same infrastructure. But in EasyNetQ, the error queue seems to be special. We need to subscribe using the correct type and subscription ID, and I'm not sure what these values should be for the error queue:

bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);

Can I use the simple API to subscribe to the error queue, or do I need to dig into the advanced API?

If the type of my original message was TestMessage, then I would like to do something like this:

bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);

where ErrorMessage is the class provided by EasyNetQ to wrap all errors. Is it possible?

2 answers

You cannot use the simple API to subscribe to the error queue, because it does not follow EasyNetQ's queue naming conventions. Maybe that is something that should be fixed ;)

But the advanced API works fine. You will not receive the original message as a typed object, but you do get its JSON representation, which you can de-serialize yourself quite easily (using Newtonsoft.Json). Here is an example of what your subscription code should look like:

    [Test]
    [Explicit("Requires a RabbitMQ server on localhost")]
    public void Should_be_able_to_subscribe_to_error_messages()
    {
        var errorQueueName = new Conventions().ErrorQueueNamingConvention();
        var queue = Queue.DeclareDurable(errorQueueName);
        var autoResetEvent = new AutoResetEvent(false);

        bus.Advanced.Subscribe<SystemMessages.Error>(queue, (message, info) =>
        {
            var error = message.Body;

            Console.Out.WriteLine("error.DateTime = {0}", error.DateTime);
            Console.Out.WriteLine("error.Exception = {0}", error.Exception);
            Console.Out.WriteLine("error.Message = {0}", error.Message);
            Console.Out.WriteLine("error.RoutingKey = {0}", error.RoutingKey);

            autoResetEvent.Set();
            return Task.Factory.StartNew(() => { });
        });

        autoResetEvent.WaitOne(1000);
    }

I had to fix a small bug in the code that writes error messages in EasyNetQ before this worked, so please get a version >= 0.9.2.73 before trying it out. You can see the sample code here
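As a rough sketch of the de-serialization step mentioned above: assuming error.Message holds the JSON of the original message, it can be turned back into the original type with Newtonsoft.Json. The TestMessage class and its Text property here are hypothetical stand-ins for your own message type, and ErrorMessageReader is just an illustrative helper name, not part of EasyNetQ.

```csharp
using Newtonsoft.Json;

// Hypothetical message type; replace with your own message class.
public class TestMessage
{
    public string Text { get; set; }
}

public static class ErrorMessageReader
{
    // Takes the JSON string carried by the error (error.Message in the
    // handler above) and rebuilds the original message from it.
    public static TestMessage ReadOriginal(string errorMessageJson)
    {
        return JsonConvert.DeserializeObject<TestMessage>(errorMessageJson);
    }
}
```

Inside the subscription handler you would call something like ErrorMessageReader.ReadOriginal(error.Message) and then republish or handle the result with your normal processing code.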


Code that works (I guessed at it):

The workaround with "foo" is needed because if I simply pass the HandleErrorMessage2 function to the Consume call, the compiler (VS 2012) cannot work out that it returns void rather than a Task, so it cannot determine which overload to use. Assigning it to a typed variable makes it happy. You will want to capture the return value of the call so that you can unsubscribe later by disposing the object.

Also note that someone used the name of a system type (Queue) instead of calling it EasyNetQueue or something like that, so you need to add a using alias for the compiler or fully qualify the name.

    using Queue = EasyNetQ.Topology.Queue;

    private const string QueueName = "EasyNetQ_Default_Error_Queue";

    public static void Should_be_able_to_subscribe_to_error_messages(IBus bus)
    {
        Action<IMessage<Error>, MessageReceivedInfo> foo = HandleErrorMessage2;
        IQueue queue = new Queue(QueueName, false);
        bus.Advanced.Consume<Error>(queue, foo);
    }

    private static void HandleErrorMessage2(IMessage<Error> msg, MessageReceivedInfo info)
    {
    }
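To show the overload issue in isolation, here is a minimal sketch that does not use EasyNetQ at all. The two Consume overloads below are hypothetical and only mirror the shape of the problem (one takes an Action, the other a Func returning Task). Assigning the method group to a typed Action variable first leaves the compiler only one applicable overload.

```csharp
using System;
using System.Threading.Tasks;

public static class OverloadDemo
{
    // Hypothetical overloads: one for void handlers, one for Task-returning handlers.
    public static string Consume(Action<string> onMessage)
    {
        onMessage("x");
        return "action";
    }

    public static string Consume(Func<string, Task> onMessage)
    {
        onMessage("x").Wait();
        return "func";
    }

    private static void Handle(string msg)
    {
    }

    public static string Run()
    {
        // Typing the delegate explicitly removes any ambiguity:
        // an Action<string> cannot convert to Func<string, Task>.
        Action<string> foo = Handle;
        return Consume(foo);
    }
}
```

Newer C# compilers may accept the method group directly, but the explicit variable works on older ones such as the VS 2012 compiler mentioned above.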