Boost::asio, thread pools and thread monitoring

I implemented a thread pool using boost::asio, with several boost::thread objects calling boost::asio::io_service::run(). However, a requirement I was given is to be able to monitor all threads for "health". My intention is to create a simple sentinel object that can be passed through the thread pool: if it makes it through, we can assume that the thread is still processing work.

However, given my implementation, I'm not sure how (or if) I can reliably monitor all the threads in the pool. I have simply delegated the thread function to boost::asio::io_service::run(), so posting a sentinel object to the io_service instance does not guarantee which thread will actually receive that sentinel and do the work.

One option might be to periodically insert the sentinel and hope that it gets picked up by each thread at least once within some reasonable amount of time, but that is obviously not ideal.

Take the following example. Because the handler is hard-coded, in this instance each thread will do the same amount of work, but in reality I will not control the handler implementations: some can be long-running, while others will finish almost immediately.

    #include <iostream>
    #include <memory>
    #include <vector>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>

    void handler()
    {
        std::cout << boost::this_thread::get_id() << "\n";
        boost::this_thread::sleep(boost::posix_time::milliseconds(100));
    }

    int main(int argc, char **argv)
    {
        boost::asio::io_service svc(3);

        // keeps run() from returning while the queue is empty
        std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc));

        boost::thread one(boost::bind(&boost::asio::io_service::run, &svc));
        boost::thread two(boost::bind(&boost::asio::io_service::run, &svc));
        boost::thread three(boost::bind(&boost::asio::io_service::run, &svc));

        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);
        svc.post(handler);

        // let the threads exit once the queued handlers have run
        work.reset();

        three.join();
        two.join();
        one.join();

        return 0;
    }
2 answers

The solution I used relies on the fact that I own the implementation of the thread pool objects. I created a wrapper type that updates statistics and holds a copy of the user-defined handler posted to the thread pool. Only this wrapper type is ever posted to the underlying io_service. This method allows me to keep track of the handlers that are posted and executed without being intrusive in the user code.

Here is a stripped-down, simplified example:

    #include <algorithm>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <utility>
    #include <vector>
    #include <boost/thread.hpp>
    #include <boost/asio.hpp>

    // Supports scheduling anonymous jobs that are
    // executable as returning nothing and taking
    // no arguments
    typedef std::function<void(void)> functor_type;

    // some way to store per-thread statistics
    typedef std::map<boost::thread::id, int> thread_jobcount_map;

    // only this type is actually posted to
    // the asio proactor, this delegates to
    // the user functor in operator()
    struct handler_wrapper
    {
        handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics)
            : user_functor_(user_functor)
            , statistics_(statistics)
        {
        }

        void operator()()
        {
            user_functor_();

            // just for illustration purposes, assume a long running job
            boost::this_thread::sleep(boost::posix_time::milliseconds(100));

            // increment executed jobs
            ++statistics_[boost::this_thread::get_id()];
        }

        functor_type user_functor_;
        thread_jobcount_map& statistics_;
    };

    // anonymous thread function, just runs the proactor
    void thread_func(boost::asio::io_service& proactor)
    {
        proactor.run();
    }

    class ThreadPool
    {
    public:
        ThreadPool(size_t thread_count)
        {
            threads_.reserve(thread_count);

            work_.reset(new boost::asio::io_service::work(proactor_));

            for(size_t curr = 0; curr < thread_count; ++curr)
            {
                boost::thread th(thread_func, boost::ref(proactor_));

                // inserting into this map before any work can be scheduled
                // on it, means that we don't have to lock it for lookups
                // since we don't dynamically add threads
                thread_jobcount_.insert(std::make_pair(th.get_id(), 0));

                threads_.emplace_back(std::move(th));
            }
        }

        // the only way for a user to get work into
        // the pool is to use this function, which ensures
        // that the handler_wrapper type is used
        void schedule(const functor_type& user_functor)
        {
            handler_wrapper to_execute(user_functor, thread_jobcount_);
            proactor_.post(to_execute);
        }

        void join()
        {
            // join all threads in pool; note that stop() makes run()
            // return without executing handlers that are still queued
            work_.reset();
            proactor_.stop();

            std::for_each(
                threads_.begin(),
                threads_.end(),
                [] (boost::thread& t)
                { t.join(); });
        }

        // just an example showing statistics
        void log()
        {
            std::for_each(
                thread_jobcount_.begin(),
                thread_jobcount_.end(),
                [] (const thread_jobcount_map::value_type& it)
                { std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; });
        }

    private:
        std::vector<boost::thread> threads_;
        std::unique_ptr<boost::asio::io_service::work> work_;
        boost::asio::io_service proactor_;
        thread_jobcount_map thread_jobcount_;
    };

    struct add
    {
        add(int lhs, int rhs, int* result)
            : lhs_(lhs)
            , rhs_(rhs)
            , result_(result)
        {
        }

        void operator()()
        {
            *result_ = lhs_ + rhs_;
        }

        int lhs_, rhs_;
        int* result_;
    };

    int main(int argc, char **argv)
    {
        // some "state objects" that are
        // manipulated by the user functors
        int x = 0, y = 0, z = 0;

        // pool of three threads
        ThreadPool pool(3);

        // schedule some handlers to do some work
        pool.schedule(add(5, 4, &x));
        pool.schedule(add(2, 2, &y));
        pool.schedule(add(7, 8, &z));

        // give all the handlers time to execute
        boost::this_thread::sleep(boost::posix_time::milliseconds(1000));

        std::cout
            << "x = " << x << "\n"
            << "y = " << y << "\n"
            << "z = " << z << "\n";

        pool.join();

        pool.log();
    }

Output:

    x = 9
    y = 4
    z = 15
    Thread: 0000000000B25430 executed 1 jobs
    Thread: 0000000000B274F0 executed 1 jobs
    Thread: 0000000000B27990 executed 1 jobs
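The job counters above show how many handlers each thread has finished, but not whether a thread is currently stuck inside one. A possible extension of the wrapper idea is to record when each thread enters and leaves a handler, so that a monitor can ask which threads have been busy for too long. The following is only a sketch under assumptions: `HealthTable`, `ThreadHealth` and `tracked` are hypothetical names, not part of Boost or of the answer's code, and it uses `std::thread::id` and `std::mutex` so that it compiles without Boost. In the pool above, `boost::this_thread::get_id()` could take the place of `std::this_thread::get_id()`.

```cpp
#include <chrono>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Per-thread health record (hypothetical type for this sketch).
struct ThreadHealth {
    bool busy = false;                                  // inside a handler right now?
    std::chrono::steady_clock::time_point last_change;  // when `busy` last changed
    int completed = 0;                                  // handlers finished so far
};

class HealthTable {
public:
    using Clock = std::chrono::steady_clock;

    void mark_start() { update(true); }
    void mark_done()  { update(false); }

    // Threads that have been inside a single handler for longer than `limit`.
    std::vector<std::thread::id> stalled(Clock::duration limit) const {
        std::lock_guard<std::mutex> lock(mutex_);
        std::vector<std::thread::id> result;
        const Clock::time_point now = Clock::now();
        for (const auto& entry : table_) {
            if (entry.second.busy && now - entry.second.last_change > limit)
                result.push_back(entry.first);
        }
        return result;
    }

    // Number of handlers the given thread has finished.
    int completed(std::thread::id id) const {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = table_.find(id);
        return it == table_.end() ? 0 : it->second.completed;
    }

private:
    // Records the calling thread's state; the lock is held only briefly
    // and never while a user handler is running.
    void update(bool busy) {
        std::lock_guard<std::mutex> lock(mutex_);
        ThreadHealth& health = table_[std::this_thread::get_id()];
        if (!busy && health.busy) ++health.completed;
        health.busy = busy;
        health.last_change = Clock::now();
    }

    mutable std::mutex mutex_;
    std::map<std::thread::id, ThreadHealth> table_;
};

// Wraps a user functor so that the executing thread reports start and finish.
// The RAII guard reports the finish even if the functor throws.
inline std::function<void()> tracked(std::function<void()> user, HealthTable& health) {
    return [user, &health]() {
        struct Guard {
            HealthTable& table;
            explicit Guard(HealthTable& t) : table(t) { table.mark_start(); }
            ~Guard() { table.mark_done(); }
        } guard(health);
        user();
    };
}
```

With the pool from the answer, `schedule()` could post `tracked(user_functor, health)` instead of the bare functor, and a timer or a separate monitor thread could call `health.stalled(std::chrono::seconds(5))` periodically. An empty result only means that no handler has overrun the limit; it does not prove that an idle thread is still alive.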

You can use one io_service instance shared by all threads, plus a private io_service instance for each thread. Each thread runs a loop like this:

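    void Mythread::threadLoop()
    {
        // Assumes each io_service is kept alive by an io_service::work object;
        // otherwise it enters the stopped state as soon as it runs out of handlers.
        // `lock` is a boost::unique_lock<boost::mutex> owned by this thread.
        while (/* termination condition */) {
            commonIoService.poll_one();   // run at most one ready shared handler, without blocking
            privateIoService.poll_one();  // run at most one ready handler addressed to this thread

            // sleep until new work is announced or the timeout expires
            commonConditionVariable.timed_wait(lock, time);
        }
    }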

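Thus, if you want a task to be executed by a particular thread, post it only to that thread's private io_service. A sentinel posted to every private io_service will therefore be run by every thread that is still servicing its loop.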

To post a task to the thread pool as a whole, you can do:

    void MyThreadPool::post(Handler handler)
    {
        commonIoService.post(handler);
        commonConditionVariable.notify_all();
    }
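Putting the loop and the post function together with a per-thread sentinel could look roughly like the sketch below. It is not the answer author's code: `TaskQueue` is a small mutex-protected queue that stands in for `io_service` so the example compiles without Boost, and `MonitoredPool`, `post_to` and `unresponsive` are hypothetical names. Unlike the loop above, a worker here waits on the condition variable only when both queues were empty.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Mutex-protected FIFO standing in for an io_service (assumption of this sketch).
class TaskQueue {
public:
    void post(Task task) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push_back(std::move(task));
    }
    // Runs at most one queued task without blocking; true if one was run.
    bool poll_one() {
        Task task;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (tasks_.empty()) return false;
            task = std::move(tasks_.front());
            tasks_.pop_front();
        }
        task();  // run outside the lock so the task may post more work
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<Task> tasks_;
};

class MonitoredPool {
public:
    explicit MonitoredPool(std::size_t count) : private_(count), stopping_(false) {
        for (std::size_t i = 0; i < count; ++i)
            threads_.emplace_back([this, i] { loop(i); });
    }
    ~MonitoredPool() {
        stopping_ = true;
        wake_.notify_all();
        for (std::thread& t : threads_) t.join();
    }
    // Work for whichever thread gets to it first.
    void post(Task task) {
        common_.post(std::move(task));
        wake_.notify_all();
    }
    // Work for one specific thread.
    void post_to(std::size_t index, Task task) {
        private_[index].post(std::move(task));
        wake_.notify_all();
    }
    // Posts a sentinel to every private queue and returns the indices of the
    // threads that did not run theirs within `timeout`.
    std::vector<std::size_t> unresponsive(std::chrono::milliseconds timeout) {
        // Shared ownership: a stalled thread may run its sentinel after we return.
        auto seen = std::make_shared<std::vector<std::atomic<bool>>>(private_.size());
        for (std::size_t i = 0; i < private_.size(); ++i)
            private_[i].post([seen, i] { (*seen)[i] = true; });
        wake_.notify_all();
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        std::vector<std::size_t> missing;
        for (;;) {
            missing.clear();
            for (std::size_t i = 0; i < seen->size(); ++i)
                if (!(*seen)[i]) missing.push_back(i);
            if (missing.empty() || std::chrono::steady_clock::now() >= deadline)
                return missing;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
private:
    void loop(std::size_t index) {
        while (!stopping_) {
            const bool ran_common = common_.poll_one();
            const bool ran_private = private_[index].poll_one();
            if (!ran_common && !ran_private) {
                // Idle: sleep until notified or until the timeout expires.
                std::unique_lock<std::mutex> lock(wake_mutex_);
                wake_.wait_for(lock, std::chrono::milliseconds(10));
            }
        }
    }
    TaskQueue common_;
    std::vector<TaskQueue> private_;
    std::mutex wake_mutex_;
    std::condition_variable wake_;
    std::atomic<bool> stopping_;
    std::vector<std::thread> threads_;  // declared last: threads use the members above
};
```

A monitor could call `pool.unresponsive(std::chrono::milliseconds(500))` at intervals and treat any returned index as a thread that is stuck in a long handler or no longer running. Because each sentinel sits in one thread's private queue, no other thread can pick it up, which is the guarantee a single shared queue cannot give.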

Source: https://habr.com/ru/post/923996/

