Reading from MSMQ slows down when there are a lot of messages in the queue

Brief introduction

I have a SEDA-based system and use MSMQ to communicate (trigger events) between different applications / services.

One of these services receives messages by file, so I have a file listener that reads the contents of the file and inserts it into the queue (or actually 4 different queues, but this is not very important for the first question).

Server is Windows Server 2008

First question - reading slows down

My application, which reads these messages on the other hand, usually reads about 20 messages from the queue per second, but when the service that sends the messages starts several thousand messages, the reading goes down and the reader only reads 2-4 messages per second. When there are no messages in the queue, the reader can again read up to 20 messages per second.

The code in the reading application is quite simple, developed in C #, I use the Read timeout function (TimeSpan) in System.Messaging.

Q: Why does reading slow down when many messages are sent to the queue?

Second Question - TPS Limitations

An additional question concerns the reading itself. It seems there is no difference in how many messages I can read per second if I use 1 or 5 threads to read from the queue. I also tried to implement a “round-robin solution” in which the mail service sends to an arbitrary set of 4 queues, and in the reader application, one thread listens to each of these queues, but still only 20 TPS remain, even if I read from 1 queues with 1 thread, 1 queue with 4 threads or 4 queues (with one thread in the queue).

I know that processing in a stream takes about 50 ms, so 20 TPS are quite correct if only one message is being processed at that time, but the key with a multi-threaded stream should consist in the fact that messages are processed in parallel, and not sequentially.

There are about 110 different queues on the server.

Q: Why can't I get more than 20 messages from my queue at that time, even with multithreading and using multiple queues?

This code works today:

// There are 4 BackgroundWorkers running this function void bw_DoWork(object sender, DoWorkEventArgs e) { using(var mq = new MessageQueue(".\\content")) { mq.Formatter = new BinaryMessageFormatter(); // ShouldIRun is a bool set to false by OnStop() while(ShouldIRun) { try { using(var msg = mq.Receive(new TimeSpan(0,0,2)) { ProcessMessageBody(msg.Body); // This takes 50 ms to complete } } catch(MessageQueueException mqe) { // This occurs every time TimeSpan in Receive() is reached if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) continue; } } } 

But even if there are 4 threads, everyone seems to expect the function to go back to the “Receive” point. I also tried using 4 different queues (content1, content2, content3 and content4), but still I get 1 message processed every 50 ms.

Does this have anything to do with TimeSpan in Receive () and / or can it be omitted?

Another question: if using private queues, does instad public solve anything?

+8
c # message-queue msmq
source share
2 answers

Performance issues.
You do not mention if all the code is running on the server, or if you have clients that remotely access the queues on the server. With speed, I will take up the latter.
Also, are transactions transactions?
How big are the posts?

If you want to read a message from a queue, your application does not connect to the queue itself. Everything goes between the queue manager and the remote queue manager. The queue manager is the only process that writes and reads from queues. Therefore, the presence of several queues or one queue will not necessarily be performed otherwise.

The MSMQ queue manager will thus be a bottleneck at some point, since only so much work can work at the same time. Your first question shows this - when you put a heavy load on the queue manager by inserting IN messages, your ability to receive OUT messages slows down. I would recommend looking at the performance monitor to see if, for example, MQSVC.EXE.

+4
source share

Why are you using a time interval? “That's bad, and that's why.”

When developing services and queues, you need to program them in theadsafe mode. Each item in the queue will spawn a new thread. Using timespan forces each thread to use one timer event stream. These events are forced to wait in line for the stream of events.

Norm is 1 thread for events in the queue. This is usually a System.Messaging.ReceiveCompletedEventArgs event. Another thread is the onStart event ...

20 streams or 20 views per second are probably true. Generally, when combining threads, you can only create 36 threads at a time in .net.

My advice is to drop the timer event so that your queue just processes the data.

do something more like this:

 namespace MessageService { public partial class MessageService : ServiceBase { public MessageService() { InitializeComponent(); } private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"]; private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"]; private System.Messaging.MessageQueue messageQueue = null; private ManualResetEvent manualResetEvent = new ManualResetEvent(true); protected override void OnStart(string[] args) { // Create directories if needed if (!System.IO.Directory.Exists(MessageDirectory)) System.IO.Directory.CreateDirectory(MessageDirectory); // Create new message queue instance messageQueue = new System.Messaging.MessageQueue(MessageQueue); try { // Set formatter to allow ASCII text messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter(); // Assign event handler when message is received messageQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted); // Start listening messageQueue.BeginReceive(); } catch (Exception e) { } } protected override void OnStop() { //Make process synchronous before closing the queue manualResetEvent.WaitOne(); // Clean up if (this.messageQueue != null) { this.messageQueue.Close(); this.messageQueue = null; } } public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e) { manualResetEvent.Reset(); System.Messaging.Message completeMessage = null; System.IO.FileStream fileStream = null; System.IO.StreamWriter streamWriter = null; string fileName = null; byte[] bytes = new byte[2500000]; string xmlstr = string.Empty; try { // Receive the message completeMessage = this.messageQueue.EndReceive(e.AsyncResult); completeMessage.BodyStream.Read(bytes, 0, bytes.Length); System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding(); long len = completeMessage.BodyStream.Length; int intlen = Convert.ToInt32(len); xmlstr = ascii.GetString(bytes, 0, intlen); } catch (Exception ex0) { //Error converting message to string } } 
+2
source share

All Articles