TPL Queue Processing

I am currently working on a project where I need to queue jobs for processing. These are the requirements:

  • Jobs must be processed in turn
  • The next item must be available in

So I want something similar to:

    Task<result> QueueJob(params here)
    {
        // Queue the job and somehow return a waitable task that will wait
        // until the queued job has been executed and return the result.
    }

I tried running a background task that simply pulls items off a queue and processes each job, but the difficulty is getting the result from the background task back to the calling method.

If need be, I could just go down the route of requesting a completion callback in the QueueJob method, but it would be great if I could get a transparent Task<result> back that lets the caller wait for the job to be processed (even if there are jobs ahead of it in the queue).

+1
c# queue task-parallel-library
3 answers

Func<T> takes no parameters and returns a value of type T. Tasks are started one by one, and you can wait for the returned task to get the result.

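    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public class TaskQueue
    {
        // Queue<T> is not thread-safe, so every access is guarded by a lock.
        private readonly Queue<Task> InnerTaskQueue = new Queue<Task>();
        private volatile bool IsJobRunning;

        public void Start()
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    Task task = null;
                    lock (InnerTaskQueue)
                    {
                        if (InnerTaskQueue.Count > 0 && !IsJobRunning)
                        {
                            task = InnerTaskQueue.Dequeue();
                        }
                    }

                    if (task != null)
                    {
                        // Set the flag before starting, so that a fast job
                        // cannot clear it before it has been set.
                        IsJobRunning = true;
                        task.ContinueWith(t => IsJobRunning = false);
                        task.Start();
                    }
                    else
                    {
                        Thread.Sleep(1000); // nothing to start yet; poll again later
                    }
                }
            });
        }

        // The method itself must be generic for Func<T> to compile.
        public Task<T> QueueJob<T>(Func<T> job)
        {
            var task = new Task<T>(job);
            lock (InnerTaskQueue)
            {
                InnerTaskQueue.Enqueue(task);
            }
            return task;
        }
    }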
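
The queue above relies on the fact that a task built with the Task<T> constructor does not run until Start() is called, which is what lets the processor decide when each job gets its turn. A minimal standalone sketch of that behavior (the class name ColdTaskDemo is only illustrative):

```csharp
using System;
using System.Threading.Tasks;

static class ColdTaskDemo
{
    static void Main()
    {
        // The constructor only creates the task; nothing runs yet.
        var task = new Task<int>(() => 6 * 7);
        Console.WriteLine(task.Status); // Created

        // A queue processor would call Start() when the job's turn comes.
        task.Start();

        // Result blocks until the job has finished.
        Console.WriteLine(task.Result); // 42
        Console.WriteLine(task.Status); // RanToCompletion
    }
}
```

The caller can hold on to such a task from the moment it is queued and wait on it at any time, even though it has not been started yet.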
0

You may find TaskCompletionSource<T> useful: it lets you create a Task that completes exactly when you want it to. If you combine it with BlockingCollection<T>, you get a queue:

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    class JobProcessor<TInput, TOutput> : IDisposable
    {
        private readonly Func<TInput, TOutput> m_transform;

        // or a custom type instead of Tuple
        private readonly BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>> m_queue =
            new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();

        public JobProcessor(Func<TInput, TOutput> transform)
        {
            m_transform = transform;
            // A single dedicated consumer, so jobs run one at a time.
            Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
        }

        private void ProcessQueue()
        {
            Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
            // Blocks until an item arrives; returns false once CompleteAdding()
            // has been called and the queue is empty.
            while (m_queue.TryTake(out tuple, Timeout.Infinite))
            {
                var input = tuple.Item1;
                var tcs = tuple.Item2;
                try
                {
                    tcs.SetResult(m_transform(input));
                }
                catch (Exception ex)
                {
                    // Surface the failure on the task the caller is waiting on.
                    tcs.SetException(ex);
                }
            }
        }

        public Task<TOutput> QueueJob(TInput input)
        {
            var tcs = new TaskCompletionSource<TOutput>();
            m_queue.Add(Tuple.Create(input, tcs));
            return tcs.Task;
        }

        public void Dispose()
        {
            m_queue.CompleteAdding();
        }
    }
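
To see why TaskCompletionSource<T> fits here, the following standalone sketch shows a task that stays pending until SetResult is called, and one that is faulted through SetException. The class name TcsDemo and the values used are only illustrative.

```csharp
using System;
using System.Threading.Tasks;

static class TcsDemo
{
    static void Main()
    {
        // The task exists immediately, but it is not backed by any running code.
        var tcs = new TaskCompletionSource<int>();
        Task<int> pending = tcs.Task;
        Console.WriteLine(pending.IsCompleted); // False

        // Whoever holds the source decides when (and how) the task completes.
        tcs.SetResult(42);
        Console.WriteLine(pending.Result); // 42

        // A failed job is reported the same way, via SetException.
        var failing = new TaskCompletionSource<int>();
        failing.SetException(new InvalidOperationException("job failed"));
        Console.WriteLine(failing.Task.IsFaulted); // True
    }
}
```

In the processor above, QueueJob hands out tcs.Task right away, and the consumer thread completes it later when the job's turn has come.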
+5

I would go for something like this:

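    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    class TaskProcessor<TResult>
    {
        readonly BlockingCollection<Task<TResult>> blockingCollection =
            new BlockingCollection<Task<TResult>>(new ConcurrentQueue<Task<TResult>>());

        public Task<TResult> AddTask(Func<TResult> work)
        {
            var task = new Task<TResult>(work);
            blockingCollection.Add(task);
            return task; // give the task back to the caller so they can wait on it
        }

        public void CompleteAddingTasks()
        {
            blockingCollection.CompleteAdding();
        }

        public TaskProcessor()
        {
            // Consume on a background thread. Calling ProcessQueue() directly
            // here would run it before any task could have been added.
            Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
        }

        void ProcessQueue()
        {
            // Blocks while the queue is empty; ends after CompleteAddingTasks().
            foreach (var task in blockingCollection.GetConsumingEnumerable())
            {
                task.Start();
                try
                {
                    task.Wait(); // ensure this task finishes before we start a new one...
                }
                catch (AggregateException)
                {
                    // The caller sees the failure on the returned task; keep processing.
                }
            }
        }
    }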

Depending on the type of application that uses it, you could swap out the BlockingCollection/ConcurrentQueue for something simpler (for example, just a plain queue). You can also adjust the signature of the AddTask method depending on what sort of methods/parameters you will be queueing.
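
As one possible reading of "something simpler", here is a sketch that uses no collection at all: each job is chained onto the previous one with ContinueWith, so it starts only after its predecessor has finished. This is a different technique from the class above, not a variant of it; the names SerialTaskQueue and Enqueue are hypothetical, and the signature shows one way to accept a job that takes a parameter.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper; not part of the answers above.
class SerialTaskQueue
{
    private readonly object gate = new object();

    // Already-completed starting point for the chain.
    private Task tail = Task.FromResult(true);

    public Task<TResult> Enqueue<TArg, TResult>(Func<TArg, TResult> work, TArg arg)
    {
        lock (gate)
        {
            // With default options the continuation runs after the previous
            // job has finished, whether that job succeeded or failed.
            Task<TResult> next = tail.ContinueWith<TResult>(_ => work(arg));
            tail = next;
            return next;
        }
    }
}

static class Program
{
    static void Main()
    {
        var queue = new SerialTaskQueue();

        // Both jobs are queued immediately; the second waits for the first.
        Task<int> a = queue.Enqueue(x => x * 2, 21);
        Task<string> b = queue.Enqueue(s => s.ToUpper(), "done");

        Console.WriteLine(a.Result); // 42
        Console.WriteLine(b.Result); // DONE
    }
}
```

The trade-off is that there is no dedicated consumer thread to shut down, but also no explicit queue to inspect or bound.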

+2