Spread MassTransit and events and commands

I am new to MassTransit and I have missed something in my understanding.

Say I have a server farm, all nodes can do the same job. The structure of the application is CQRS. This means that I have two main types of posts to post:

  • Commands: you need to process exactly one of the servers, any of them (the first with a free slot)
  • Events: must be handled by all servers

I have a very simple MassTransit prototype (a console application that sends greetings to all X seconds).

In the API, I see that there is a publishing method. How can I indicate what this message is (one against the whole server)?

If I look at the "handler" configuration, I can specify the uri queue. If I specify the same queue for all hosts, all nodes will receive a message, but I cannot limit execution to only one server.

If I listen from a dedicated host queue, only one server will process messages, but I do not know how to send other messages.

Please help me understand what I am missing.

PS: if it excites me, my messaging system is rabbitmq.

To check, I created a common class library with these classes:

public static class ActualProgram { private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); private static readonly Random g_Random = new Random(); public static void ActualMain(int delay, int instanceName) { Thread.Sleep(delay); SetupBus(instanceName); Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); Console.WriteLine("Press enter at any time to exit"); Console.ReadLine(); g_Shutdown.Cancel(); Bus.Shutdown(); } private static void PublishRandomMessage() { Bus.Instance.Publish(new Message { Id = g_Random.Next(), Body = "Some message", Sender = Assembly.GetEntryAssembly().GetName().Name }); if (!g_Shutdown.IsCancellationRequested) { Thread.Sleep(g_Random.Next(500, 10000)); Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); } } private static void SetupBus(int instanceName) { Bus.Initialize(sbc => { sbc.UseRabbitMqRouting(); sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); sbc.Subscribe(subs => { subs.Handler<Message>(MessageHandled); }); }); } private static void MessageHandled(Message msg) { ConsoleColor color = ConsoleColor.Red; switch (msg.Sender) { case "test_app1": color = ConsoleColor.Green; break; case "test_app2": color = ConsoleColor.Blue; break; case "test_app3": color = ConsoleColor.Yellow; break; } Console.ForegroundColor = color; Console.WriteLine(msg.ToString()); Console.ResetColor(); } private static void MessageConsumed(Message msg) { Console.WriteLine(msg.ToString()); } } public class Message { public long Id { get; set; } public string Sender { get; set; } public string Body { get; set; } public override string ToString() { return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); } } 

I also have 3 console applications that just run the ActualMain method:

 internal class Program { private static void Main(string[] args) { ActualProgram.ActualMain(0, 1); } } 
+8
masstransit
source share
1 answer

What you want is called “Competing Consumers” (SO search for more information) Using RabbitMQ makes life easier, all you have to do is specify the same queue name for each user you run, the message will be processed only one of them. Instead of generating a unique queue every time you do.

  private static void SetupBus(int instanceName) { Bus.Initialize(sbc => { sbc.UseRabbitMqRouting(); sbc.ReceiveFrom("rabbitmq://localhost/Commands); sbc.Subscribe(subs => { subs.Handler<Message>(MessageHandled); }); }); } 

AFAIK, you will need a separate process for command handlers, not event handlers. All command handlers will receive from the same queue, all event handlers will receive their own unique queue.

Another piece of the puzzle is how you get messages on the bus. You can still use the publication for teams, but if you configured consumers incorrectly, you can get several executions, since the message will be sent to all users, if you want the message to end in one queue, you can use "Send" instead of " Publish. "

  Bus.Instance .GetEndpoint(new Uri("rabbitmq://localhost/Commands")) .Send(new Message { Id = g_Random.Next(), Body = "Some message", Sender = Assembly.GetEntryAssembly().GetName().Name }); 
+9
source share

All Articles