C#: when the main thread waits, all the threads stop

I have a class working with the Producer-Consumer model as follows:

public class SyncEvents
{
    public bool waiting;

    public SyncEvents()
    {
        waiting = true;
    }
}

public class Producer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;
    private Object _waitAck;

    public Producer(Queue<Delegate> q, SyncEvents sync, Object obj)
    {
        _queue = q;
        _sync = sync;
        _waitAck = obj;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                Monitor.Wait(_sync, 0);
                if (_queue.Count > 0)
                {
                    _sync.waiting = false;
                }
                else
                {
                    _sync.waiting = true;
                    lock (_waitAck)
                    {
                        Monitor.Pulse(_waitAck);
                    }
                }
                Monitor.Pulse(_sync);
            }
        }
    }
}

public class Consumer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;
    private int count = 0;

    public Consumer(Queue<Delegate> q, SyncEvents sync)
    {
        _queue = q;
        _sync = sync;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_sync);
                }
                Delegate query = _queue.Dequeue();
                query.DynamicInvoke(null);
                count++;
                Monitor.Pulse(_sync);
            }
        }
    }
}

/// <summary>
/// Acts as a consumer for the queries produced by the DataGridViewCustomCell
/// </summary>
public class QueryThread
{
    private SyncEvents _syncEvents = new SyncEvents();
    private Object waitAck = new Object();
    private Queue<Delegate> _queryQueue = new Queue<Delegate>();
    Producer queryProducer;
    Consumer queryConsumer;

    public QueryThread()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents, waitAck);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);
        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;
        producerThread.Start();
        consumerThread.Start();
    }

    public bool isQueueEmpty()
    {
        return _syncEvents.waiting;
    }

    public void wait()
    {
        lock (waitAck)
        {
            while (_queryQueue.Count > 0)
            {
                Monitor.Wait(waitAck);
            }
        }
    }

    public void Enqueue(Delegate item)
    {
        _queryQueue.Enqueue(item);
    }
}

The code runs smoothly except for the wait() function. In some cases I want to block until all the delegates in the queue have finished executing, which is why I wrote the wait() function.

The producer pulses waitAck at the appropriate time.

However, when the line Monitor.Wait(waitAck) is reached inside the wait() function, all the threads stop, including the producer and consumer threads.

Why does this happen and how can I solve it? Thanks!

+4
Tags: multithreading, c#
2 answers

It seems very unlikely that all the threads are actually stopping, although I should point out that to avoid spurious wakeups you should probably use a while loop instead of an if statement:

lock (waitAck)
{
    while (queryProducer.secondQueue.Count > 0)
    {
        Monitor.Wait(waitAck);
    }
}

The fact that you're calling Monitor.Wait means that waitAck should be released though, so it shouldn't be blocking the other threads...

Could you give more information about how the producer/consumer threads "stop"? It sounds like they're simply deadlocking.

Is your producer using Pulse or PulseAll? You now have an extra thread waiting, so if you only use Pulse it's only going to release one thread... it's hard to tell whether that's the problem without the details of your Producer and Consumer classes.
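As a minimal illustration of that difference (this demo is not from the question's code; the class name and thread count are made up), Monitor.Pulse releases only a single waiting thread, while Monitor.PulseAll releases all of them:

using System;
using System.Threading;

class PulseDemo
{
    static readonly object _lock = new object();

    static void Main()
    {
        // Start three threads that all wait on the same monitor.
        for (int i = 0; i < 3; i++)
        {
            int id = i;
            new Thread(() =>
            {
                lock (_lock)
                {
                    Monitor.Wait(_lock);
                    Console.WriteLine("Thread {0} woke up", id);
                }
            }).Start();
        }

        Thread.Sleep(500); // crude way to let all three reach Wait (demo only)

        lock (_lock)
        {
            // Monitor.Pulse(_lock);  // would release exactly one waiter
            Monitor.PulseAll(_lock);  // releases all three waiters
        }
    }
}

If you swap PulseAll for Pulse here, only one of the three threads ever wakes up and the other two wait forever, which is exactly the kind of hang described in the question.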

If you can show a short but complete program to demonstrate the problem, this will help.

EDIT: Okay, now that you've posted the code, I can see a number of problems:

  • Having so many public variables is a recipe for disaster. Your classes should encapsulate their functionality so that other code doesn't need to poke around at their internals. (For example, your calling code here really shouldn't have access to the queue.)

  • You add items directly to the second queue, which means you can't efficiently wake the producer up to add them to the first one. Why do you even have multiple queues?

  • You are always waiting on _sync in the producer thread... why? What's going to notify it to start with? Generally speaking, a producer thread shouldn't need to wait unless you have a bounded buffer.

  • You have a static variable (_waitAck) that is overwritten every time you create a new instance. It is a bad idea.

You also haven't shown your SyncEvents class - is it doing anything interesting?

Honestly, it looks like you have a rather odd design - it may well be best to start from scratch. Try encapsulating the whole producer/consumer queue in a single class that has Produce and Consume methods, as well as WaitForEmpty (or something similar). I think you'll find the synchronization logic a lot easier that way.
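A rough sketch of that kind of encapsulated queue, assuming Monitor-based synchronization like the original code (the class name WorkQueue and the _inProgress counter are illustrative, not part of the answer):

using System;
using System.Collections.Generic;
using System.Threading;

public class WorkQueue
{
    private readonly Queue<Delegate> _items = new Queue<Delegate>();
    private readonly object _lock = new object();
    private int _inProgress; // items dequeued but not yet finished

    // Producer side: add an item and wake up waiting threads.
    public void Produce(Delegate item)
    {
        lock (_lock)
        {
            _items.Enqueue(item);
            // PulseAll because consumers and WaitForEmpty callers share this monitor.
            Monitor.PulseAll(_lock);
        }
    }

    // Consumer side: intended to be the body of the consumer thread.
    public void Consume()
    {
        while (true)
        {
            Delegate item;
            lock (_lock)
            {
                while (_items.Count == 0)
                {
                    Monitor.Wait(_lock);
                }
                item = _items.Dequeue();
                _inProgress++;
            }

            item.DynamicInvoke(null); // run the work outside the lock

            lock (_lock)
            {
                _inProgress--;
                Monitor.PulseAll(_lock); // let WaitForEmpty re-check its condition
            }
        }
    }

    // Blocks until the queue is empty and nothing is being processed.
    public void WaitForEmpty()
    {
        lock (_lock)
        {
            while (_items.Count > 0 || _inProgress > 0)
            {
                Monitor.Wait(_lock);
            }
        }
    }
}

The delegate is invoked outside the lock so that a long-running work item doesn't hold up Produce, and PulseAll is used because consumers and WaitForEmpty callers all wait on the same monitor.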

+1

Here is my code:

using System;
using System.Collections.Generic;
using System.Threading;

public class ProducerConsumer : IDisposable
{
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue;
    private Thread _consumerService;
    private static Object _sync = new Object();

    public ProducerConsumer(Queue<Delegate> queue)
    {
        lock (_sync)
        {
            // Note: I would recommend that you don't even
            // bother with taking in a queue. You should be able
            // to just instantiate a new Queue<Delegate>()
            // and use it when you Enqueue. There is nothing that
            // you really need to pass into the constructor.
            _queue = queue;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public void Enqueue(Delegate value)
    {
        lock (_sync)
        {
            _queue.Enqueue(value);
            _ready.Set();
        }
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        Delegate query;
        try
        {
            while (true)
            {
                _ready.WaitOne();
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    public void Dispose()
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
            }
        }
    }
}

I'm not quite sure what you are trying to achieve with the wait function... I assume you're trying to put some kind of limit on the number of items that can be queued. In that case, just throw an exception or return a failure result when there are too many items in the queue; the client calling Enqueue can keep retrying until the queue can take more items. Taking an optimistic approach like that will save you a lot of headaches and lets you get rid of a lot of complex logic.
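For instance, a bounded variant of the Enqueue method above might look like this (the TryEnqueue name, the capacity of 100, and the caller-side variable names are assumptions made for the example, not part of the answer's code):

// Hypothetical bounded variant: reject instead of blocking the producer.
public bool TryEnqueue(Delegate value)
{
    lock (_sync)
    {
        if (_queue.Count >= 100) // arbitrary capacity for the example
        {
            return false; // queue is full, caller should retry later
        }
        _queue.Enqueue(value);
        _ready.Set();
        return true;
    }
}

// Caller side: keep retrying until the queue accepts the item.
while (!producerConsumer.TryEnqueue(myQuery))
{
    Thread.Sleep(10); // back off briefly before retrying
}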

If you REALLY want the wait there, I can probably help you find a better approach. Let me know what you are trying to achieve with the wait and I'll help you out.

Note: I took this code from one of my projects, changed it a bit, and posted it here... there may be some minor syntax errors, but the logic should be correct.

UPDATE: Based on your comments I made some changes: I added another ManualResetEvent to the class, so when you call BlockQueue() it returns an event that you can wait on, and it sets a flag that stops the Enqueue function from queuing more items. When all the queries in the queue have been served, the flag is set back to true and the _wait event is set so that anyone waiting on it gets the signal.

using System;
using System.Collections.Generic;
using System.Threading;

public class ProducerConsumer : IDisposable
{
    private bool _canEnqueue;
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue;
    private Thread _consumerService;
    private static Object _sync = new Object();
    private static ManualResetEvent _wait = new ManualResetEvent(false);

    public ProducerConsumer()
    {
        lock (_sync)
        {
            _queue = new Queue<Delegate>();
            _canEnqueue = true;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public bool Enqueue(Delegate value)
    {
        lock (_sync)
        {
            // Only enqueue while the queue isn't blocked.
            if (_canEnqueue)
            {
                _queue.Enqueue(value);
                _ready.Set();
                return true;
            }
        }
        // Whoever is calling Enqueue should try again later.
        return false;
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        try
        {
            while (true)
            {
                // Wait for a query to be enqueued
                _ready.WaitOne();

                // Process the query
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        Delegate query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _canEnqueue = true;
                        _ready.Reset();
                        _wait.Set();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    // Block your queue from enqueuing; returns null
    // if the queue is already empty.
    public ManualResetEvent BlockQueue()
    {
        lock (_sync)
        {
            if (_queue.Count > 0)
            {
                _canEnqueue = false;
                _wait.Reset();
            }
            else
            {
                // You need to tell the caller that they can't
                // block your queue while it is empty. The caller
                // should check if the result is null before calling
                // WaitOne().
                return null;
            }
        }
        return _wait;
    }

    public void Dispose()
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
                // Set _wait when you're disposing the queue
                // so that nobody is left with a lingering wait.
                _wait.Set();
            }
        }
    }
}
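A caller could use the updated class roughly like this (the delegates and variable names are invented for the example):

// Hypothetical usage of the updated class above.
ProducerConsumer workQueue = new ProducerConsumer();

workQueue.Enqueue(new Action(() => Console.WriteLine("query 1")));
workQueue.Enqueue(new Action(() => Console.WriteLine("query 2")));

// Stop new items from being queued and get an event to wait on.
ManualResetEvent done = workQueue.BlockQueue();
if (done != null)
{
    done.WaitOne(); // blocks until every queued delegate has run
}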
+1
