I am new to MassTransit and I have missed something in my understanding.
Say I have a server farm, all nodes can do the same job. The structure of the application is CQRS. This means that I have two main types of posts to post:
- Commands: you need to process exactly one of the servers, any of them (the first with a free slot)
- Events: must be handled by all servers
I have a very simple MassTransit prototype (a console application that sends greetings to all X seconds).
In the API, I see that there is a publishing method. How can I indicate what this message is (one against the whole server)?
If I look at the "handler" configuration, I can specify the uri queue. If I specify the same queue for all hosts, all nodes will receive a message, but I cannot limit execution to only one server.
If I listen from a dedicated host queue, only one server will process messages, but I do not know how to send other messages.
Please help me understand what I am missing.
PS: if it excites me, my messaging system is rabbitmq.
To check, I created a common class library with these classes:
public static class ActualProgram { private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); private static readonly Random g_Random = new Random(); public static void ActualMain(int delay, int instanceName) { Thread.Sleep(delay); SetupBus(instanceName); Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); Console.WriteLine("Press enter at any time to exit"); Console.ReadLine(); g_Shutdown.Cancel(); Bus.Shutdown(); } private static void PublishRandomMessage() { Bus.Instance.Publish(new Message { Id = g_Random.Next(), Body = "Some message", Sender = Assembly.GetEntryAssembly().GetName().Name }); if (!g_Shutdown.IsCancellationRequested) { Thread.Sleep(g_Random.Next(500, 10000)); Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); } } private static void SetupBus(int instanceName) { Bus.Initialize(sbc => { sbc.UseRabbitMqRouting(); sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); sbc.Subscribe(subs => { subs.Handler<Message>(MessageHandled); }); }); } private static void MessageHandled(Message msg) { ConsoleColor color = ConsoleColor.Red; switch (msg.Sender) { case "test_app1": color = ConsoleColor.Green; break; case "test_app2": color = ConsoleColor.Blue; break; case "test_app3": color = ConsoleColor.Yellow; break; } Console.ForegroundColor = color; Console.WriteLine(msg.ToString()); Console.ResetColor(); } private static void MessageConsumed(Message msg) { Console.WriteLine(msg.ToString()); } } public class Message { public long Id { get; set; } public string Sender { get; set; } public string Body { get; set; } public override string ToString() { return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); } }
I also have 3 console applications that just run the ActualMain method:
internal class Program { private static void Main(string[] args) { ActualProgram.ActualMain(0, 1); } }