Can I use std::async without waiting for the future limitation?

High level
I want to call some functions that have no return value asynchronously, without waiting for them to finish. If I use std::async, the future object is not destroyed until the task completes, which makes the call effectively synchronous in my case.

Example

    void sendMail(const std::string& address, const std::string& message)
    {
        // sending the e-mail, which takes some time...
    }

    myResponseType processRequest(args...)
    {
        // Do some processing and evaluate the address and the message...

        // Sending the e-mail async
        auto f = std::async(std::launch::async, sendMail, address, message);

        // returning the response ASAP to the client
        return myResponseType;

    } // <-- I'm stuck here until the async call finishes so that f can be destructed,
      //     gaining no benefit from the async call.
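The blocking described in the example can be observed directly. The following is a minimal sketch, not part of the original question: `slow_task` and `time_blocked_by_future` are hypothetical names, and the sleep merely stands in for the e-mail sending.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical stand-in for sendMail: just sleeps for the given duration.
void slow_task(std::chrono::milliseconds d) {
    std::this_thread::sleep_for(d);
}

// Returns how long the caller was held up, in milliseconds.
long long time_blocked_by_future(std::chrono::milliseconds task_duration) {
    assert(task_duration.count() >= 0);
    const auto start = std::chrono::steady_clock::now();
    {
        auto f = std::async(std::launch::async, slow_task, task_duration);
        // No f.get() or f.wait() here: the wait happens in the destructor
        // of f when this scope ends.
    }
    const auto elapsed = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
}
```

Calling `time_blocked_by_future(std::chrono::milliseconds(200))` should report roughly the full task duration, because the destructor of a future obtained from std::async with std::launch::async waits for the task to finish.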

My questions

  1. Is there a way to overcome this limitation?
  2. If the answer to (1) is no, should I implement a single thread that takes these "zombie" futures and waits on them?
  3. If the answers to (1) and (2) are both no, is there any other option than building my own thread pool?

Note:
I would rather not use the thread + detach option (suggested by @galop1n), since creating a new thread has an overhead I want to avoid. When using std::async (at least on MSVC), an internal thread pool is used.

Thanks.

+25
c++ multithreading asynchronous c++11 stdasync
Feb 03 '14 at 15:23
3 answers

You can move the future into a global object, so that when the local future's destructor runs, it does not have to wait for the asynchronous thread to complete.

    std::vector<std::future<void>> pending_futures;

    myResponseType processRequest(args...)
    {
        // Do some processing and evaluate the address and the message...

        // Sending the e-mail async
        auto f = std::async(std::launch::async, sendMail, address, message);

        // transfer the future's shared state to a longer-lived future
        pending_futures.push_back(std::move(f));

        // returning the response ASAP to the client
        return myResponseType;
    }

N.B. This is unsafe if the asynchronous thread refers to any local variables of the processRequest function.
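A global vector like this grows without bound and is not safe to touch from several request threads at once. One possible way to handle both is sketched below, assuming all tasks return void; `PendingFutures`, `launch`, `wait_all` and `size` are hypothetical names, not part of the answer above.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper: owns the futures of fire-and-forget tasks.
class PendingFutures {
public:
    // Starts f(args...) asynchronously and keeps its future alive here.
    // The callable must return void, otherwise this does not compile.
    template <class F, class... Args>
    void launch(F&& f, Args&&... args) {
        std::future<void> fut = std::async(std::launch::async,
                                           std::forward<F>(f),
                                           std::forward<Args>(args)...);
        assert(fut.valid());  // std::async always returns a valid future
        std::lock_guard<std::mutex> lock(mtx_);
        prune_locked();
        futures_.push_back(std::move(fut));
    }

    // Waits for every task launched so far.
    void wait_all() {
        std::vector<std::future<void>> local;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            local.swap(futures_);
        }
        for (auto& f : local) f.wait();  // wait outside the lock
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return futures_.size();
    }

private:
    // Drops futures whose tasks have already finished; destroying a ready
    // future does not block. Caller must hold mtx_.
    void prune_locked() {
        futures_.erase(
            std::remove_if(futures_.begin(), futures_.end(),
                           [](const std::future<void>& f) {
                               return f.wait_for(std::chrono::seconds(0)) ==
                                      std::future_status::ready;
                           }),
            futures_.end());
    }

    mutable std::mutex mtx_;
    std::vector<std::future<void>> futures_;
};
```

With a long-lived `PendingFutures` object, processRequest would call `launch(sendMail, address, message)` and return immediately. Any futures still pending when the object is destroyed block at that point, so unfinished tasks are waited for at shutdown rather than per request.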

"When using std::async (at least on MSVC), an internal thread pool is used."

That is actually non-conforming: the standard explicitly says that tasks run with std::launch::async must run as if in a new thread, so thread-local variables must not persist from one task to the next. In practice this usually does not matter.
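The thread-local requirement can be probed with a small experiment. This is only a sketch with hypothetical names (`tasks_on_this_thread`, `run_task`, `observe_thread_locals`); it runs the tasks one after another so that a pool would have every opportunity to reuse a thread.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Counts how many tasks have run on the current thread.
thread_local int tasks_on_this_thread = 0;

int run_task() {
    return ++tasks_on_this_thread;
}

// Runs n tasks sequentially and records the counter value each one saw.
std::vector<int> observe_thread_locals(int n) {
    assert(n >= 0);
    std::vector<int> seen;
    for (int i = 0; i < n; ++i) {
        // get() waits for the task, so the tasks never overlap.
        seen.push_back(std::async(std::launch::async, run_task).get());
    }
    return seen;
}
```

On a conforming implementation every entry is 1, since each task starts with fresh thread-local storage. An implementation that reuses pool threads without resetting their thread-locals could report larger values.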

+11
Mar 20 '14 at 20:41

Why don't you just start a thread and detach it if you don't care about joining?

 std::thread{ sendMail, address, message}.detach(); 

std::async is bound to the lifetime of the std::future it returns, and there is no alternative to that.

Putting the std::future into a waiting queue read by another thread would require the same safety mechanism as a pool receiving new tasks, such as a mutex around the container.

So your best option is a thread pool that consumes tasks pushed directly into a thread-safe queue. It will not depend on a specific implementation either.

Below is a thread pool implementation that takes any callable and its arguments. The threads poll the queue; a better implementation would use condition variables (coliru):

    #include <iostream>
    #include <queue>
    #include <memory>
    #include <thread>
    #include <mutex>
    #include <functional>
    #include <string>

    struct ThreadPool {
        struct Task {
            virtual void Run() const = 0;
            virtual ~Task() {};
        };

        template < typename task_, typename... args_ >
        struct RealTask : public Task {
            RealTask( task_&& task, args_&&... args )
                : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
            void Run() const override {
                fun_();
            }
        private:
            decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
        };

        template < typename task_, typename... args_ >
        void AddTask( task_&& task, args_&&... args ) {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            using FinalTask = RealTask<task_, args_... >;
            q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
        }

        ThreadPool() {
            for( auto & t : pool_ )
                t = std::thread( [=] {
                    while ( true ) {
                        std::unique_ptr<Task> task;
                        {
                            auto lock = std::unique_lock<std::mutex>{mtx_};
                            if ( q_.empty() && stop_ )
                                break;
                            if ( q_.empty() )
                                continue;
                            task = std::move(q_.front());
                            q_.pop();
                        }
                        if (task)
                            task->Run();
                    }
                } );
        }

        ~ThreadPool() {
            {
                auto lock = std::unique_lock<std::mutex>{mtx_};
                stop_ = true;
            }
            for( auto & t : pool_ )
                t.join();
        }

    private:
        std::queue<std::unique_ptr<Task>> q_;
        std::thread pool_[8];
        std::mutex mtx_;
        volatile bool stop_ {};
    };

    void foo( int a, int b ) {
        std::cout << a << "." << b;
    }

    void bar( std::string const & s) {
        std::cout << s;
    }

    int main() {
        ThreadPool pool;
        for( int i{}; i!=42; ++i ) {
            pool.AddTask( foo, 3, 14 );
            pool.AddTask( bar, " - " );
        }
    }

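The condition-variable variant mentioned above could look roughly like the following. This is a sketch rather than a drop-in replacement for the pool above: `CvThreadPool` and `add_task` are hypothetical names, tasks are stored as `std::function<void()>` for brevity (so the callable and its arguments must be copyable), and a task that throws would terminate the program.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool: workers sleep on a condition variable instead of
// spinning on the queue.
class CvThreadPool {
public:
    explicit CvThreadPool(std::size_t n_threads = 4) {
        assert(n_threads > 0);
        for (std::size_t i = 0; i < n_threads; ++i)
            workers_.emplace_back([this] { worker_loop(); });
    }

    // Lets the workers drain the queue, then joins them.
    ~CvThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();
    }

    CvThreadPool(const CvThreadPool&) = delete;
    CvThreadPool& operator=(const CvThreadPool&) = delete;

    template <class F, class... Args>
    void add_task(F&& f, Args&&... args) {
        // std::bind stores decayed copies of the callable and its arguments.
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::lock_guard<std::mutex> lock(mtx_);
            q_.emplace(std::move(task));
        }
        cv_.notify_one();  // wake one sleeping worker
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                // Sleeps until there is work or the pool is shutting down.
                cv_.wait(lock, [this] { return stop_ || !q_.empty(); });
                if (q_.empty()) return;  // only reachable once stop_ is set
                task = std::move(q_.front());
                q_.pop();
            }
            task();  // run outside the lock
        }
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    std::vector<std::thread> workers_;
    bool stop_ = false;
};
```

Usage mirrors the pool above, for example `pool.add_task(foo, 3, 14);`. Because arguments are copied into the task, passing locals by value is safe even if the caller returns before the task runs.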
+5
Feb 03 '14 at 15:28

Instead of moving the future into a global object (and manually managing the removal of unused futures), you can move it into the local scope of the asynchronously called function.

"Let the asynchronous function take its own future," so to speak.

I came up with this template wrapper, which works for me (tested on Windows):

    #include <functional> // std::bind, std::ref
    #include <future>

    template<class Function, class... Args>
    void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                       std::future<void>&& is_valid, std::promise<void>&& is_moved)
    {
        is_valid.wait(); // Wait until the return value of std::async is written to "future"
        auto our_future = std::move(future); // Move "future" to a local variable
        is_moved.set_value(); // Only now we can leave void_async in the main thread

        // This is also used by std::async so that member function pointers work transparently
        auto functor = std::bind(f, std::forward<Args>(args)...);
        functor();
    }

    template<class Function, class... Args> // This is what you call instead of std::async
    void void_async(Function&& f, Args&&... args)
    {
        std::future<void> future; // This is for std::async return value

        // This is for our synchronization of moving "future" between threads
        std::promise<void> valid;
        std::promise<void> is_moved;
        auto valid_future = valid.get_future();
        auto moved_future = is_moved.get_future();

        // Here we pass "future" as a reference, so that async_wrapper
        // can later work with std::async return value
        future = std::async(
            async_wrapper<Function, Args...>,
            std::forward<Function>(f), std::forward<Args>(args)...,
            std::ref(future), std::move(valid_future), std::move(is_moved)
        );
        valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
        moved_future.wait(); // Wait for "future" to actually be moved
    }

I am a little surprised that this works, because I thought the destructor of the moved future would block until we leave async_wrapper. It should wait for async_wrapper to return, but it is waiting inside that very function. Logically this should be a deadlock, but it isn't.

I also tried adding a line at the end of async_wrapper to manually reset the future object:

 our_future = std::future<void>(); 

That does not block either.

0
Sep 18 '16 at 22:08