How to create a personalized message pump?

I do a little exercise when I need to create something that looks like a message pump. I have a work queue, and I want the work to be performed entirely on one thread, while any thread can add work to the queue.

Queue<WorkToDo> queue; 

The threads use a standby knob to tell the pump that there is work.

 WaitHandle signal; 

The pump simply loops until there is work, and then waits until the signal starts again.

 while(ApplicationIsRunning){ while(queue.HasWork){ DoWork(queue.NextWorkItem) } signal.Reset(); signal.WaitOne(); } 

Each other thread can add work to the queue and signal a wait descriptor ...

 public void AddWork(WorkToDo work){ queue.Add(work); signal.Set(); } 

The problem is that if the work is added quickly enough, a condition may arise when the work can be left in the queue, because between checking the queue for work and WaitHandle reset another thread can add work to the queue.

How can I mitigate this without putting an expensive mutex around WaitHandle?

+4
source share
3 answers

You can use BlockingCollection<T> to make the implementation of the queue much easier, since it will handle the synchronization for you:

 public class MessagePump { private BlockingCollection<Action> actions = new BlockingCollection<Action>(); public void Run() //you may want to restrict this so that only one caller from one thread is running messages { foreach (var action in actions.GetConsumingEnumerable()) action(); } public void AddWork(Action action) { actions.Add(action); } public void Stop() { actions.CompleteAdding(); } } 
+3
source

You do not need to do a full mutex, but can you put a lock statement

 public void AddWork(WorkToDo work) { queue.Add(work); lock(lockingObject) { signal.Set(); } } 

use whatever you want for the lock object, most people will say that using the signal itself is a bad idea.

Responding to a comment @ 500 - Internal server error below, you can simply reset to transmit a signal before doing the job. The following should protect things:

 while(ApplicationIsRunning) { while(queue.HasWork) { WorkItem wi; lock(lockingObject) { wi = queue.NextWorkItem; if(!queue.HasWork) { signal.Reset(); } } DoWork(wi) } signal.WaitOne(); } 

Thus, if you have more work, the internal queue continues. If not, then it drops to signal.WaitOne() , and we only reset if there is no more work in the queue.

The only drawback here is that we can reset several times in a row if work comes in when DoWork is DoWork .

+2
source

You can use the WaitOne (TimeSpan) method so that you have a hybrid signal / polling cycle. Basically specify a duration of 1 second. Expectations no more. This will cause the task that falls into this race to be held for no more than a second (or no matter what time you indicate) or until another task is added to your turn.

0
source

All Articles