Making thread pool workers sleep until work arrives

I'm implementing a simple thread pool for my Ubuntu server (for an anonymous chat program with multiple clients), and I need my worker threads to sleep until a job (in the form of a function pointer and a parameter) arrives for them to execute.

My current system works like this: I (the worker) ask the manager if there is work; if there is none, I sleep for 5 ms. If there is, I take the task from the work queue and execute the function. A miserable waste of cycles.

What I would like to do is build a simple event system. I am thinking of having a vector of mutexes (one per worker) and passing a mutex handle as a parameter when each worker is created. Then in my manager class (which holds and dispatches the tasks), whenever a thread is created, lock its mutex. When a task needs doing, unlock the next mutex in the queue, wait for it to be locked and unlocked, and re-lock it. However, I wonder if there is a much better solution for this.


TL;DR: So my question is this: what is the most efficient, effective, and safe way to make a thread wait for jobs from a controlling class? Is polling a technique I should even consider (there will be more than 1000 clients at a time)? Is blocking on mutexes decent? Or is there some other method?

+4
c++ linux pthreads mutex sockets
5 answers

You need a condition variable.
All the worker threads call wait(), which suspends them.

Then the parent thread queues a work item and signals the condition variable. This wakes one of the sleeping threads. It can remove the task from the queue, execute it, and then call wait() on the condition variable to go back to sleep.

Try:

    #include <pthread.h>
    #include <memory>
    #include <list>
    #include <vector>

    // Use RAII to do the lock/unlock.
    struct MutexLock
    {
        MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); }
        ~MutexLock()                             { pthread_mutex_unlock(&mutex); }
        private:
            pthread_mutex_t&  mutex;
    };

    // The base class of all work we want to do.
    struct Job
    {
        virtual void doWork() = 0;
    };

    // pthreads is a C library; the callback must be a C function.
    extern "C" void* threadPoolThreadStart(void*);

    // The bare minimum of a thread pool.
    // The constructor creates the worker threads; each one runs workerStart().
    class ThreadPool
    {
        public:
            ThreadPool(unsigned int threadCount = 1);
            ~ThreadPool();

            void addWork(std::auto_ptr<Job> job);
        private:
            friend void* threadPoolThreadStart(void*);
            void workerStart();
            std::auto_ptr<Job> getJob();

            bool                    finished;   // Workers re-check this before waiting.
            pthread_mutex_t         mutex;      // A lock so that we can sequence accesses.
            pthread_cond_t          cond;       // The condition variable that holds the worker threads.
            std::list<Job*>         workQueue;  // A queue of jobs.
            std::vector<pthread_t>  threads;
    };

    // Create the thread pool.
    ThreadPool::ThreadPool(unsigned int threadCount)
        : finished(false)
        , threads(threadCount)
    {
        // If we fail creating either pthread object then throw a fit.
        if (pthread_mutex_init(&mutex, NULL) != 0)
        {
            throw int(1);
        }
        if (pthread_cond_init(&cond, NULL) != 0)
        {
            pthread_mutex_destroy(&mutex);
            throw int(2);
        }
        for(unsigned int loop = 0; loop < threadCount; ++loop)
        {
            if (pthread_create(&threads[loop], NULL, threadPoolThreadStart, this) != 0)
            {
                // One thread failed: clean up the ones already started.
                for(unsigned int kill = loop - 1; kill < loop /*unsigned will wrap*/; --kill)
                {
                    pthread_cancel(threads[kill]);
                }
                throw int(3);
            }
        }
    }

    // Clean up any leftovers.
    // Note: this assumes no job is still executing. You may want to add a
    // method that drains all worker threads before the destructor runs.
    ThreadPool::~ThreadPool()
    {
        {
            // Set the flag and signal under the lock so every waiter sees it.
            MutexLock   lock(mutex);
            finished = true;
            // Wake all waiting threads so they can observe 'finished'.
            pthread_cond_broadcast(&cond);
        }
        for(std::vector<pthread_t>::iterator loop = threads.begin(); loop != threads.end(); ++loop)
        {
            // Wait for all threads to exit (they will, as finished is true
            // and we woke every waiter).
            void*   result;
            pthread_join(*loop, &result);
        }

        // Destroy the pthread objects.
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&mutex);

        // Delete all remaining jobs.
        // Notice how we took ownership of the jobs.
        for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end(); ++loop)
        {
            delete *loop;
        }
    }

    // Add a new job to the queue and signal the condition variable.
    // The signal releases a waiting worker; otherwise the job sits in
    // the queue until a worker finishes its current job.
    void ThreadPool::addWork(std::auto_ptr<Job> job)
    {
        MutexLock  lock(mutex);
        workQueue.push_back(job.release());
        pthread_cond_signal(&cond);
    }

    // Start a thread.
    // Make sure no exceptions escape, as that is bad.
    void* threadPoolThreadStart(void* data)
    {
        ThreadPool* pool = reinterpret_cast<ThreadPool*>(data);
        try
        {
            pool->workerStart();
        }
        catch(...) {}
        return NULL;
    }

    // This is the main worker loop.
    void ThreadPool::workerStart()
    {
        while(!finished)
        {
            std::auto_ptr<Job>  job = getJob();
            if (job.get() != NULL)
            {
                job->doWork();
            }
        }
    }

    // The workers come here to get a job.
    // If there are none in the queue they are suspended, waiting on cond,
    // until a new job is added above.
    std::auto_ptr<Job> ThreadPool::getJob()
    {
        MutexLock  lock(mutex);

        while((workQueue.empty()) && (!finished))
        {
            pthread_cond_wait(&cond, &mutex);
            // The wait releases the mutex and suspends the thread (until a signal).
            // When a thread wakes up it is held until it can re-acquire the mutex,
            // so when we get here the mutex is locked again.
            //
            // Note: you must use while() here, because of this situation:
            //   Two workers: Worker A processing job A.
            //                Worker B suspended on the condition variable.
            //   The parent adds a new job and calls signal.
            //   This wakes up thread B. But it is possible for Worker A to finish
            //   its work and lock the mutex before Worker B is released from the
            //   wait above.
            //
            //   If that happens, Worker A sees that the queue is not empty, grabs
            //   the work item, and starts processing. Worker B then locks the
            //   mutex and proceeds here. If the above were an if rather than a
            //   while, it would try to remove an item from an empty queue. With a
            //   while it sees that the queue is empty and re-suspends on the
            //   condition variable above.
        }
        std::auto_ptr<Job>  result;
        if (!finished)
        {
            result.reset(workQueue.front());
            workQueue.pop_front();
        }
        return result;
    }
+6

A common way to implement this is to have a queue of outstanding work, a mutex protecting the queue, and a condition variable queue_not_empty. Then each worker thread does the following (using a pseudo-API):

    while (true)
    {
        Work* work = 0;
        mutex.lock();
        while ( queue.empty() )
            if ( !queue_not_empty.wait( &mutex, timeout ) )
                return; // timeout - exit the worker thread
        work = queue.front();
        queue.pop_front();
        mutex.unlock();
        work->perform();
    }

The wait( &mutex, timeout ) call blocks until the condition variable is signaled or the timeout expires. The mutex is atomically unlocked inside wait() and locked again before the call returns, to guarantee a consistent view of the queue to all participants. The timeout should be chosen large enough (on the order of seconds); on timeout the worker thread exits (the thread pool starts new workers if more work arrives).

In the meantime, the thread pool insert function performs the following:

    Work* work = ...;
    mutex.lock();
    queue.push_back( work );
    if ( workers.empty() )
        start_a_new_worker();
    queue_not_empty.wake_one();
    mutex.unlock();
+3

This is classic producer/consumer synchronization with multiple consumers (the worker threads consume work requests). The well-known technique is to have a semaphore, have each worker thread do down() on it, and do up() each time you have a work request. Then pop the request from the queue under a mutex. Since one up() wakes exactly one down(), there will be minimal contention on the mutex.

Alternatively, you can do the same with a condition variable: each thread waits on it, and you wake one up when you have work. The queue itself is still protected by the mutex (the condvar requires one anyway).

Lastly, I'm not entirely sure, but I actually think you could use a pipe as the queue, with all the synchronization included (the worker threads simply read(sizeof(request))). A bit hacky, but it leads to fewer context switches.

+2

Since a network chat program is presumably I/O-bound rather than CPU-bound, you don't really need threads at all. You can handle all of your I/O in a single thread using a facility such as Boost.Asio or the GLib main loop. These are portable abstractions over platform-specific functions that let a program block waiting for activity on any (potentially large) set of open files or sockets, and then wake up and respond promptly when something happens.

+2

The easiest way to do this is with semaphores. Here's how a semaphore works:

A semaphore is basically a variable that takes zero or positive values. Processes can interact with it in two ways: increasing or decreasing the semaphore.

Increasing the semaphore adds 1 to this magic variable, and that's about it. It's in decreasing the count where things get interesting: if the count is at zero and a process tries to decrease it again, since the semaphore cannot take negative values, that process will block until the variable is increased.

If several processes are blocked waiting to decrease the semaphore, only one is woken up for each unit the count is increased by.

This makes it very simple to build a worker/task system: your manager process queues tasks and increases the semaphore's value once per queued item, and your worker processes try to decrease the count and then fetch a task. When no tasks are available, they block and consume no CPU time. When one appears, only one of the idle processes wakes up. Insta-sync magic.

Unfortunately, at least in the Unix world, the semaphore API is not very friendly, since for some reason it deals with arrays of semaphores rather than single ones. But you're one simple wrapper away from a nice interface!

Hooray!

+1
