Reusing a thread in a C++ loop

I need to parallelize some tasks in a C++ program and am completely new to parallel programming. I have made some progress through internet searches so far, but now I'm a bit stuck. I would like to reuse some threads in a loop, but I clearly don't know how to do what I'm trying to do.

I am acquiring data from two ADC cards in the computer (acquired in parallel), and then I need to perform some operations on the collected data (processed in parallel) while collecting the next batch of data. Here is some pseudocode to illustrate.

    //Acquire some data, wait for all the data to be acquired before proceeding
    std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
    std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
    acq1.join();
    acq2.join();

    while(user doesn't interrupt)
    {
        //Process first batch of data while acquiring new data
        std::thread proc1(ProcessData, memoryAddress1a);
        std::thread proc2(ProcessData, memoryAddress2a);
        acq1(AcquireData, boardHandle1, memoryAddress1b);
        acq2(AcquireData, boardHandle2, memoryAddress2b);
        acq1.join();
        acq2.join();
        proc1.join();
        proc2.join();
        /*Proceed in this manner, alternating which memory address is written to
          and being processed until the user interrupts the program.*/
    }

That's the main gist of it. The next run of the loop would write to the "a" memory addresses while processing the "b" data, and continue to alternate (I can get the code to do that, I just took it out to avoid cluttering up the problem).

In any case, the problem (as I'm sure some people can already tell) is that the second time I try to use acq1 and acq2, the compiler (VS2012) says: "IntelliSense: call of an object of a class type without appropriate operator() or conversion functions to pointer-to-function type". Likewise, if I put std::thread in front of acq1 and acq2 again, it says: "error C2374: 'acq1' : redefinition; multiple initialization".

So the question is: can I reassign threads to a new task once they have completed their previous task? I always wait for the previous use of the thread to end before calling it again, but I don't know how to reassign the thread, and since it is in a loop, I can't make a new thread each time (or if I could, that seems wasteful and unnecessary, but I could be mistaken).

Thanks in advance

+7
c++ multithreading loops reusability
6 answers

The easiest way is to use a waitable queue of std::function objects. Like this:

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <vector>
    #include <functional>
    #include <chrono>

    class ThreadPool
    {
    public:
        ThreadPool (int threads) : shutdown_ (false)
        {
            // Create the specified number of threads
            threads_.reserve (threads);
            for (int i = 0; i < threads; ++i)
                threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
        }

        ~ThreadPool ()
        {
            {
                // Unblock any threads and tell them to stop
                std::unique_lock <std::mutex> l (lock_);
                shutdown_ = true;
                condVar_.notify_all();
            }

            // Wait for all threads to stop
            std::cerr << "Joining threads" << std::endl;
            for (auto& thread : threads_)
                thread.join();
        }

        void doJob (std::function <void (void)> func)
        {
            // Place a job on the queue and unblock a thread
            std::unique_lock <std::mutex> l (lock_);
            jobs_.emplace (std::move (func));
            condVar_.notify_one();
        }

    protected:
        void threadEntry (int i)
        {
            std::function <void (void)> job;

            while (1)
            {
                {
                    std::unique_lock <std::mutex> l (lock_);

                    // Sleep until there is a job or the pool is shutting down
                    while (! shutdown_ && jobs_.empty())
                        condVar_.wait (l);

                    if (jobs_.empty ())
                    {
                        // No jobs to do and we are shutting down
                        std::cerr << "Thread " << i << " terminates" << std::endl;
                        return;
                    }

                    std::cerr << "Thread " << i << " does a job" << std::endl;
                    job = std::move (jobs_.front ());
                    jobs_.pop();
                }

                // Do the job without holding any locks
                job ();
            }
        }

        std::mutex lock_;
        std::condition_variable condVar_;
        bool shutdown_;
        std::queue <std::function <void (void)>> jobs_;
        std::vector <std::thread> threads_;
    };

    void silly (int n)
    {
        // A silly job for demonstration purposes
        std::cerr << "Sleeping for " << n << " seconds" << std::endl;
        std::this_thread::sleep_for (std::chrono::seconds (n));
    }

    int main()
    {
        // Create two threads
        ThreadPool p (2);

        // Assign them 4 jobs
        p.doJob (std::bind (silly, 1));
        p.doJob (std::bind (silly, 2));
        p.doJob (std::bind (silly, 3));
        p.doJob (std::bind (silly, 4));
    }
+15

The std::thread class is designed to execute exactly one task (the one you give it in the constructor), and then it ends. If you want to do more work, you will need a new thread. As of C++11, that is all we have. Thread pools are not part of the standard. (I'm not sure what C++14 has to say about them.)

Fortunately, you can easily implement the required logic yourself. Here is the big picture:

  • Start n worker threads that all do the following:
    • Repeat while there is more work:
      • Grab the next task t (possibly waiting until one becomes ready).
      • Process t.
  • Keep inserting new tasks into the processing queue.
  • Tell the worker threads that there is nothing more to do.
  • Wait for the worker threads to finish.

The hardest part here (which is still fairly easy) is designing the work queue properly. Usually, a synchronized linked list (from the STL) will do for this. Synchronized means that any thread that wishes to manipulate the queue must only do so after it has acquired a std::mutex, so as to avoid race conditions. If a worker thread finds the list empty, it has to wait until there is some work again. You can use a std::condition_variable for this. Each time a new task is inserted into the queue, the inserting thread notifies a thread that is waiting on the condition variable, which then stops blocking and eventually starts processing the new task.

The second not-so-trivial part is how to signal to the worker threads that there is no more work to do. Clearly, you can set some global flag, but if a worker is blocked waiting at the queue, it won't notice any time soon. One solution could be to call notify_all() and have the threads check the flag each time they are notified. Another option is to insert some distinct "toxic" item into the queue. If a worker encounters this item, it quits.

Representing a queue of tasks is straightforward using your own task objects or simply lambdas.

All of the above are C++11 features. If you are stuck with an earlier version, you will need to resort to third-party libraries that provide multithreading for your particular platform.

While none of this is rocket science, it is still easy to get wrong the first time. And unfortunately, concurrency-related bugs are among the most difficult to debug. Spending a few hours reading the relevant sections of a good book or working through a tutorial first can quickly pay off.
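To make the "toxic item" idea concrete, here is a minimal sketch, not a definitive implementation. All names (Task, TaskQueue, runWithPoisonPills) are made up for illustration, and it assumes that an empty std::function can serve as the toxic item, with one such item pushed per worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical names throughout. An empty Task is the "toxic" item (an assumption).
using Task = std::function<void()>;

class TaskQueue {
public:
    void push(Task t) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            tasks_.push(std::move(t));
        }
        ready_.notify_one();  // wake one worker waiting in pop()
    }

    // Blocks until a task is available, then removes and returns it.
    Task pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !tasks_.empty(); });
        Task t = std::move(tasks_.front());
        tasks_.pop();
        return t;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<Task> tasks_;
};

// Runs `jobs` trivial tasks on `workers` threads; returns how many tasks ran.
int runWithPoisonPills(int workers, int jobs) {
    assert(workers > 0);
    TaskQueue queue;
    std::mutex countMutex;
    int done = 0;

    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) {
        pool.emplace_back([&queue] {
            for (;;) {
                Task t = queue.pop();
                if (!t) return;  // toxic item: this worker exits
                t();
            }
        });
    }

    for (int j = 0; j < jobs; ++j) {
        queue.push([&countMutex, &done] {
            std::lock_guard<std::mutex> guard(countMutex);
            ++done;
        });
    }

    // One toxic item per worker, queued after all real work.
    for (int i = 0; i < workers; ++i) queue.push(Task());

    for (auto& t : pool) t.join();
    return done;
}
```

Because the queue is first-in, first-out, the toxic items are only reached after every real task has been taken, and each worker consumes exactly one of them. Calling runWithPoisonPills(3, 10) from main should therefore run all ten tasks before the workers exit.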

+9

This

    std::thread acq1(...)

is a call to the constructor, building a new object named acq1.

This

    acq1(...)

is the use of operator() on the existing acq1 object. Since no such operator is defined for std::thread, the compiler complains.

As far as I know, you cannot reuse std::thread objects. You construct and run them, join them, and throw them away.

0

Well, it depends on whether you consider move assignment to be reassignment or not. You can move a thread, but you cannot copy it.

The code below creates a new pair of threads in each iteration and moves them into the place of the old threads. I assume this should work, because the new thread objects are temporaries.

    while(user doesn't interrupt)
    {
        //Process first batch of data while acquiring new data
        std::thread proc1(ProcessData, memoryAddress1a);
        std::thread proc2(ProcessData, memoryAddress2a);
        acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b);
        acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b);
        acq1.join();
        acq2.join();
        proc1.join();
        proc2.join();
        /*Proceed in this manner, alternating which memory address is written to
          and being processed until the user interrupts the program.*/
    }

What happens here is that the object does not actually end its lifetime at the end of the iteration, because it is declared in the scope outside the loop. But a new object is created every time, and a move takes place. I don't see what is being saved here (I might be missing something), so I assume this is exactly the same as declaring the acq threads inside the loop and simply reusing the name. All in all... yes, it comes down to how you classify creating a temporary and moving it.

Also, this clearly starts a new thread in each iteration (after the previously assigned thread has ended, of course); it does not make a thread wait for new data and magically feed it into the processing pipeline. You would need to implement that differently, for example with a pool of worker threads and communication over queues.
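As a rough, self-contained sketch of the move-assignment approach: acquire and process below are made-up stand-ins for the question's AcquireData and ProcessData, and the two small vectors stand in for the "a" and "b" memory addresses. None of these names come from the original code.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Stand-ins (assumptions) for the question's AcquireData and ProcessData.
void acquire(std::vector<int>& buffer, int value) {
    for (auto& x : buffer) x = value;
}

void process(const std::vector<int>& buffer, long& sum) {
    for (int x : buffer) sum += x;
}

// Runs `rounds` loop iterations and returns the sum of everything processed.
long pingPong(int rounds) {
    std::vector<int> a(4), b(4);
    std::vector<int>* writing = &a;
    std::vector<int>* reading = &b;
    long total = 0;

    // Fill the first buffer before entering the loop.
    std::thread acq(acquire, std::ref(*writing), 0);
    acq.join();
    assert(!acq.joinable());  // joined, so a new thread may be moved into acq

    for (int i = 1; i <= rounds; ++i) {
        std::swap(writing, reading);  // the buffer just filled is now the one read
        std::thread proc(process, std::cref(*reading), std::ref(total));
        acq = std::thread(acquire, std::ref(*writing), i);  // move assignment
        acq.join();
        proc.join();
    }
    return total;
}
```

Note that move-assigning to a std::thread that is still joinable calls std::terminate, which is why acq is joined before each assignment. The operating system thread itself is still new in every iteration; only the variable is reused.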

Links: operator=, (ctor).

I think the errors you are getting are self-explanatory, so I will skip explaining them.

0

I think you need a much simpler answer. To run a set of threads more than once, this is the best solution:

    do {
        std::vector<std::thread> thread_vector;

        // Start the workers for this round
        for (int i = 0; i < nworkers; i++) {
            thread_vector.push_back(std::thread(yourFunction, Parameter1, Parameter2, ...));
        }

        // Wait for all of them before starting the next round
        for (std::thread& it : thread_vector) {
            it.join();
        }
        q++;
    } while (q < NTIMES);
0

You can also create your own thread class and call its run method, for example:

    class MyThread
    {
    public:
        void run(std::function<void()> func) {
            thread_ = std::thread(func);
        }
        void join() {
            if (thread_.joinable())
                thread_.join();
        }
    private:
        std::thread thread_;
    };

    // Application code...
    MyThread myThread;
    myThread.run(AcquireData);
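One caveat with a wrapper like this: assigning a new std::thread to a member that is still joinable calls std::terminate. A hedged variant (ReusableThread and runTwice are made-up names, not part of the answer above) could join any previous task before starting the next one, and again in the destructor:

```cpp
#include <functional>
#include <thread>
#include <utility>

// Hypothetical variant of the wrapper that is safe to call run() on repeatedly.
class ReusableThread {
public:
    ~ReusableThread() { join(); }

    void run(std::function<void()> func) {
        join();  // wait for any previous task before replacing the thread
        thread_ = std::thread(std::move(func));
    }

    void join() {
        if (thread_.joinable()) thread_.join();
    }

private:
    std::thread thread_;
};

// Runs two tasks one after the other through the same wrapper object.
int runTwice() {
    int counter = 0;
    ReusableThread worker;
    worker.run([&counter] { counter += 1; });
    worker.run([&counter] { counter += 10; });  // joins the first task, then starts a new thread
    worker.join();
    return counter;
}
```

Since run() joins first, the two tasks never overlap, so runTwice() should return 11 without needing a lock around counter. As with the other approaches, each run() still starts a fresh operating system thread.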
0