Thread pooling in C++11

Related questions:

About C++11:

  • C++11: std::thread pool?
  • Will async(launch::async) in C++11 make thread pools obsolete for avoiding expensive thread creation?

About Boost:

  • C++ Boost thread: reusing threads
  • boost::thread and creating a pool of them!



How do I get a pool of threads to send tasks to, without creating and destroying the threads over and over again? This means persistent threads that resynchronize without joining.




I have code that looks like this:

    #include <algorithm>
    #include <thread>
    #include <vector>

    namespace {
      std::vector<std::thread> workers;
      int total = 4;
      int arr[4] = {0};

      void each_thread_does(int i) {
        arr[i] += 2;
      }
    }

    int main(int argc, char *argv[]) {
      for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
          workers.push_back(std::thread(each_thread_does, j));
        }
        for (std::thread &t: workers) {
          if (t.joinable()) {
            t.join();
          }
        }
        // min_element returns an iterator, so dereference it
        int smallest = *std::min_element(arr, arr + 4);
      }
      return 0;
    }

Instead of creating and joining threads on each iteration, I would prefer to send tasks to my worker threads on each iteration and create them only once.

+105
c++ multithreading c++11 threadpool stdthread
Apr 01 '13 at 21:59
10 answers

You can use the C++ Thread Pool Library, https://github.com/vit-vit/ctpl .

Then the code you wrote can be replaced with the following:

    #include <ctpl.h>  // or <ctpl_stl.h> if you do not have the Boost library
    #include <algorithm>
    #include <future>
    #include <vector>

    int main (int argc, char *argv[]) {
        ctpl::thread_pool p(2 /* two threads in the pool */);
        int arr[4] = {0};
        std::vector<std::future<void>> results(4);
        for (int i = 0; i < 8; ++i) { // for 8 iterations,
            for (int j = 0; j < 4; ++j) {
                results[j] = p.push([&arr, j](int){ arr[j] += 2; });
            }
            for (int j = 0; j < 4; ++j) {
                results[j].get();
            }
            // min_element returns an iterator, so dereference it
            int smallest = *std::min_element(arr, arr + 4);
        }
    }

You get the desired number of threads and do not create and destroy them over and over across iterations.

+70
05 Sep '14 at 11:20

A thread pool means that all your threads are running, all the time. In other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for telling a thread that there is something to do and for passing it the actual work data.

Typically this involves some kind of concurrent data structure, and each thread presumably sleeps on some kind of condition variable, which is notified when there is work to do. Upon receiving the notification, one or several of the threads wake up, retrieve a task from the concurrent data structure, process it, and store the result in an analogous fashion.

The thread then goes on to check whether there is more work to do, and if not, goes back to sleep.

The upshot is that you have to design all of this yourself, since there is no natural notion of "work" that is universally applicable. It is quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system that takes care of thread management for you behind the scenes.)
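As a rough illustration of the mechanism described above, here is a minimal sketch of workers that sleep on a condition variable, wake up to take a task from a shared queue, and go back to sleep when the queue is empty. The names `MiniPool`, `submit`, `wait_idle`, and `run_iterations` are hypothetical and chosen only for this example; the sketch assumes tasks do not throw, and it is not meant as a complete or tuned implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: workers are created once and sleep on a
// condition variable until a task arrives or shutdown is requested.
class MiniPool {
public:
    explicit MiniPool(std::size_t thread_count) {
        assert(thread_count > 0);
        for (std::size_t i = 0; i < thread_count; ++i)
            workers_.emplace_back([this] { work(); });
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_workers_.notify_all();
        for (std::thread& t : workers_) t.join();
    }

    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
            ++unfinished_;
        }
        wake_workers_.notify_one();
    }

    // Block until every submitted task has finished; this takes the place
    // of joining the threads at the end of each iteration.
    void wait_idle() {
        std::unique_lock<std::mutex> lock(mutex_);
        all_done_.wait(lock, [this] { return unfinished_ == 0; });
    }

private:
    void work() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_workers_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // shutting down and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the task without holding the lock
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (--unfinished_ == 0) all_done_.notify_all();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_workers_;
    std::condition_variable all_done_;
    std::queue<std::function<void()>> tasks_;
    std::size_t unfinished_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// The loop from the question, with the threads created only once.
int run_iterations() {
    int arr[4] = {0};
    MiniPool pool(4);
    for (int i = 0; i < 8; ++i) {
        for (int j = 0; j < 4; ++j)
            pool.submit([&arr, j] { arr[j] += 2; });
        pool.wait_idle();  // resynchronize without joining
    }
    return *std::min_element(arr, arr + 4);
}
```

Each task touches a different array element, and `wait_idle()` separates the iterations, so no further locking around `arr` is needed in this particular example.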

+58
Apr 01 '13 at 22:27

This is copied from my answer to another very similar post, hope this helps:

1) Start with the maximum number of threads that the system can support:

 int Num_Threads = thread::hardware_concurrency(); 
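One detail worth guarding against: `std::thread::hardware_concurrency()` is only a hint and is allowed to return 0 when the value cannot be determined. A small sketch of a fallback, where the helper name `pick_thread_count` and the default of 2 are assumptions made for this example:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: use the hardware hint when it is available,
// otherwise fall back to a caller-supplied default.
unsigned pick_thread_count(unsigned fallback = 2) {
    assert(fallback > 0);
    unsigned hint = std::thread::hardware_concurrency();  // may be 0
    return hint != 0 ? hint : fallback;
}
```

The result can then be used in place of the raw `hardware_concurrency()` value when sizing the pool.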

2) For an efficient thread pool implementation, once the threads have been created according to Num_Threads, it is better not to create new ones or destroy old ones (by joining). Doing so incurs a performance penalty and might even make your application run slower than the serial version.

Each C++11 thread should run in its function with an infinite loop, constantly waiting for new tasks to grab and run.

Here's how to attach such a function to a thread pool:

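    int Num_Threads = thread::hardware_concurrency();
    vector<thread> Pool;
    for (int ii = 0; ii < Num_Threads; ii++)
    {
        // Assumes this runs inside a The_Pool member function, since
        // Infinite_loop_function is a member of The_Pool.
        Pool.push_back(thread(&The_Pool::Infinite_loop_function, this));
    }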

3) The Infinite_loop_function

This is a while (true) loop waiting on the task queue:

    void The_Pool::Infinite_loop_function()
    {
        while (true)
        {
            function<void()> Job;
            {
                unique_lock<mutex> lock(Queue_Mutex);
                condition.wait(lock, [this]{ return !Queue.empty() || terminate_pool; });
                if (Queue.empty())
                    return;  // woken up for shutdown with no work left
                Job = Queue.front();
                Queue.pop();
            }
            Job();  // function<void()> type
        }
    }

4) Make a function to add a job to your queue

    void The_Pool::Add_Job(function<void()> New_Job)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);
            Queue.push(New_Job);
        }
        condition.notify_one();
    }

5) Bind an arbitrary function to your queue

 Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); 
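To make the binding step more concrete, here is a small single-threaded sketch of how `std::bind` (or an equivalent lambda) turns a member function call into a `std::function<void()>` that can sit in the job queue. `Counter`, `run_all`, and `bind_demo` are hypothetical names used only for this illustration; `Counter` stands in for `Some_Class`.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Hypothetical class standing in for Some_Class.
struct Counter {
    int value = 0;
    void add(int amount) { value += amount; }
};

// Run every queued job on the calling thread and report how many ran.
int run_all(std::queue<std::function<void()>>& jobs) {
    int executed = 0;
    while (!jobs.empty()) {
        std::function<void()> job = std::move(jobs.front());
        jobs.pop();
        job();
        ++executed;
    }
    return executed;
}

int bind_demo() {
    Counter counter;
    std::queue<std::function<void()>> jobs;
    // Member function pointer + object pointer + argument, bound up front.
    jobs.push(std::bind(&Counter::add, &counter, 5));
    // The same kind of job written as a lambda.
    jobs.push([&counter] { counter.add(7); });
    run_all(jobs);
    return counter.value;
}
```

Either form works with a queue of `std::function<void()>`. In both cases the bound object must outlive the job, because only a pointer or reference to it is stored.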

Once you integrate these ingredients, you have your own dynamic thread pool. These threads are always running, waiting for jobs to do.

I apologize if there are any syntax errors; I typed this code from memory, and my memory is bad. Sorry that I cannot provide the complete thread pool code, since that would violate my work obligations.

Edit: to terminate the pool, call the shutdown() method:

    void The_Pool::shutdown()
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);
            terminate_pool = true;  // use this flag in condition.wait
        }
        condition.notify_all();  // wake up all threads

        // Join all threads.
        for (std::thread &every_thread : Pool)
        {
            every_thread.join();
        }
        Pool.clear();
        stopped = true;  // use this flag in the destructor; if not set, call shutdown()
    }

+53
Sep 15 '15 at 19:12

A thread pool is, at its core, a set of threads all bound to a function that works as an event loop. These threads wait endlessly for a task to execute, or for their own termination.

The thread pool's job is to provide an interface for submitting tasks, to define (and perhaps modify) the policy for running those tasks (scheduling rules, thread creation, pool size), and to monitor the status of the threads and their associated resources.

So, for a versatile pool, you must start by defining what a task is, how it is launched and interrupted, what its result is (see the notions of promise and future for that question), what kinds of events the threads have to respond to, how they handle them, and how those events are distinguished from the ones handled by the tasks. As you can see, this can become quite complicated, and it imposes restrictions on how the threads can work as the solution becomes more and more involved.
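For the "what is the result" part, the standard promise/future machinery can be sketched as follows. `std::packaged_task` wraps a callable so that its return value, or an exception it throws, is delivered through a `std::future`. The function names `square`, `result_via_future`, and `error_via_future` are hypothetical, and a plain `std::thread` stands in for a pool worker.

```cpp
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

int square(int x) { return x * x; }

// The result of a task run on another thread arrives through the future.
int result_via_future() {
    std::packaged_task<int(int)> task(square);
    std::future<int> result = task.get_future();
    std::thread worker(std::move(task), 7);  // the worker runs task(7)
    int value = result.get();                // blocks until the task has run
    worker.join();
    return value;
}

// An exception thrown inside the task is stored and rethrown by get().
bool error_via_future() {
    std::packaged_task<int()> task([]() -> int {
        throw std::runtime_error("task failed");
    });
    std::future<int> result = task.get_future();
    std::thread worker(std::move(task));
    worker.join();
    try {
        result.get();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

This covers results and errors only. Interrupting a running task is not something `std::future` provides, so that part of the design remains up to the pool.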

The current tooling for handling events is fairly bare-bones (*): primitives such as mutexes and condition variables, plus a few abstractions on top of them (locks, barriers). In some cases these abstractions may turn out to be unsuitable (see this related question), and you have to fall back on the primitives.

Other issues have to be handled as well:

  • signals
  • I/O
  • hardware (processor affinity, heterogeneous setups)

How would these play out in your setting?

This answer to a similar question points to an existing implementation meant for Boost and the STL.

I offered a very crude thread pool implementation for another question, which does not address many of the problems outlined above. You might want to build on it. You might also want to look at existing frameworks in other languages for inspiration.




(*) I do not see that as a problem; quite the contrary. I think it is the very spirit of C++, inherited from C.

+18
Apr 01 '13 at

Something like this might help (taken from a working application).

    #include <memory>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>

    struct thread_pool {
      typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

      thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
        for (int i = 0; i < threads; ++i) {
          auto worker = [this] { return service.run(); };
          grp.add_thread(new boost::thread(worker));
        }
      }

      template<class F>
      void enqueue(F f) {
        service.post(f);
      }

      ~thread_pool() {
        service_worker.reset();
        grp.join_all();
        service.stop();
      }

    private:
      boost::asio::io_service service;
      asio_worker service_worker;
      boost::thread_group grp;
    };

You can use it as follows:

    thread_pool pool(2);

    pool.enqueue([] {
      std::cout << "Hello from Task 1\n";
    });

    pool.enqueue([] {
      std::cout << "Hello from Task 2\n";
    });

Keep in mind that reinventing an efficient asynchronous queuing mechanism is not trivial.

boost::asio::io_service is a very efficient implementation, or rather a collection of platform-specific wrappers (for example, it wraps I/O completion ports on Windows).

+3
Jan 27 '16 at 13:08

Edit: This now requires C++17 and concepts. (As of 9/12/16, only g++ 6.0+ is sufficient.)

Template deduction is much more accurate because of this, so it is well worth the effort to get a newer compiler. I have not yet found a function that requires explicit template arguments.

It also now accepts any suitable callable object (and is still statically type-safe!).

It also now includes an optional green-thread thread pool using the same API. This class is POSIX-only, though. It uses the ucontext_t API for task switching in user space.




I created a simple library for this. An example of usage is given below. (I am answering this because it was one of the things I found before I decided to write it myself.)

    bool is_prime(int n){
      // Determine if n is prime.
    }

    int main(){
      thread_pool pool(8); // 8 threads

      list<future<bool>> results;
      for(int n = 2;n < 10000;n++){
        // Submit a job to the pool.
        results.emplace_back(pool.async(is_prime, n));
      }

      int n = 2;
      for(auto i = results.begin();i != results.end();i++, n++){
        // i is an iterator pointing to a future representing the result of is_prime(n)
        cout << n << " ";
        bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
        if(prime)
          cout << "is prime";
        else
          cout << "is not prime";
        cout << endl;
      }
    }

You can pass async any function with any (or void) return value and any (or no) arguments, and it will return a corresponding std::future. To get the result (or just wait until a task has completed), you call get() on the future.
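How such an interface can hand back a `std::future` for an arbitrary callable is sketched below. This is not the linked library's implementation, only one common approach under stated assumptions: the call is wrapped in a `std::packaged_task`, held through a `shared_ptr` because `std::function` requires copyable callables, and pushed onto a queue of `void()` jobs. The names `JobQueue`, `enqueue`, `is_even`, and `enqueue_demo` are hypothetical, and the queue here is deliberately not thread-safe.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <thread>
#include <utility>

using JobQueue = std::queue<std::function<void()>>;

// Hypothetical helper: queue f(args...) as a void() job and return a future
// for its result. A real pool would guard the queue with a mutex.
template <class F, class... Args>
auto enqueue(JobQueue& jobs, F&& f, Args&&... args)
    -> std::future<decltype(f(args...))> {
    using R = decltype(f(args...));
    // packaged_task is move-only, so it is shared through a pointer.
    auto task = std::make_shared<std::packaged_task<R()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<R> result = task->get_future();
    jobs.push([task] { (*task)(); });
    return result;
}

bool is_even(int n) { return n % 2 == 0; }

bool enqueue_demo() {
    JobQueue jobs;
    std::future<bool> even = enqueue(jobs, is_even, 10);
    std::future<int> sum = enqueue(jobs, [](int a, int b) { return a + b; }, 2, 3);

    // A single worker drains the queue after all jobs have been queued,
    // so no locking is needed in this demonstration.
    std::thread worker([&jobs] {
        while (!jobs.empty()) {
            jobs.front()();
            jobs.pop();
        }
    });
    worker.join();

    return even.get() && sum.get() == 5;
}
```

The `decltype(f(args...))` return type keeps the sketch valid from C++11 onward, at the cost of not supporting member function pointers directly.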

Here is the GitHub repository: https://github.com/Tyler-Hardin/thread_pool .

+2
Jun 15 '14 at 18:57

This is another thread pool implementation. It is very simple, easy to understand and use, relies only on the C++11 standard library, and can be read or modified to suit your needs. It should be a good starting point if you want to get into using thread pools:

https://github.com/progschj/ThreadPool

+2
Feb 15 '18 at 11:31
Following [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece)'s suggestion, I implemented the thread pool:

function_pool.h

    #pragma once
    #include <queue>
    #include <functional>
    #include <mutex>
    #include <condition_variable>
    #include <atomic>
    #include <cassert>

    class Function_pool
    {

    private:
        std::queue<std::function<void()>> m_function_queue;
        std::mutex m_lock;
        std::condition_variable m_data_condition;
        std::atomic<bool> m_accept_functions;

    public:

        Function_pool();
        ~Function_pool();
        void push(std::function<void()> func);
        void done();
        void infinite_loop_func();
    };

function_pool.cpp

    #include "function_pool.h"

    Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
    {
    }

    Function_pool::~Function_pool()
    {
    }

    void Function_pool::push(std::function<void()> func)
    {
        std::unique_lock<std::mutex> lock(m_lock);
        m_function_queue.push(func);
        // when we send the notification immediately, the consumer will try to get the lock, so unlock asap
        lock.unlock();
        m_data_condition.notify_one();
    }

    void Function_pool::done()
    {
        std::unique_lock<std::mutex> lock(m_lock);
        m_accept_functions = false;
        lock.unlock();
        // when we send the notification immediately, the consumer will try to get the lock, so unlock asap
        m_data_condition.notify_all();
        // notify all waiting threads.
    }

    void Function_pool::infinite_loop_func()
    {
        std::function<void()> func;
        while (true)
        {
            {
                std::unique_lock<std::mutex> lock(m_lock);
                m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
                if (!m_accept_functions && m_function_queue.empty())
                {
                    // lock will be released automatically.
                    // finish the thread loop and let it join in the main thread.
                    return;
                }
                func = m_function_queue.front();
                m_function_queue.pop();
                // release the lock
            }
            func();
        }
    }

main.cpp

    #include "function_pool.h"
    #include <string>
    #include <iostream>
    #include <mutex>
    #include <functional>
    #include <thread>
    #include <vector>

    Function_pool func_pool;

    class quit_worker_exception : public std::exception {};

    void example_function()
    {
        std::cout << "bla" << std::endl;
    }

    int main()
    {
        std::cout << "starting operation" << std::endl;
        int num_threads = std::thread::hardware_concurrency();
        std::cout << "number of threads = " << num_threads << std::endl;
        std::vector<std::thread> thread_pool;
        for (int i = 0; i < num_threads; i++)
        {
            thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
        }

        // here we should send our functions
        for (int i = 0; i < 50; i++)
        {
            func_pool.push(example_function);
        }
        func_pool.done();
        for (unsigned int i = 0; i < thread_pool.size(); i++)
        {
            thread_pool.at(i).join();
        }
    }
+2
Jul 18

A thread pool with no dependencies outside of the STL is entirely possible. I recently wrote a small header-only thread pool library to address the exact same problem. It supports dynamic pool resizing (changing the number of workers at runtime), waiting, stopping, pausing, resuming, and so on. I hope you find it useful.

0
Oct 31 '17 at 12:26

You can use thread_pool from the Boost library:

    void my_task(){...}

    int main(){
        int threadNumbers = thread::hardware_concurrency();
        boost::asio::thread_pool pool(threadNumbers);

        // Submit a function to the pool.
        boost::asio::post(pool, my_task);

        // Submit a lambda object to the pool.
        boost::asio::post(pool, []() {
            ...
        });
    }



You can also use the thread pool from the open source community:

    void first_task() {...}
    void second_task() {...}

    int main(){
        int threadNumbers = thread::hardware_concurrency();
        pool tp(threadNumbers);

        // Add some tasks to the pool.
        tp.schedule(&first_task);
        tp.schedule(&second_task);
    }
0
Jun 20 '19 at 8:41


