Actor model of computation using boost::thread

I am trying to implement the Actor model of computation in C++ using boost::thread. But the program throws a strange exception at runtime. The exception is intermittent: sometimes the program works correctly.

Here is my code:

actor.hpp

    #include <queue>
    #include <boost/atomic.hpp>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>

    class Actor {
    public:
        typedef boost::function<int()> Job;

    private:
        std::queue<Job>           d_jobQueue;
        boost::mutex              d_jobQueueMutex;
        boost::condition_variable d_hasJob;
        boost::atomic<bool>       d_keepWorkerRunning;
        boost::thread             d_worker;

        void workerThread();

    public:
        Actor();
        virtual ~Actor();

        void execJobAsync(const Job& job);
        int  execJobSync(const Job& job);
    };

actor.cpp

    #include "actor.hpp"
    #include <string>
    #include <boost/bind.hpp>
    #include <boost/thread/future.hpp>

    namespace {
    int executeJobSync(std::string *error,
                       boost::promise<int> *promise,
                       const Actor::Job *job)
    {
        int rc = (*job)();
        promise->set_value(rc);
        return 0;
    }
    }

    void Actor::workerThread()
    {
        while (d_keepWorkerRunning) try {
            Job job;
            {
                boost::unique_lock<boost::mutex> g(d_jobQueueMutex);
                while (d_jobQueue.empty()) {
                    d_hasJob.wait(g);
                }
                job = d_jobQueue.front();
                d_jobQueue.pop();
            }
            job();
        } catch (...) {
            // Log error
        }
    }

    void Actor::execJobAsync(const Job& job)
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(job);
        d_hasJob.notify_one();
    }

    int Actor::execJobSync(const Job& job)
    {
        std::string error;
        boost::promise<int> promise;
        boost::unique_future<int> future = promise.get_future();
        {
            boost::mutex::scoped_lock g(d_jobQueueMutex);
            d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
            d_hasJob.notify_one();
        }
        int rc = future.get();
        if (rc) {
            ErrorUtil::setLastError(rc, error.c_str());
        }
        return rc;
    }

    Actor::Actor()
        : d_keepWorkerRunning(true)
        , d_worker(&Actor::workerThread, this)
    {
    }

    Actor::~Actor()
    {
        d_keepWorkerRunning = false;
        {
            boost::mutex::scoped_lock g(d_jobQueueMutex);
            d_hasJob.notify_one();
        }
        d_worker.join();
    }

The exception that is actually thrown is boost::thread_interrupted, on the line int rc = future.get();. However, judging by the documentation, I cannot explain this exception. The docs say:

Throws: boost::thread_interrupted if the result associated with *this is not ready at the point of the call, and the current thread is interrupted.

But my thread cannot be in an interrupted state.

When I run the program under gdb with "catch throw", the backtrace looks like this:

    throw thread_interrupted
    boost::detail::interruption_checker::check_for_interruption
    boost::detail::interruption_checker::interruption_checker
    boost::condition_variable::wait
    boost::detail::future_object_base::wait_internal
    boost::detail::future_object_base::wait
    boost::detail::future_object::get
    boost::unique_future::get

I looked through the Boost sources, but I can't understand why interruption_checker decided that the thread was interrupted.

So, C++ gurus, please help me. What do I need to do to make this code correct? I am using:

boost 1_53

Linux Version 2.6.18-194.32.1.el5 Red Hat 4.1.2-48

gcc 4.7

EDIT

Fixed! Thanks to Eugene Panasyuk and Lazin. The problem was in TLS management. boost::thread and boost::thread_specific_ptr use the same TLS storage for their purposes. In my case, there was a problem when both of them tried to modify this storage at creation time (unfortunately, I did not work out in detail why this was happening). As a result, the TLS got corrupted.

I replaced boost::thread_specific_ptr in my code with a variable declared with the __thread specifier.
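A minimal sketch of what a __thread variable gives you: every thread sees its own copy, starting from its initializer. It assumes GCC or Clang (__thread is a compiler extension, limited to types that need no dynamic construction or destruction), uses std::thread in place of boost::thread only so it builds without Boost, and the names t_jobsRun, runJobs and runJobsOnNewThread are hypothetical.

```cpp
#include <cassert>
#include <thread>

// GCC/Clang extension: every thread gets its own copy, initialized to 0.
// __thread only accepts types needing no dynamic construction/destruction
// (integers, raw pointers, PODs).
static __thread int t_jobsRun = 0;   // hypothetical per-thread counter

// Increments the calling thread's copy `times` times and returns it.
int runJobs(int times) {
    assert(times >= 0);
    for (int i = 0; i < times; ++i) {
        ++t_jobsRun;
    }
    return t_jobsRun;
}

// Calls runJobs(times) on a fresh thread and returns what that thread saw.
int runJobsOnNewThread(int times) {
    int seen = 0;
    std::thread worker([&seen, times] { seen = runJobs(times); });
    worker.join();
    return seen;
}
```

With GCC 4.8 or later, the standard C++11 thread_local keyword does the same job and also accepts types with constructors.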

Off-topic: while debugging, I also found and fixed a memory corruption bug in an external library =)


EDIT 2

I have found the exact problem... This is a bug in GCC =) The _GLIBCXX_DEBUG compilation flag breaks the ABI. You can see the discussion on the Boost bug tracker: https://svn.boost.org/trac/boost/ticket/7666

c++ multithreading boost actor
2 answers

I found some errors:


The function Actor::workerThread unlocks d_jobQueueMutex twice. The first unlock is the manual d_jobQueueMutex.unlock(); call; the second is done by the boost::unique_lock<boost::mutex> destructor.

The unlock in unique_lock should be avoided, for example by releasing the association between the unique_lock and the mutex:

    g.release(); // <------------ PATCH
    d_jobQueueMutex.unlock();

Or add an extra scope block and default-construct the Job outside it.
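Both variants of the fix can be sketched as follows. The sketch uses std::unique_lock, whose release() behaves like boost::unique_lock::release(): it detaches the lock object from the mutex without unlocking it. The free functions popAndRun/popAndRunScoped and the std::queue<std::function<int()>> job queue are hypothetical stand-ins for the Actor members.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>

typedef std::function<int()> Job;  // same shape as Actor::Job

// Variant 1: unlock by hand, after telling the lock object to let go.
int popAndRun(std::mutex& m, std::queue<Job>& q) {
    std::unique_lock<std::mutex> g(m);
    assert(!q.empty());                   // precondition of this sketch
    Job job = q.front();
    q.pop();
    std::mutex* released = g.release();   // g stops owning m WITHOUT unlocking it...
    released->unlock();                   // ...so this is the one and only unlock
    return job();                         // run the job with the mutex released
}

// Variant 2: default-construct the Job outside an extra scope, so the
// unique_lock destructor at the closing brace is the only unlock.
int popAndRunScoped(std::mutex& m, std::queue<Job>& q) {
    Job job;
    {
        std::unique_lock<std::mutex> g(m);
        assert(!q.empty());
        job = q.front();
        q.pop();
    }                                     // unlocked here, exactly once
    return job();
}
```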


It is possible that workerThread will never leave the following loop:

    while (d_jobQueue.empty()) {
        d_hasJob.wait(g);
    }

Imagine the following case: d_jobQueue is empty and Actor::~Actor() is called; it sets the flag and notifies the worker thread:

    d_keepWorkerRunning = false;
    d_hasJob.notify_one();

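workerThread wakes up inside the loop, sees that the queue is still empty, and goes back to sleep. The d_keepWorkerRunning flag is never checked again, so d_worker.join() in the destructor blocks forever.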
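Another common way to avoid this hang is to make the condition-variable wait check the stop flag as well as the queue, and to change the flag while holding the mutex. A sketch under those assumptions, with std:: types standing in for boost:: and a hypothetical StoppableWorker class in place of Actor:

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for the question's Actor (std:: types instead of boost::).
class StoppableWorker {
public:
    typedef std::function<void()> Job;

    StoppableWorker() : stop_(false), thread_(&StoppableWorker::run, this) {}

    ~StoppableWorker() {
        {
            std::lock_guard<std::mutex> g(mutex_);
            stop_ = true;          // change the wait condition under the mutex...
        }
        cv_.notify_one();          // ...then wake the worker
        thread_.join();
    }

    void post(Job job) {
        assert(job);               // an empty std::function would throw when called
        {
            std::lock_guard<std::mutex> g(mutex_);
            queue_.push(std::move(job));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            Job job;
            {
                std::unique_lock<std::mutex> g(mutex_);
                // The predicate covers BOTH reasons to wake up, so a stop
                // request is noticed even when the queue is empty.
                cv_.wait(g, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) {
                    return;        // stop requested and nothing left to run
                }
                job = std::move(queue_.front());
                queue_.pop();
            }
            job();                 // run outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Job> queue_;
    bool stop_;                    // guarded by mutex_
    std::thread thread_;           // last member: starts after the others are initialized
};
```

Jobs already queued when the destructor runs are still executed before the worker exits.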

The usual approach is to send a special final job that stops the worker thread:

    ~Actor()
    {
        execJobSync([this]() -> int {
            d_keepWorkerRunning = false;
            return 0;
        });
        d_worker.join();
    }

In this case d_keepWorkerRunning does not need to be atomic, because after construction it is only read and written by the worker thread.
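The final-job shutdown described above can be sketched with std::thread, std::promise and std::future standing in for their boost:: counterparts (an assumption made only so the example builds without Boost). One deliberate deviation from the question's code: the promise is held by a std::shared_ptr captured in the queued job, so it stays alive for as long as the worker uses it.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

class Actor {
public:
    typedef std::function<int()> Job;

    Actor() : keepRunning_(true), worker_(&Actor::workerThread, this) {}

    ~Actor() {
        // The final job runs ON the worker thread, so keepRunning_ is only
        // touched by that thread after construction and need not be atomic.
        execJobSync([this]() -> int { keepRunning_ = false; return 0; });
        worker_.join();
    }

    void execJobAsync(const Job& job) {
        assert(job);
        std::lock_guard<std::mutex> g(mutex_);
        queue_.push(job);
        hasJob_.notify_one();
    }

    int execJobSync(const Job& job) {
        std::shared_ptr<std::promise<int>> promise = std::make_shared<std::promise<int>>();
        std::future<int> future = promise->get_future();
        // The job and the promise are copied into the queued wrapper.
        execJobAsync([promise, job]() -> int {
            promise->set_value(job());
            return 0;
        });
        return future.get();   // blocks until the worker has run the job
    }

private:
    void workerThread() {
        while (keepRunning_) {
            Job job;
            {
                std::unique_lock<std::mutex> g(mutex_);
                hasJob_.wait(g, [this] { return !queue_.empty(); });
                job = queue_.front();
                queue_.pop();
            }
            job();             // the final job clears keepRunning_, ending the loop
        }
    }

    std::queue<Job> queue_;
    std::mutex mutex_;
    std::condition_variable hasJob_;
    bool keepRunning_;
    std::thread worker_;       // last member: starts after everything above exists
};
```

Because the queue is FIFO, any jobs posted before destruction run before the final job.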




EDIT

I have added event queue code to your example.

You have a concurrent queue in both EventQueueImpl and Actor, but for different element types. You can extract the common part into a separate class template, concurrent_queue<T>, that works for any type. It is much easier to debug and test the queue in one place than to hunt for errors scattered across different classes.

So you could try using concurrent_queue<T> (on Coliru).
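A minimal sketch of such a concurrent_queue<T>, assuming a plain mutex plus condition variable design; the push/pop/try_pop/empty interface is hypothetical, not the code from the Coliru demo:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Minimal blocking queue usable by any producer/consumer pair.
template <typename T>
class concurrent_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> g(mutex_);
            queue_.push(std::move(value));
        }
        notEmpty_.notify_one();   // wake one waiting consumer, if any
    }

    // Blocks until an element is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> g(mutex_);
        notEmpty_.wait(g, [this] { return !queue_.empty(); });
        assert(!queue_.empty());  // guaranteed by the predicate above
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    // Non-blocking variant: returns false immediately if the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> g(mutex_);
        if (queue_.empty()) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    bool empty() const {
        std::lock_guard<std::mutex> g(mutex_);
        return queue_.empty();
    }

private:
    mutable std::mutex mutex_;    // mutable so empty() can lock it
    std::condition_variable notEmpty_;
    std::queue<T> queue_;
};
```

Actor could then hold a concurrent_queue<Job> and EventQueueImpl a concurrent_queue of its own event type, with the queue itself tested once, in isolation.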


This is just a guess. I think some code may actually be calling boost::thread::interrupt(). You can set a breakpoint on that function and see which code is responsible. You can also check for a pending interruption in execJobSync:

    int Actor::execJobSync(const Job& job)
    {
        if (boost::this_thread::interruption_requested())
            std::cout << "Interruption requested!" << std::endl;
        std::string error;
        boost::promise<int> promise;
        boost::unique_future<int> future = promise.get_future();
        // ... rest of execJobSync unchanged

The most suspicious code in this case is any code that touches the thread object.

It is good practice to keep threads interruptible anyway. It is also possible to disable interruption for a particular scope (boost::this_thread::disable_interruption).

If that is not the case, check the code that works with thread-local storage, since the thread's interruption flag is stored in TLS. Maybe some of your code overwrites it. You can check for interruption before and after such code.
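To illustrate why a damaged TLS slot can make a healthy thread look interrupted, here is a deliberately simplified, hypothetical model of cooperative interruption. It is not Boost's implementation, and every name in it is made up; it only mirrors the idea that each thread reaches its interruption state through a thread-local pointer, that interruption points such as waits check that state and throw, and that interruption can be disabled for a scope.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for boost::thread_interrupted.
struct interrupted_error {};

// Per-thread bookkeeping that interruption points consult.
struct ThreadState {
    std::atomic<bool> interruptRequested;  // set by another thread to request interruption
    int disableDepth;                      // > 0 while interruption is disabled in this thread
    ThreadState() : interruptRequested(false), disableDepth(0) {}
};

// Each thread reaches its own state through this thread-local pointer.
// If something overwrote it (or the object it points to), the check below
// would act on garbage.
thread_local ThreadState* t_state = nullptr;

// Interruption point: throws once if interruption was requested and is enabled.
void check_for_interruption() {
    ThreadState* s = t_state;
    if (s != nullptr && s->disableDepth == 0 && s->interruptRequested.exchange(false)) {
        throw interrupted_error();
    }
}

// RAII guard: interruption points inside its scope do not throw; a pending
// request stays pending until the guard is gone.
class DisableInterruption {
public:
    DisableInterruption() {
        if (t_state != nullptr) ++t_state->disableDepth;
    }
    ~DisableInterruption() {
        if (t_state != nullptr) {
            assert(t_state->disableDepth > 0);
            --t_state->disableDepth;
        }
    }
    DisableInterruption(const DisableInterruption&) = delete;
    DisableInterruption& operator=(const DisableInterruption&) = delete;
};
```

In this model, corrupting t_state or the ThreadState it points to could produce a spurious interruption like the one in the question.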

Another possibility is memory corruption, if no code calls boost::thread::interrupt() and you are not working with TLS. This is the hardest case; try a dynamic analysis tool such as Valgrind or Clang's MemorySanitizer.

Off-topic: you probably want a proper concurrent queue. A std::queue behind a single mutex will be slow under high contention, and you will end up with poor cache performance. A good concurrent queue lets your code enqueue and dequeue elements in parallel.

Also, an actor is not supposed to execute arbitrary code. An actor's mailbox should receive simple messages, not functions! What you are writing is a job queue :) Take a look at an existing actor system, for example Akka or libcppa.
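A sketch of the difference: here the mailbox carries plain data messages, and only the actor's own thread decides what to do with them and touches its state. The CounterActor class and its Add/Get/Stop messages are hypothetical, and std:: types stand in for boost:: so it builds without Boost; real actor frameworks such as Akka or libcppa provide far more than this.

```cpp
#include <cassert>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Plain data messages: no code travels through the mailbox.
struct Message {
    enum Kind { Add, Get, Stop } kind;
    int value;                                 // payload for Add
    std::shared_ptr<std::promise<int>> reply;  // reply channel for Get
};

// Hypothetical actor whose state (count) is touched only by its own thread.
class CounterActor {
public:
    CounterActor() : thread_(&CounterActor::run, this) {}

    ~CounterActor() {
        send(Message{Message::Stop, 0, nullptr});  // processed after earlier messages
        thread_.join();
    }

    void add(int n) { send(Message{Message::Add, n, nullptr}); }

    int get() {
        std::shared_ptr<std::promise<int>> reply = std::make_shared<std::promise<int>>();
        std::future<int> result = reply->get_future();
        send(Message{Message::Get, 0, reply});
        return result.get();                       // blocks until the actor replies
    }

private:
    void send(Message m) {
        {
            std::lock_guard<std::mutex> g(mutex_);
            mailbox_.push(std::move(m));
        }
        cv_.notify_one();
    }

    void run() {
        int count = 0;                             // actor-private state
        for (;;) {
            Message m;
            {
                std::unique_lock<std::mutex> g(mutex_);
                cv_.wait(g, [this] { return !mailbox_.empty(); });
                m = std::move(mailbox_.front());
                mailbox_.pop();
            }
            switch (m.kind) {
            case Message::Add:
                count += m.value;
                break;
            case Message::Get:
                assert(m.reply);
                m.reply->set_value(count);
                break;
            case Message::Stop:
                return;
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Message> mailbox_;
    std::thread thread_;                           // last member: starts after the mailbox exists
};
```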

