What causes a random crash in boost::coroutine?

I have a multi-threaded application that uses boost::asio and boost::coroutine via their integration in boost::asio. Each thread has its own io_service object. The only shared state between threads is the connection pools, which are guarded by a mutex whenever a connection is taken from or returned to a pool. When there are not enough connections in a pool, I push an infinite asio::steady_timer into the pool's internal structure and asynchronously wait on it, yielding from the coroutine function. When another thread returns a connection to the pool, it checks whether there are waiting timers; if so, it takes a waiting timer from the internal structure, gets its io_service object, and posts a lambda that cancels the timer, resuming the suspended coroutine. I get random crashes in the application. I tried to investigate the problem with valgrind. It found some issues, but I cannot understand them, because they happen inside boost::coroutine and boost::asio. Here are fragments of my code and the valgrind output. Can anyone see and explain the problem?

Here is the calling code:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;
        spawn(ioService,
              [&multiplexer, &request](yield_context yield)
              {
                  request->prepare(multiplexer, yield);
              });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Here is the implementation of the prepare function, which is called from the spawned lambda:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
                              {
                                  if(0 == result.responseCode)
                                      returnQuota();
                                  VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
                                  _pool.addConnection(std::move(_connection));
                              });
}

void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error while preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

The returnQuota function is a pure virtual method of the AvlRequestData class; its implementation for the TravelportRequestData class, which is used in all my tests, is the following:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Here are the get and add methods of the connection pool:

auto AvlConnectionPool::getConnection(TimerPtr timer,
                                      asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Got connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;
    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer]() { timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

Here is the implementation of coroutineAsyncWait:

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}
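For illustration, here is a minimal, self-contained sketch (my own distillation, not code from the question) of the wait/wake handshake this helper takes part in: the waiting coroutine parks itself on a far-future deadline, and the waking side posts a cancel(), which completes the async_wait with operation_aborted and resumes the coroutine. It reuses the coroutineAsyncWait above.

    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    #include <boost/asio/steady_timer.hpp>
    #include <memory>

    int main()
    {
        boost::asio::io_service ioService;
        auto timer = std::make_shared<boost::asio::steady_timer>(ioService);

        // Waiting side: park the coroutine on a timer that never fires on its own.
        boost::asio::spawn(ioService, [timer](boost::asio::yield_context yield)
        {
            timer->expires_from_now(
                boost::asio::steady_timer::clock_type::duration::max());
            coroutineAsyncWait(*timer, yield); // suspends the coroutine here
            // Resumed: in the real pool, a connection would now be available.
        });

        // Waking side (in the question, the thread returning a connection):
        ioService.post([timer]() { timer->cancel(); });

        ioService.run();
    }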

And finally, the first part of the valgrind output:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189==    at 0x995F84: void boost::coroutines::detail::trampoline_push_void<..., void, boost::asio::detail::coro_entry_point<void (anonymous namespace)::executeRequests<std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&>::{lambda(boost::asio::basic_yield_context<...>)#1}>&, boost::coroutines::basic_standard_stack_allocator<...> >(long) (trampoline_push.hpp:65)
==8189==  Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd

When I run valgrind with an attached debugger, it stops in the following function in trampoline_push.hpp in the boost::coroutine library:

 53β”‚ template< typename Coro > 54β”‚ void trampoline_push_void( intptr_t vp) 55β”‚ { 56β”‚ typedef typename Coro::param_type param_type; 57β”‚ 58β”‚ BOOST_ASSERT( vp); 59β”‚ 60β”‚ param_type * param( 61β”‚ reinterpret_cast< param_type * >( vp) ); 62β”‚ BOOST_ASSERT( 0 != param); 63β”‚ 64β”‚ Coro * coro( 65β”œ> reinterpret_cast< Coro * >( param->coro) ); 66β”‚ BOOST_ASSERT( 0 != coro); 67β”‚ 68β”‚ coro->run(); 69β”‚ } 
1 answer

I eventually found that boost::asio does not handle object deletion gracefully unless shared_ptr and weak_ptr are used correctly. When crashes do occur, they are very difficult to debug, because it is hard to see what the io_service queue is doing at the moment of failure.

Having recently built a full asynchronous client architecture and run into similar random crash problems, I have a few tips to offer. Unfortunately, I cannot know whether they will solve your problem, but I hope they are a good start in the right direction.

Tips for Using Boost Asio Coroutines

  • Use boost::asio::asio_handler_invoke instead of io_service.post():

    auto& ioService = timer->get_io_service();
    ioService.post([timer]() { timer->cancel(); });

    Using post/dispatch inside a coroutine is usually bad; always use asio_handler_invoke when you are being called from a coroutine. In this particular case, though, you could probably safely call timer->cancel() without posting it through the message loop at all. A sketch of the hook mechanism follows.
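    As a rough sketch of that hook mechanism (my illustration, not code from the question; wakeWaiter and Handler are made-up names): asio_handler_invoke is an ADL customization point, so it must be called unqualified after a using-declaration. The default overload simply runs the function, while handler types that carry an invocation context (for example, a coroutine's strand) overload it so the function runs under that context.

        #include <boost/asio.hpp>
        #include <boost/asio/handler_invoke_hook.hpp>
        #include <boost/asio/steady_timer.hpp>
        #include <memory>

        // Cancel the timer in the invocation context of the handler that is
        // waiting on it, instead of pushing the cancel through io_service::post.
        template <typename Handler>
        void wakeWaiter(std::shared_ptr<boost::asio::steady_timer> timer,
                        Handler& handler)
        {
            using boost::asio::asio_handler_invoke; // enable the ADL hook
            auto cancelOp = [timer]() { timer->cancel(); };
            asio_handler_invoke(cancelOp, &handler);
        }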

  • Your timers do not use shared_ptr objects. Regardless of what is happening in the rest of your application, there is no way to know for sure when these objects will be destroyed. I would highly recommend using shared_ptr objects for all of your timer objects. Also, any handler that binds a member function should hold the object via shared_from_this(). Using a plain this can be quite dangerous if the object is destroyed (on the stack) or its last shared_ptr goes out of scope somewhere else. Whatever you do, do not use shared_from_this() in the constructor of the object!

    If you get a crash while io_service is executing a handler, but part of the handler's data is no longer valid, it is a devilishly hard thing to debug. The handler object that is pumped into io_service includes any pointers to timers, or pointers to objects that are needed to execute the handler.

    I highly recommend going overboard with shared_ptr objects wrapped around any asio classes. If the problem goes away, then destruction order was likely the culprit. A minimal sketch of the ownership pattern follows.
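    The sketch below (illustrative names, assuming C++11 and a pre-1.66 Boost.Asio, not the question's actual classes) shows the discipline described above: the object owns its timer through a shared_ptr and binds handlers through shared_from_this(), so neither can be destroyed while a handler that needs them is still sitting in the io_service queue.

        #include <boost/asio.hpp>
        #include <boost/asio/steady_timer.hpp>
        #include <memory>

        class Waiter : public std::enable_shared_from_this<Waiter>
        {
        public:
            explicit Waiter(boost::asio::io_service& ioService)
                : _timer(std::make_shared<boost::asio::steady_timer>(ioService)) {}

            void startWait()
            {
                auto self = shared_from_this(); // keeps *this alive
                auto timer = _timer;            // keeps the timer alive
                timer->expires_from_now(
                    boost::asio::steady_timer::clock_type::duration::max());
                timer->async_wait([self, timer](const boost::system::error_code& ec)
                {
                    // Both captures are still valid here, no matter what the
                    // rest of the application did while this handler was queued.
                    if(ec == boost::asio::error::operation_aborted)
                        self->onWoken(); // someone called timer->cancel()
                });
            }

        private:
            void onWoken() { /* take the connection, resume work, ... */ }

            std::shared_ptr<boost::asio::steady_timer> _timer;
        };

    Note that a Waiter must be created with std::make_shared<Waiter>(ioService) so that shared_from_this() is valid, and, per the warning above, shared_from_this() is never called from the constructor.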

  • Is the crash address located somewhere on the heap, or does it point into the stack? This will help you determine whether an object is going out of scope in a method at the wrong time, or whether the problem is something else entirely. For example, this is what proved to me that all my timers had to become shared_ptr objects, even in a single-threaded application. One way to classify the address is shown below.
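    For example, assuming gdb on Linux is attached to the process, the address from the valgrind report above can be classified like this:

        (gdb) info proc mappings     # lists the [heap] and [stack] address ranges
        (gdb) info symbol 0x2e3b5528 # names the enclosing symbol, if any

    If the address falls in a thread's [stack] range, a stack-allocated object (or a coroutine stack) is being used after it went away; if it falls in [heap], look at shared_ptr ownership and destruction order.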
