The concept you want is a thread pool. This SO question is about existing implementations.
The idea is to have a container for multiple thread instances. Each instance runs a function that checks the task queue and, when a task is available, pulls it out and runs it. When a task completes (if it completes, but that is another problem), the thread simply goes back to the task queue.
So, you need a synchronized queue, a thread class that implements the loop over the queue, an interface for task objects, and possibly a class to manage everything (a pool class).
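To make the pieces listed above concrete, here is a minimal sketch of such a pool. The names `ThreadPool`, `submit` and `run_pool_demo` are mine, for illustration only, and I am assuming `std::function<void()>` as the task interface; a real pool would also want error handling and a way to return results.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical minimal pool; class and method names are illustrative.
class ThreadPool {
    std::mutex mMutex;
    std::condition_variable mCond;
    std::queue<std::function<void()>> mTasks; // queue guarded by mMutex
    bool mStopping = false;
    std::vector<std::thread> mWorkers;

    // Loop run by every worker: wait for a task, pull it out, run it.
    void workerLoop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mCond.wait(lock, [this] { return mStopping || !mTasks.empty(); });
                if (mTasks.empty()) return; // stopping and nothing left to run
                task = std::move(mTasks.front());
                mTasks.pop();
            }
            task(); // run outside the lock so other workers can dequeue
        }
    }

public:
    explicit ThreadPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            mWorkers.emplace_back([this] { workerLoop(); });
    }

    // Lets the workers drain the queue, then joins them.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mStopping = true;
        }
        mCond.notify_all();
        for (auto &w : mWorkers) w.join();
    }

    void submit(std::function<void()> task) {
        assert(task && "submit() needs a callable");
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mTasks.push(std::move(task));
        }
        mCond.notify_one();
    }
};

// Usage: 100 increments spread over 4 workers.
int run_pool_demo() {
    std::atomic<int> counter{0};
    {
        ThreadPool pool(4);
        for (int i = 0; i < 100; ++i)
            pool.submit([&counter] { ++counter; });
    } // the destructor returns only after every queued task has run
    return counter.load();
}
```

Calling `run_pool_demo()` from `main` should give 100, since the destructor drains the queue before joining the workers.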
Alternatively, you can create a very specialized thread class for the one task it needs to perform (for example, taking only a memory area as a parameter). This requires a notification mechanism for the threads to indicate that they are done with the current iteration.
The thread's main function would be a loop over that specific task: at the end of each iteration, the thread signals that it has finished and waits on a condition variable before starting the next iteration. Essentially, you would inline the task code in the thread function, completely eliminating the need for a queue.

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    // semaphore class based on C++11 features
    class semaphore {
        std::mutex mMutex;
        std::condition_variable mCond;
        int mV;
    public:
        explicit semaphore(int v) : mV(v) {}
        void signal(int count = 1) {
            {
                std::lock_guard<std::mutex> lock(mMutex);
                mV += count;
            }
            mCond.notify_all();
        }
        void wait(int count = 1) {
            std::unique_lock<std::mutex> lock(mMutex);
            // block until `count` permits are available, then take them
            mCond.wait(lock, [&] { return mV >= count; });
            mV -= count;
        }
    };

    template <typename Task>
    class TaskThread {
        Task *mTask;
        semaphore *mSemStart, *mSemFinished;
        std::atomic<bool> mRunning;
        std::thread mThread; // declared last: starts after the other members are set
    public:
        TaskThread(Task *task, semaphore *start, semaphore *finish)
            : mTask(task), mSemStart(start), mSemFinished(finish),
              mRunning(true), mThread(&TaskThread::run, this) {}
        ~TaskThread() { mThread.join(); }
        void run() {
            do {
                (*mTask)();
                mSemFinished->signal();
                mSemStart->wait();
            } while (mRunning);
        }
        // end the thread after the current loop; a thread blocked in
        // wait() only sees the flag once the start semaphore is signaled
        void finish() { mRunning = false; }
    };

    class MyTask {
    public:
        MyTask() {}
        void operator()() {
            // some code here
        }
    };

    int main() {
        MyTask task1;
        MyTask task2;
        semaphore start(2), finished(0);
        TaskThread<MyTask> t1(&task1, &start, &finished);
        TaskThread<MyTask> t2(&task2, &start, &finished);
        for (int i = 0; i < 10; i++) {
            finished.wait(2);
            start.signal(2);
        }
        t1.finish();
        t2.finish();
        start.signal(2); // wake both threads so they can exit before the join
    }

The proposed (crude) implementation above relies on the Task type providing operator() (that is, a functor-like class). I said earlier that you could put the task code directly in the body of the thread function, but since I don't know what the task is, I kept it as abstract as I could. There is one condition variable for starting the threads and one for their completion, both encapsulated in semaphore instances.
Seeing another answer suggesting boost::barrier, I can only support the idea: do replace my semaphore class with that class if possible, because you should rely on well-tested and maintained external code rather than on a self-implemented solution for the same feature set.
In general, both approaches are valid, but the first one trades a little performance for flexibility. If the task to be performed takes a long time, the cost of managing the pool and synchronizing the queue becomes negligible.
Update: fixed and tested code. Replaced a simple condition variable with a semaphore.