I am trying to implement the Actor thread calculation model in C ++ using boost :: thread. But the program throws a strange exception at runtime. The exception is unstable, and some program works correctly.
Here is my code:
actor.hpp
class Actor { public: typedef boost::function<int()> Job; private: std::queue<Job> d_jobQueue; boost::mutex d_jobQueueMutex; boost::condition_variable d_hasJob; boost::atomic<bool> d_keepWorkerRunning; boost::thread d_worker; void workerThread(); public: Actor(); virtual ~Actor(); void execJobAsync(const Job& job); int execJobSync(const Job& job); };
actor.cpp
namespace { int executeJobSync(std::string *error, boost::promise<int> *promise, const Actor::Job *job) { int rc = (*job)(); promise->set_value(rc); return 0; } } void Actor::workerThread() { while (d_keepWorkerRunning) try { Job job; { boost::unique_lock<boost::mutex> g(d_jobQueueMutex); while (d_jobQueue.empty()) { d_hasJob.wait(g); } job = d_jobQueue.front(); d_jobQueue.pop(); } job(); } catch (...) { // Log error } } void Actor::execJobAsync(const Job& job) { boost::mutex::scoped_lock g(d_jobQueueMutex); d_jobQueue.push(job); d_hasJob.notify_one(); } int Actor::execJobSync(const Job& job) { std::string error; boost::promise<int> promise; boost::unique_future<int> future = promise.get_future(); { boost::mutex::scoped_lock g(d_jobQueueMutex); d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job)); d_hasJob.notify_one(); } int rc = future.get(); if (rc) { ErrorUtil::setLastError(rc, error.c_str()); } return rc; } Actor::Actor() : d_keepWorkerRunning(true) , d_worker(&Actor::workerThread, this) { } Actor::~Actor() { d_keepWorkerRunning = false; { boost::mutex::scoped_lock g(d_jobQueueMutex); d_hasJob.notify_one(); } d_worker.join(); }
In fact, the exception that is thrown is boost :: thread_interrupted in the string int rc = future.get(); . But to force the documents, I can not explain this exception. Docs say
Throws: - boost :: thread_interrupted if the result associated with * is not ready at the call point and the current thread is interrupted.
But my workflow cannot be in an interrupted state.
When I used gdb and set "catch throw", I see that the backtrace looks like
throw thread_interrupted
boost :: more info :: interruption_checker :: check_for_interruption
boost :: more info :: interruption_checker :: interruption_checker
raise :: condition_variable :: wait
nudging :: more :: future_object_base :: wait_internal
pushing :: more :: future_object_base :: wait
boost :: more :: future_object :: get
promotion :: unique_future :: get
I was looking for forward sources, but I canβt understand why interrupt_checker decided that the workflow was interrupted.
So, someone is a C ++ guru, please help me. What do I need to do to get the correct code? I use:
boost 1_53
Linux Version 2.6.18-194.32.1.el5 Red Hat 4.1.2-48
gcc 4.7
EDIT
Fixed! Thanks to Eugene Panasyuk and Lazin. The problem was in TLS management. boost :: thread and boost :: thread_specific_ptr use the same TLS repository for their purposes. In my case, there was a problem when they both tried to change this repository at creation (unfortunately, I did not understand why this was happening in detail). Therefore, TLS is corrupted.
I replaced boost :: thread_specific_ptr from my code using the __thread specified variable.
Offtop: during debugging, I found memory corruption in an external library and fixed it =)
.
EDIT 2 I have an exact problem ... This is a bug in GCC =) The compilation flag _GLIBCXX_DEBUG breaks the ABI. You can see the discussion on boost bugtracker: https://svn.boost.org/trac/boost/ticket/7666