How to run a set of functions in parallel and wait for the results after completion?

I have a requirement to run a set of heavy functions asynchronously at a point in time and collect the results in a list. Here is the pseudocode for this:

    List<TResult> results = new List<TResult>();
    List<Func<T, TResult>> tasks = PopulateTasks();

    foreach (var task in tasks)
    {
        // Logic in question:
        // 1. Run each task asynchronously/in parallel
        // 2. Put the result in the results list upon each task's completion
    }

    Console.WriteLine("All tasks completed and results populated");

I need the logic inside the foreach block. Can you guys help me?

One limitation: the solution must be compatible with .NET 3.5 (not .NET 4), although an alternative .NET 4 solution would also be appreciated for my own learning.

Thanks in advance.

6 answers

A simple .NET 3.5 implementation might look like this:

    List<Func<T, TResult>> tasks = PopulateTasks();
    // An array lets each task write to its own slot, so no lock is needed
    // (indexing into an empty List<TResult> would throw).
    TResult[] results = new TResult[tasks.Count];
    ManualResetEvent waitHandle = new ManualResetEvent(false);
    int counter;

    void RunTasks()
    {
        int i = 0;
        foreach (var task in tasks)
        {
            int captured = i++; // capture a copy of the index for the closure
            ThreadPool.QueueUserWorkItem(state => RunTask(task, captured));
        }
        waitHandle.WaitOne();
        Console.WriteLine("All tasks completed and results populated");
    }

    void RunTask(Func<T, TResult> task, int index)
    {
        TResult res = task(...); // you haven't specified where the parameter comes from
        results[index] = res;
        if (Interlocked.Increment(ref counter) == tasks.Count)
            waitHandle.Set();
    }
    List<Func<T, TResult>> tasks = PopulateTasks();
    TResult[] results = new TResult[tasks.Count]; // a List has Count, not Length

    Parallel.For(0, tasks.Count, i =>
    {
        results[i] = tasks[i]();
    });

A backport of the TPL to .NET 3.5 appears to exist (it shipped as part of the Reactive Extensions for .NET 3.5).
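Since the asker also wanted to see the .NET 4 alternative, here is a minimal sketch using plain Task objects (the square-number tasks are placeholders for the question's PopulateTasks()):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class TaskWaitAllSketch
{
    static void Main()
    {
        // Hypothetical stand-ins for the heavy functions in the question.
        Func<int, int>[] tasks = Enumerable.Range(0, 5)
            .Select(_ => (Func<int, int>)(x => x * x))
            .ToArray();

        // Start one Task per function; Task.WaitAll blocks until all complete.
        var running = tasks.Select((f, i) => Task.Factory.StartNew(() => f(i)))
                           .ToArray();
        Task.WaitAll(running);

        // Results come back in the original order via Task<T>.Result.
        var results = running.Select(t => t.Result).ToList();
        Console.WriteLine("All tasks completed and results populated");
        Console.WriteLine(string.Join(", ", results)); // 0, 1, 4, 9, 16
    }
}
```

Task.WaitAll also rethrows any worker exceptions as an AggregateException, which the hand-rolled 3.5 solutions above do not handle.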

    // Kick off every task with BeginInvoke and collect the IAsyncResult handles.
    public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<IAsyncResult> asyncContext = new List<IAsyncResult>();
        foreach (var task in tasks)
        {
            asyncContext.Add(task.BeginInvoke(null, null));
        }
        return asyncContext;
    }

    // EndInvoke blocks until the corresponding task has completed.
    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks,
                                               IEnumerable<IAsyncResult> asyncContext)
    {
        IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator();
        foreach (var task in tasks)
        {
            iterator.MoveNext();
            yield return task.EndInvoke(iterator.Current);
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();
        var asyncContext = RunAsync(tasks);
        var results = WaitForAll(tasks, asyncContext);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
    }

Another option would be to implement a small future pattern:

    public class Future<T>
    {
        public Future(Func<T> task)
        {
            Task = task;
            _asyncContext = task.BeginInvoke(null, null); // starts running immediately
        }

        private IAsyncResult _asyncContext;

        public Func<T> Task { get; private set; }

        // Note: EndInvoke may only be called once per BeginInvoke,
        // so read Result only once (or cache it).
        public T Result
        {
            get { return Task.EndInvoke(_asyncContext); }
        }

        public bool IsCompleted
        {
            get { return _asyncContext.IsCompleted; }
        }
    }

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<Future<T>> futures = new List<Future<T>>();
        foreach (var task in tasks)
        {
            futures.Add(new Future<T>(task));
        }
        return futures;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures)
    {
        foreach (var future in futures)
        {
            yield return future.Result;
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();
        var futures = RunAsync(tasks);
        var results = WaitForAll(futures);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
    }

The traditional way is to use a Semaphore. Initialize the semaphore with a count of zero, start a thread for each task, and have each thread release the semaphore when it completes. The main thread waits on the semaphore once per task; once it has acquired it for every task, all the work is done and it continues.
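A minimal sketch of that approach (the three trivial tasks stand in for the question's heavy functions):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class SemaphoreWaitSketch
{
    static void Main()
    {
        var tasks = new List<Func<int>> { () => 1, () => 2, () => 3 };
        var results = new int[tasks.Count];

        // Start with 0 permits; each worker releases one permit when done.
        var done = new Semaphore(0, tasks.Count);

        for (int i = 0; i < tasks.Count; i++)
        {
            int captured = i; // copy the loop variable for the closure
            new Thread(() =>
            {
                results[captured] = tasks[captured]();
                done.Release();
            }).Start();
        }

        // Acquire one permit per task: this returns only after every
        // worker has released the semaphore.
        for (int i = 0; i < tasks.Count; i++)
            done.WaitOne();

        Console.WriteLine("All tasks completed and results populated");
        Console.WriteLine(string.Join(", ",
            Array.ConvertAll(results, r => r.ToString()))); // 1, 2, 3
    }
}
```

Each worker writes to its own slot of the array, so no lock is needed around the results, and the output order matches the task order.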


Perform the processing in separate worker instances, each on its own thread. Use a callback to return each result and to signal that the thread has finished. Use a dictionary to keep track of the running threads. If you have many tasks, queue them and start new threads as old ones finish. In this example, all the threads are created before any are started, to avoid a race condition in which the count of running threads drops to zero before the final threads have started.

    Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>();
    List<TResult> results = new List<TResult>();

    void LaunchWorkers()
    {
        foreach (var task in tasks)
        {
            Worker worker = new Worker(task, ProcessResult);
            Thread thread = new Thread(worker.Work);
            thread.IsBackground = true;
            activeThreads.Add(thread.ManagedThreadId, thread);
        }
        lock (activeThreads)
        {
            // All threads were created above; now start them together.
            foreach (var thread in activeThreads.Values.ToList())
                thread.Start();
        }
    }

    void ProcessResult(int threadId, TResult result)
    {
        lock (results)
        {
            results.Add(result);
        }
        lock (activeThreads)
        {
            activeThreads.Remove(threadId);
            // All work is done when activeThreads.Count == 0.
        }
    }

    public delegate void WorkerDoneDelegate(int threadId, TResult result);

    class Worker
    {
        private readonly Func<T, TResult> _task;
        private readonly WorkerDoneDelegate _done;

        public Worker(Func<T, TResult> task, WorkerDoneDelegate done)
        {
            _task = task;
            _done = done;
        }

        public void Work()
        {
            TResult result = _task(...); // as in the question, the argument source is unspecified
            _done(Thread.CurrentThread.ManagedThreadId, result);
        }
    }
