I have a multi-threaded application that uses boost::asio and boost::coroutine (through its integration into boost::asio). Each thread has its own io_service object. The only state shared between threads is the connection pools, which are protected by a mutex whenever a connection is taken from or returned to a pool. When there are not enough connections in the pool, I put an asio::steady_timer with an infinite expiry into the pool's internal structure, wait on it asynchronously, and yield from the coroutine function. When another thread returns a connection to the pool, it checks whether there are waiting timers. If there are, it takes a waiting timer from the internal structure, gets that timer's io_service object, and posts a lambda that cancels the timer so the suspended coroutine resumes. I am getting random crashes in the application, and I am trying to investigate the problem with valgrind. Valgrind found some problems, but I can't understand them, because they occur inside boost::coroutine and boost::asio. Below are snippets of my code and the valgrind output. Can anyone see and explain the problem?
Here is the calling code:
template <class ContextsType> void executeRequests(ContextsType& avlRequestContexts) { AvlRequestDataList allRequests; for(auto& requestContext : avlRequestContexts) { if(!requestContext.pullProvider || !requestContext.toAskGDS()) continue; auto& requests = requestContext.pullProvider->getRequestsData(); copy(requests.begin(), requests.end(), back_inserter(allRequests)); } if(allRequests.size() == 0) return; boost::asio::io_service ioService; curl::AsioMultiplexer multiplexer(ioService); for(auto& request : allRequests) { using namespace boost::asio; spawn(ioService, [&multiplexer, &request](yield_context yield) { request->prepare(multiplexer, yield); }); } while(true) { try { VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop."); ioService.run(); VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished."); break; } catch(const std::exception& e) { VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what()); } catch(...) { VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request."); } } }
Here is the implementation of the prepare function, which is called from the spawned lambda:
/**
 * Obtains a connection from _pool (the call may suspend this coroutine until
 * one is available), prepares the request on it and registers its easy handle
 * with @p multiplexer. The registered callback gives the quota back when the
 * response code is 0, logs the response and returns the connection to _pool.
 *
 * NOTE(review): if prepareRequest() throws, _connection is never handed back
 * to _pool (the callback that returns it has not been registered yet), so each
 * such failure permanently shrinks the pool. Consider returning or discarding
 * the connection explicitly on that path.
 * NOTE(review): the callback captures `this`; assumes this AvlRequestData
 * outlives the multiplexer callback — confirm with the caller.
 */
void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer, boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);
    multiplexer.addEasyHandle(_connection->getHandle(), [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}

/**
 * Coroutine entry point for a single request: runs prepareImpl() and, if it
 * fails, logs the error and gives the quota back via returnQuota().
 *
 * boost::coroutines::detail::forced_unwind is deliberately let through.
 * Boost.Coroutine throws it inside a suspended coroutine that is being
 * destroyed (e.g. one still waiting for a pooled connection) and requires that
 * it propagate. The catch-all below would otherwise absorb it, which breaks
 * the coroutine's stack unwinding.
 */
void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer, boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const boost::coroutines::detail::forced_unwind&)
    {
        throw;
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}
The returnQuota function is a pure virtual method of the AvlRequestData class, and its implementation for the TravelportRequestData class, which is used in all my tests, is as follows:
/// Gives one unit of Travelport quota back to the AvlQuotaManager singleton
/// by consuming a negative amount (-1).
void returnQuota() const override
{
    AvlQuotaManager::getInstance().consumeQuotaTravelport(-1);
}
The following are the push and pop methods of the connection pool.
auto AvlConnectionPool::getConnection( TimerPtr timer, asio::yield_context yield) -> ConnectionPtr { lock_guard<mutex> lock(_mutex); while(_connections.empty()) { _timers.emplace_back(timer); timer->expires_from_now( asio::steady_timer::clock_type::duration::max()); _mutex.unlock(); coroutineAsyncWait(*timer, yield); _mutex.lock(); } ConnectionPtr connection = std::move(_connections.front()); _connections.pop_front(); VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.") % _connectionPoolName % _connections.size())); ++_connectionsGiven; return connection; } void AvlConnectionPool::addConnection(ConnectionPtr connection, Side side ) { lock_guard<mutex> lock(_mutex); if(Front == side) _connections.emplace_front(std::move(connection)); else _connections.emplace_back(std::move(connection)); VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.") % _connectionPoolName % _connections.size())); if(_timers.empty()) return; auto timer = _timers.back(); _timers.pop_back(); auto& ioService = timer->get_io_service(); ioService.post([timer](){ timer->cancel(); }); VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.") % _connectionPoolName)); }
This is the implementation of coroutineAsyncWait:
/**
 * Suspends the calling coroutine until @p timer completes or is cancelled.
 *
 * Cancellation (operation_aborted) is treated as a normal wake-up. Any other
 * error from the wait is rethrown as std::runtime_error carrying the error
 * message.
 */
inline void coroutineAsyncWait(boost::asio::steady_timer& timer, boost::asio::yield_context yield)
{
    boost::system::error_code waitError;
    timer.async_wait(yield[waitError]);

    const bool cancelled = (waitError == boost::asio::error::operation_aborted);
    if(!waitError || cancelled)
        return;

    throw std::runtime_error(waitError.message());
}
And finally, here is the first part of the valgrind output:
==8189== Thread 41:
==8189== Invalid read of size 8
== 8189 == at 0x995F84: void boost :: coroutines :: detail :: trampoline_push_void, void, boost :: asio :: detail :: coro_entry_point, void (anonymous namespace) :: executeRequests β (std :: vector <( anonymous namespace) :: AvlRequestContext, std :: allocator <(anonymous namespace) :: AvlRequestContext β &) :: {lambda (boost :: asio :: basic_yield_context>) # 1}> &, boost :: coroutines :: basic_standard_stack_allocator β (long) (trampoline_push.hpp: 65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd
When I run valgrind with an attached debugger, it stops in the following function in trampoline_push.hpp from the boost::coroutine library:
53β template< typename Coro > 54β void trampoline_push_void( intptr_t vp) 55β { 56β typedef typename Coro::param_type param_type; 57β 58β BOOST_ASSERT( vp); 59β 60β param_type * param( 61β reinterpret_cast< param_type * >( vp) ); 62β BOOST_ASSERT( 0 != param); 63β 64β Coro * coro( 65β> reinterpret_cast< Coro * >( param->coro) ); 66β BOOST_ASSERT( 0 != coro); 67β 68β coro->run(); 69β }