Keeping threads alive between work cycles with synchronization (C++11)

I have a program with a function that takes a pointer as its argument, and a main. The main creates n threads, each running the function on a different memory region depending on the argument it was passed. The threads are then joined, the main thread mixes the data between the regions and creates n new threads that perform the same operation as the old ones.

To improve the program, I would like to keep the threads alive, removing the long time needed to create them. The threads should sleep while the main thread is working and be notified when they have to run again. In the same way, the main thread should wait for the threads to finish, just as it does with join.

I cannot work out a solid implementation of this; I always end up deadlocked.

Here is my simple baseline code; any hints on how to change it would be highly appreciated.

#include <thread>
#include <climits>

...

void myfunc(void * p) {
    do_something(p);
}

int main() {
    void * myp[n_threads] {a_location, another_location, ...};
    std::thread mythread[n_threads];

    for (unsigned long int j = 0; j < ULONG_MAX; j++) {
        for (unsigned int i = 0; i < n_threads; i++) {
            mythread[i] = std::thread(myfunc, myp[i]);
        }
        for (unsigned int i = 0; i < n_threads; i++) {
            mythread[i].join();
        }
        mix_data(myp);
    }
    return 0;
}
+14
c++ multithreading thread-safety c++11
Mar 06 '13 at 16:07
4 answers

Here is a possible approach that uses only classes from the C++11 standard library. Basically, each thread that is created has a queue of commands (encapsulated in std::packaged_task<> objects) that it continuously checks. If the queue is empty, the thread simply waits on a condition variable (std::condition_variable).

Data races are avoided by using std::mutex together with the std::unique_lock<> RAII wrapper. The main thread can wait for a particular task to complete by storing the std::future<> object associated with each std::packaged_task<> it submitted and calling wait() on it.

Below is a simple program that follows this design. The comments should be sufficient to explain what it does:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <vector>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue is not empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty());
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work to do...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================

    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work to do...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}
+15
Mar 06 '13 at

The concept you want is a thread pool. This SO question covers existing implementations.

The idea is to have a container of thread instances. Each instance runs a function that polls a task queue, and when a task is available, pulls it out and runs it. When the task is done (whether it ever completes is a separate problem), the thread simply goes back to polling the task queue.

So, you need a synchronized queue, a thread class that implements the loop over the queue, an interface for the task objects, and maybe a class to drive the whole thing (the pool class).
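As a sketch of the first of these pieces, a minimal synchronized queue in C++11 might look like the following. This is illustrative only; the names (task_queue, the std::function<void()> task type) are my own, not from the question:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Minimal synchronized task queue (illustrative sketch).
// Worker threads block in pop() until a task arrives or close() is called.
class task_queue {
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::function<void()>> q;
    bool closed = false;

public:
    void push(std::function<void()> t) {
        {
            std::lock_guard<std::mutex> l(m);
            q.push(std::move(t));
        }
        cv.notify_one(); // wake one waiting worker
    }

    // Returns false only when the queue is closed and fully drained,
    // which tells the worker loop to exit.
    bool pop(std::function<void()>& t) {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return closed || !q.empty(); });
        if (q.empty()) return false;
        t = std::move(q.front());
        q.pop();
        return true;
    }

    void close() {
        {
            std::lock_guard<std::mutex> l(m);
            closed = true;
        }
        cv.notify_all(); // wake every worker so all of them can exit
    }
};
```

A worker thread's loop then reduces to `std::function<void()> t; while (q.pop(t)) t();` — it drains remaining tasks after close() and only then exits.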

Alternatively, you could create a very specialized thread class dedicated to the task it has to perform (say, with only a memory region as a parameter). This requires a notification mechanism for the threads to indicate that they are done with the current iteration.

The thread's main function would be a loop over that particular task, and at the end of each iteration the thread signals its completion and waits on a condition variable to start the next loop. In essence you would be embedding the task code in the thread, removing the need for a queue altogether.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>

using namespace std;

// Semaphore class based on C++11 features
class semaphore {
private:
    mutex mMutex;
    condition_variable mCond;
    int mV;

public:
    semaphore(int v): mV(v) {}

    void signal(int count = 1) {
        unique_lock<mutex> lock(mMutex);
        mV += count;
        mCond.notify_all(); // wake all waiters so they can re-check the count
    }

    void wait(int count = 1) {
        unique_lock<mutex> lock(mMutex);
        mV -= count;
        while (mV < 0)
            mCond.wait(lock);
    }
};

template <typename Task>
class TaskThread {
    Task *mTask;
    semaphore *mSemStart, *mSemFinished;
    atomic<bool> mRunning;
    thread mThread; // declared last: it must start only after the other members are initialized

public:
    TaskThread(Task *task, semaphore *start, semaphore *finish):
        mTask(task), mSemStart(start), mSemFinished(finish),
        mRunning(true), mThread(&TaskThread<Task>::psrun, this) {}

    ~TaskThread() { mThread.join(); }

    void run() {
        do {
            (*mTask)();
            mSemFinished->signal();
            mSemStart->wait();
        } while (mRunning);
    }

    void finish() { // end the thread after the current loop
        mRunning = false;
    }

private:
    static void psrun(TaskThread<Task> *self) { self->run(); }
};

class MyTask {
public:
    MyTask() {}
    void operator()() {
        // some code here
    }
};

int main() {
    MyTask task1;
    MyTask task2;
    semaphore start(2), finished(0);
    TaskThread<MyTask> t1(&task1, &start, &finished);
    TaskThread<MyTask> t2(&task2, &start, &finished);

    for (int i = 0; i < 10; i++) {
        finished.wait(2);
        start.signal(2);
    }
    t1.finish();
    t2.finish();
    start.signal(2); // release both threads so they observe mRunning == false and exit
}

The (crude) implementation proposed above relies on the Task type providing an operator() (i.e. a functor-like class). I said earlier that you could embed the task code directly in the thread function body, but since I don't know that code, I have kept things as abstract as possible. There is one condition variable for starting the threads and one for their completion, both encapsulated in semaphore instances.

Seeing the other answer suggesting boost::barrier, I can only second the idea: replace my semaphore class with that class if possible, the reason being that it is better to rely on well-tested and maintained external code than on a self-made implementation of the same feature set.

All in all, both approaches are valid, but the former trades a bit of performance for flexibility. If the tasks take long enough to execute, the cost of queue management and synchronization becomes negligible.

Update: code fixed and tested. Replaced a simple condition variable with a semaphore.

+10
Mar 06 '13 at 16:35

This can be easily achieved using a barrier (just a convenience wrapper over a condition variable and a counter). It basically blocks until all N threads have reached the barrier. It then resets so it can be used again. Boost provides an implementation.
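For reference, if Boost is not an option, the same reusable behavior can be sketched with C++11 primitives alone, using a condition variable plus a generation counter. This is my own sketch (the class name cycle_barrier is invented), not the boost::barrier implementation:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Reusable barrier sketch: wait() blocks until `n` threads have arrived,
// then releases them all and resets itself for the next cycle.
class cycle_barrier {
    std::mutex m;
    std::condition_variable cv;
    unsigned threshold;
    unsigned count;
    unsigned long generation = 0; // incremented each time the barrier opens

public:
    explicit cycle_barrier(unsigned n) : threshold(n), count(n) {}

    void wait() {
        std::unique_lock<std::mutex> l(m);
        unsigned long gen = generation;
        if (--count == 0) {
            ++generation;      // open the barrier...
            count = threshold; // ...and reset it for the next cycle
            cv.notify_all();
            return;
        }
        // Wait until this generation of the barrier has opened; comparing
        // generations guards against spurious wakeups and late arrivals.
        cv.wait(l, [this, gen] { return gen != generation; });
    }
};
```

The generation counter is what makes the barrier safe to reuse: a thread that races ahead into the next cycle cannot be confused with one still waiting on the previous cycle.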

void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier)
{
    while (!stop_condition) // You'll need to tell them to stop somehow
    {
        start_barrier.wait();
        do_something(p);
        end_barrier.wait();
    }
}

int main() {
    void * myp[n_threads] {a_location, another_location, ...};
    boost::barrier start_barrier(n_threads + 1); // child threads + main thread
    boost::barrier end_barrier(n_threads + 1); // child threads + main thread
    std::thread mythread[n_threads];
    for (unsigned int i = 0; i < n_threads; i++) {
        // barriers are noncopyable, so they must be passed by reference
        mythread[i] = std::thread(myfunc, myp[i],
                                  std::ref(start_barrier), std::ref(end_barrier));
    }

    start_barrier.wait(); // first unblock the threads
    for (unsigned long int j = 0; j < ULONG_MAX; j++) {
        end_barrier.wait(); // mix_data must not execute before the threads are done
        mix_data(myp);
        start_barrier.wait(); // threads must not start a new iteration before mix_data is done
    }
    return 0;
}
+5
Mar 06 '13 at 17:03

Below is simple, compiling and working code performing some random computation. It implements the barrier concept described above. The task length of each thread is different, so a strong synchronization mechanism is really necessary. I will try a thread pool on the same tasks and compare the result, and then possibly the futures approach, as suggested in the first answer.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <cmath>   // for M_PI (POSIX; on MSVC define _USE_MATH_DEFINES first)
#include <complex>
#include <random>

const unsigned int n_threads = 4; // varying this will (almost) not change the total amount of work
const unsigned int task_length = 30000 / n_threads;
const float task_length_variation = task_length / n_threads;
unsigned int rep = 1000; // repetitions of tasks

class t_chronometer {
private:
    std::chrono::steady_clock::time_point _t;

public:
    t_chronometer(): _t(std::chrono::steady_clock::now()) {}
    void reset() { _t = std::chrono::steady_clock::now(); }
    double get_now() {
        return std::chrono::duration_cast<std::chrono::duration<double>>(
            std::chrono::steady_clock::now() - _t).count();
    }
    double get_now_ms() {
        return std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(
            std::chrono::steady_clock::now() - _t).count();
    }
};

class t_barrier {
private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    unsigned int m_threshold;
    unsigned int m_count;
    unsigned int m_generation;

public:
    t_barrier(unsigned int count):
        m_threshold(count),
        m_count(count),
        m_generation(0) {
    }

    bool wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        unsigned int gen = m_generation;

        if (--m_count == 0) {
            m_generation++;
            m_count = m_threshold;
            m_cond.notify_all();
            return true;
        }

        while (gen == m_generation)
            m_cond.wait(lock);
        return false;
    }
};

using namespace std;

void do_something(complex<double> * c, unsigned int max) {
    complex<double> a(1., 0.);
    complex<double> b(1., 0.);
    for (unsigned int i = 0; i < max; i++) {
        a *= polar(1., 2. * M_PI * i / max);
        b *= polar(1., 4. * M_PI * i / max);
        *(c) += a + b;
    }
}

bool done = false;

void task(complex<double> * c, unsigned int max,
          t_barrier* start_barrier, t_barrier* end_barrier) {
    while (!done) {
        start_barrier->wait();
        do_something(c, max);
        end_barrier->wait();
    }
    cout << "task finished" << endl;
}

int main() {
    t_chronometer t;

    std::default_random_engine gen;
    std::normal_distribution<double> dis(.0, 1000.0);

    complex<double> cpx[n_threads];
    for (unsigned int i = 0; i < n_threads; i++) {
        cpx[i] = complex<double>(dis(gen), dis(gen));
    }

    t_barrier start_barrier(n_threads + 1); // child threads + main thread
    t_barrier end_barrier(n_threads + 1); // child threads + main thread

    std::thread mythread[n_threads];
    unsigned long int sum = 0;
    for (unsigned int i = 0; i < n_threads; i++) {
        unsigned int max = task_length + i * task_length_variation;
        cout << i + 1 << "th task length: " << max << endl;
        mythread[i] = std::thread(task, &cpx[i], max, &start_barrier, &end_barrier);
        sum += max;
    }
    cout << "total task length " << sum << endl;

    complex<double> c(0, 0);
    for (unsigned long int j = 1; j < rep + 1; j++) {
        start_barrier.wait(); // give the threads the missing call to start
        if (j == rep) done = true;
        end_barrier.wait(); // wait for the call from each thread
        if (j % 100 == 0) cout << "cycle: " << j << endl;
        for (unsigned int i = 0; i < n_threads; i++) {
            c += cpx[i];
        }
    }

    for (unsigned int i = 0; i < n_threads; i++) {
        mythread[i].join();
    }
    cout << "result: " << c << " it took: " << t.get_now() << " s." << endl;
    return 0;
}
0
Mar 08 '13 at 15:27


