C ++ Delay Tasks

I want to implement a way to schedule a task later. The interface will look like JavaScript setTimeout(function, milliseconds) .

In my application, certain resources belong to a thread. To avoid race conditions, they should always be accessed from the same thread. If other threads want to access the resource, they must send the task object to the resource stream.

So, I need to solve two problems:

  • send task to stream
  • Call delay

The first problem is quickly fixed with a non-queue lock that has a resource flow on the consumption side. (I use TBB concurrent_bounded_queue.) The second problem, however, is not so obvious to me. I can think of two strategies:

  • Run a new thread for each task. This thread will reset the required delay, and then send the task in a parallel queue.
  • Start only one thread that starts a loop that iterates through the scheduled tasks and calls them if their timeout has expired.

I experimented with both approaches, and I am inclined to the first, because it is simple and reliable, and the second, as a rule, is more prone to subtle errors. The first approach delegates this to the OS thread scheduler.

However, the first solution creates many short-lived threads, while I usually hear the recommendation of reusing threads.

+7
c ++ multithreading
source share
1 answer

Manual implementation will be something like this.

 struct myrunnable { uint64_t id_; uint64_t stamp_; std::function<void()> runnable_; uint64_t id() { return id_; } uint64_t stamp() { return stamp_; } void execute() { if (runnable_) runnable_(); } }; typedef std::shared_ptr<myrunnable> task_t; // timestamp_cmp_t - a comparator by timestamp + incrementing task id typedef tbb::concurrent_blocking_queue<task_t> queue_t; typedef std::priority_queue<task, timestamp_cmp_t> schedule_t; uint64_t now(); // a wrapper around gettimeofday(), write yourself queue_t queue; // inbound concurrent blocking queue not bound in size schedule_t schedule; // priority queue, a scheduler // queue_t sink; // optional sink concurrent queue if you don't // want to execute tasks in the scheduler thread context // now() - a wrapper around gettimeofday(), write yourself for(;;) { // "termination mark" comments below - exit points while (!schedule.empty() && schedule.top().stamp() <= now()) { task_t task = schedule.pop(); task .execute(); // alternatively sink.push(task) to offload scheduler thread } if (schedule.empty()) { task_t task = queue.pop(); // block on the input queue if (!task) return; // scheduler termination mark, empty task schedule.push(task); } else { // Here we are driven by our latency/cpu balance requirements // in this example we are ultra low latency and are just spinning CPU // and on Linux such thread may need extra tuning to perform consistently. // To pace it down one can use TBB sleep_for() or select() system call while (schedule.top().stamp() > now()) { task_t task; if (queue.try_pop(task)) { if (!task) return; // scheduler termination mark, empty task schedule.push(task); } } } } 
+3
source share

All Articles