Blocking wait conditions when using wait () in producer-consumer code

I implemented a basic in-line producer-consumer (stream 1 = producer, stream 2 = consumer) using Boost flows and conditions. I often linger on wait () indefinitely. I can’t understand what might be wrong here. Below is some pseudo code:

// main class class Main { public: void AddToQueue(...someData...) { boost::mutex::scoped_lock lock(m_mutex); m_queue.push_back(new QueueItem(...someData...)); m_cond.notify_one(); } void RemoveQueuedItem(...someCond...) { // i'm wondering if this could cause the trouble? boost::mutex::scoped_lock lock(m_mutex); // erase a item matching condition (some code not shown, // but should be fairly self-explanatory -- IsMatch() // simply looks at a flag of QueueItem m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(), boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end()); } friend void WorkerThread(Main* m); private: boost::ptr_deque<QueueItem> m_queue; boost::mutex m_mutex; boost::condition m_cond; }; // worker thread void WorkerThread(Main* m) { typedef boost::ptr_deque<QueueItem>::auto_type RelType; RelType queueItem; while(!shutDown) { { // begin mutex scope boost::mutex::scoped_lock lock(m->m_mutex); while(m->m_queue.empty()) { m->m_cond.wait(lock); // <- stuck here forever quite often! } queueItem = m->m_queue->pop_front(); // pop & take ptr ownership } // end mutex scope // ... do stuff with queueItem // ... // ... queueItem is deleted when it leaves scope & we loop around } } 

Additional Information:

  • Using Boost v1.44
  • The problem occurs in Linux and Android; I'm not sure if this is happening on Windows

Any ideas?

UPDATE : I believe I highlighted the problem. I will once again confirm that I confirm that I hope that it will be tomorrow.

UPDATE 2 : It turns out there is no problem in the code described above. I relied on the basic API for AddToQueue () - when processing data in a workflow and passing it back to the API, it had a circular error, where it again called AddToQueue () ... which is now fixed ;-)

+4
source share
2 answers

I did something similar recently, although mine uses the STL queue. See if you can choose from my implementation. As stated in wilx , you need to wait on condition. My implementation has a maximum limit for items in the queue, and I use this to wait for the mutex / protector to free.

I initially did this on Windows with the option of using Mutex or Critical partitions, so you can specify a template parameter that you can remove and use boost::mutex if that makes it easier for you.

 #include <queue> #include "Message.h" #include <boost/thread/locks.hpp> #include <boost/thread/condition.hpp> template <typename T> class Queue : private boost::noncopyable { public: // constructor binds the condition object to the Q mutex Queue(T & mutex, size_t max_size) : m_max_size(max_size), m_mutex(mutex){} // writes messages to end of Q void put(const Message & msg) { // Lock mutex to ensure exclusive access to Q boost::unique_lock<T> guard(m_mutex); // while Q is full, sleep waiting until something is taken off of it while (m_queue.size() == m_max_size) { cond.wait(guard); } // ok, room on the queue. // Add the message to the queue m_queue.push(msg); // Indicate so data can be ready from Q cond.notify_one(); } // Read message from front of Q. Message is removed from the Q Message get(void) { // Lock mutex to ensure exclusive access to Q boost::unique_lock<T> guard(m_mutex); // If Q is empty, sleep waiting for something to be put onto it while (m_queue.empty()) { cond.wait(guard); } // Q not empty anymore, read the value Message msg = m_queue.front(); // Remove it from the queue m_queue.pop(); // Signal so more data can be added to Q cond.notify_one(); return msg; } size_t max_size(void) const { return m_max_size; } private: const size_t m_max_size; T & m_mutex; std::queue<Message> m_queue; boost::condition_variable_any cond; }; 

Thus, you can exchange the queue between the producer / consumer. Usage example

 boost::mutex mutex; Queue<boost::mutex> q(mutex, 100); boost::thread_group threads; threads.create_thread(Producer<boost::mutex>(q)); threads.create_thread(Consumer<boost::mutex>(q)); threads.join_all(); 

With the manufacturer / consumer indicated below

 template <typename T> class Producer { public: // Queue passed in explicit Producer(Queue<T> &q) : m_queue(q) {} void operator()() { } } 
+2
source
 m->m_cond.wait(); // <- stuck here forever quite often! 

it should be:

 m->m_cond.wait( lock ); 

You are dead, locked up your classes because you are still mutexes, but you have been waiting. Every other method wants to get the same mutex and wait for your employee to never issue mutexes.

0
source

All Articles