MassTransit consumer death - Peek()

I'm curious whether MassTransit consumers can Peek() the MSMQ queue before actually retrieving the messages.

The steps / process:

1) A message is sent to the queue

2) The consumer receives it and must do a database update, which takes about 5 seconds

3) The consumer must then make a second round of updates if the first one worked.

My problem is: if the first database update fails (e.g. there is a network problem and the db cannot be reached), how can I make sure the message remains in the queue?

Currently, as soon as the consumer reads the message from the queue, the message is deleted, so it simply disappears if the database updates do not work.

In addition, how can I deal with a power outage? I mean, if the consumer is halfway through its "task", whatever it is (db update or something else), and the power dies, how can I restart the process for the message in the queue? Let's say that the job (in my current case anyway) creates a new row in a table. I can write code that first checks whether the row exists, discards the message if it does, and starts the task if it does not, but how can I get the whole process to restart in the first place?

I read that you could Peek() the queue, run the task, and then read the message off the queue for real to delete it, but I can't figure out for the life of me whether this works with MassTransit. A bit lost ...

In addition, I know that MassTransit has a .RetryLater, but where do I use it in this process? Is it Initially → When → Then → .RetryLater in the saga?

Any pointers would be appreciated.

Regards, Robin

EDIT

PS: I am using a saga ....

    Define(() =>
    {
        RemoveWhen(saga => saga.CurrentState == Completed);

        Initially(
            When(NewAC)
                .Then((saga, message) => saga.ProcessPSM(message),
                    InCaseOf<Exception>()
                        .TransitionTo(Problem)
                )
                .Then((saga, message) => saga.PostProcessPSM())
                .Complete()
        );

        During(Problem,
            When(Waiting)
                // NOTE: THIS DOES NOT WORK!!!!
                .RetryLater()
        );
    });

RetryLater throws an error: "Message cannot be accepted by an existing saga"

I'm not sure how else I can access RetryLater.

1 answer

MassTransit abstracts away the underlying queue concepts. Thus, Peek is not the solution, but there are other mechanisms for retrying messages. If you are simply interested in handling errors and crashes, the following mechanisms are sufficient.

By default, if the consumer throws an exception, the message will be retried N times:

  • where N is configured on the bus and defaults to 5. It can be changed when initializing the bus using SetDefaultRetryLimit on ServiceBusConfigurator
  • where a retry means that the message is added back to the end of the queue
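
As a rough sketch of where that setting goes, assuming the MassTransit 2.x `ServiceBusFactory` style of configuration over MSMQ: the queue address `msmq://localhost/psm_queue`, the `BusSetup` class, and the limit of 10 are made up for illustration, so check the call names against the version you have installed.

```csharp
using MassTransit;

public static class BusSetup
{
    // Sketch only: builds a bus whose consumers get up to 10 attempts
    // per message instead of the default 5.
    public static IServiceBus CreateBus()
    {
        return ServiceBusFactory.New(sbc =>
        {
            sbc.UseMsmq();

            // Hypothetical queue address, replace with your own.
            sbc.ReceiveFrom("msmq://localhost/psm_queue");

            // Raise the default retry limit described above.
            sbc.SetDefaultRetryLimit(10);
        });
    }
}
```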

If you need finer-grained error handling, you can implement a Context consumer, catch the transient exceptions you know how to recover from, and manually call RetryLater. As far as I understand it, there is no limit to how many times this can be done.

    public class RetryConsumer :
        Consumes<AwesomeMessage>.Context
    {
        public void Consume(IConsumeContext<AwesomeMessage> message)
        {
            try
            {
                Console.WriteLine("This is Attempt " + message.RetryCount);
                // Do Something
            }
            catch (SomeTransientException e)
            {
                message.RetryLater();
            }
        }
    }
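
Since a retried message can arrive after its row has already been written (for example, when the power died between the insert and the end of the consumer), the "check if the row exists first" idea from the question pairs well with retries. Below is a minimal sketch of that check in plain C#, without any MassTransit types. `NewAccountMessage`, `InMemoryTable`, and `IdempotentHandler` are hypothetical names, and the in-memory set only stands in for the real database table.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type; only the id matters for the duplicate check.
public class NewAccountMessage
{
    public Guid Id { get; set; }
}

// Stand-in for the database table: one "row" per processed message id.
public class InMemoryTable
{
    private readonly HashSet<Guid> _rows = new HashSet<Guid>();

    public bool RowExists(Guid id) { return _rows.Contains(id); }
    public void InsertRow(Guid id) { _rows.Add(id); }
    public int Count { get { return _rows.Count; } }
}

public class IdempotentHandler
{
    private readonly InMemoryTable _table;

    public IdempotentHandler(InMemoryTable table) { _table = table; }

    // Returns true if the work was done, false if the message was
    // recognised as a redelivery and skipped.
    public bool Handle(NewAccountMessage message)
    {
        if (_table.RowExists(message.Id))
            return false; // row already written by an earlier attempt

        _table.InsertRow(message.Id);
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var table = new InMemoryTable();
        var handler = new IdempotentHandler(table);
        var message = new NewAccountMessage { Id = Guid.NewGuid() };

        Console.WriteLine("First delivery did work: " + handler.Handle(message));
        Console.WriteLine("Redelivery did work: " + handler.Handle(message));
        Console.WriteLine("Rows in table: " + table.Count);
    }
}
```

In a real consumer the body of `Handle` would sit inside `Consume`, with the existence check and the insert done in one database transaction so that two concurrent attempts cannot both pass the check.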