Priority Queues in the Task Parallel Library

Is there any prior work on adding tasks to the TPL at run time with variable priority?

If not, generally speaking, how would this be implemented?

Ideally, I plan to use the producer-consumer pattern to add the "todo" work to the TPL. There may be times when I find that a low-priority task needs to be promoted to a high-priority task (relative to the others).

If anyone knows search keywords I should be using, please list them, as I have not yet found code that does what I need.

3 answers

ParallelExtensionsExtras contains several custom TaskSchedulers that can be useful either directly or as a base for your own scheduler.

In particular, two schedulers may be of interest to you:

  • QueuedTaskScheduler, which lets you schedule Tasks at different priorities, but does not let you change the priority of a Task that is already queued.
  • ReprioritizableTaskScheduler, which does not have different priorities, but lets you move a specific Task to the beginning or end of the queue. (Changing the priority is O(n) in the number of waiting Tasks, though, which can be a problem if you have many Tasks queued at once.)
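
To illustrate the second idea, here is a minimal sketch of a scheduler that can move a waiting Task to the front of its queue. It is not the ParallelExtensionsExtras source; the class name SimpleReprioritizableScheduler and its Prioritize method are hypothetical, and the sketch assumes it is acceptable to run every task on the thread pool and never inline.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of a reprioritizable scheduler (not the library's code).
public sealed class SimpleReprioritizableScheduler : TaskScheduler
{
    private readonly LinkedList<Task> tasks = new LinkedList<Task>();

    protected override void QueueTask(Task task)
    {
        lock (tasks) tasks.AddLast(task);
        // One pool work item per queued task; each one runs whatever
        // task is at the head of the list when it starts.
        ThreadPool.QueueUserWorkItem(_ => ProcessNext());
    }

    private void ProcessNext()
    {
        Task next;
        lock (tasks)
        {
            if (tasks.Count == 0) return;
            next = tasks.First.Value;
            tasks.RemoveFirst();
        }
        TryExecuteTask(next);
    }

    // Moves a waiting task to the front. The Find call is the O(n) part.
    public bool Prioritize(Task task)
    {
        lock (tasks)
        {
            var node = tasks.Find(task);
            if (node == null) return false; // not queued here, or already started
            tasks.Remove(node);
            tasks.AddFirst(node);
            return true;
        }
    }

    // Declining inline execution keeps the queue order authoritative.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (tasks) return tasks.ToArray();
    }
}
```

Tasks are started on it with Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, scheduler), and the Task reference that comes back is what you pass to Prioritize later.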

Here is a rather naive concurrent implementation built around a rather naive priority queue. The idea is a sorted set holding pairs of the actual item and its priority, ordered by a comparer that looks at the priority first and falls back to comparing the items only to break ties. The constructor takes a function that computes the priority of a given object.

As for the implementation itself, it is not efficient: I simply lock around everything. A more efficient version could not use SortedSet as the priority queue, and re-implementing a priority queue that can be accessed concurrently both safely and efficiently is not trivial.

To change the priority of an item you have to remove it from the set and add it again. To find it without iterating over the whole set, you need to know its old priority as well as the new one.
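
The remove-then-re-add step can be seen in isolation in the small sketch below. It is a simplification, not the class from this answer: the hypothetical ReprioritizeSketch.Move helper stores (priority, item) tuples and relies on the default tuple ordering, so no custom comparer is needed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper showing why the old priority must be known.
public static class ReprioritizeSketch
{
    // The old priority is part of the key, so the element can be located
    // in O(log n) instead of by scanning the whole set.
    public static bool Move(SortedSet<Tuple<int, string>> set,
                            string item, int oldPriority, int newPriority)
    {
        if (!set.Remove(Tuple.Create(oldPriority, item)))
            return false; // wrong old priority, or the item is not in the set
        return set.Add(Tuple.Create(newPriority, item));
    }
}
```

If the caller passes a stale old priority, Remove finds nothing and the set is left untouched, which is why the full class also offers an overload that searches for the item linearly.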

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;

    public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>
    {
        private readonly object key = new object();
        private readonly SortedSet<Tuple<T, int>> set;
        private readonly Func<T, int> prioritySelector;

        public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
        {
            this.prioritySelector = prioritySelector;
            set = new SortedSet<Tuple<T, int>>(
                new MyComparer(comparer ?? Comparer<T>.Default));
        }

        // Orders by priority first, then by the item itself to break ties.
        // (Not generic itself: it reuses the outer T instead of shadowing it.)
        private class MyComparer : IComparer<Tuple<T, int>>
        {
            private readonly IComparer<T> comparer;

            public MyComparer(IComparer<T> comparer)
            {
                this.comparer = comparer;
            }

            public int Compare(Tuple<T, int> first, Tuple<T, int> second)
            {
                var returnValue = first.Item2.CompareTo(second.Item2);
                if (returnValue == 0)
                    returnValue = comparer.Compare(first.Item1, second.Item1);
                return returnValue;
            }
        }

        public bool TryAdd(T item)
        {
            lock (key)
            {
                return set.Add(Tuple.Create(item, prioritySelector(item)));
            }
        }

        public bool TryTake(out T item)
        {
            lock (key)
            {
                if (set.Count > 0)
                {
                    var first = set.First();
                    item = first.Item1;
                    return set.Remove(first);
                }
                else
                {
                    item = default(T);
                    return false;
                }
            }
        }

        // Fast path: the caller knows the old priority, so no scan is needed.
        public bool ChangePriority(T item, int oldPriority, int newPriority)
        {
            lock (key)
            {
                if (set.Remove(Tuple.Create(item, oldPriority)))
                {
                    return set.Add(Tuple.Create(item, newPriority));
                }
                else
                    return false;
            }
        }

        // Slow path: scans for the item, then recomputes its priority.
        public bool ChangePriority(T item)
        {
            lock (key)
            {
                var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));
                // FirstOrDefault returns null when nothing matches.
                if (result != null)
                {
                    return ChangePriority(item, result.Item2, prioritySelector(item));
                }
                else
                {
                    return false;
                }
            }
        }

        public void CopyTo(T[] array, int index)
        {
            lock (key)
            {
                foreach (var item in set.Select(pair => pair.Item1))
                {
                    array[index++] = item;
                }
            }
        }

        public T[] ToArray()
        {
            lock (key)
            {
                return set.Select(pair => pair.Item1).ToArray();
            }
        }

        // Enumerates a snapshot, so no lock is held while the caller iterates.
        public IEnumerator<T> GetEnumerator()
        {
            return ToArray().AsEnumerable().GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public void CopyTo(Array array, int index)
        {
            lock (key)
            {
                foreach (var item in set.Select(pair => pair.Item1))
                {
                    array.SetValue(item, index++);
                }
            }
        }

        public int Count
        {
            get { lock (key) { return set.Count; } }
        }

        public bool IsSynchronized
        {
            get { return true; }
        }

        public object SyncRoot
        {
            get { return key; }
        }
    }

Once you have an instance of the IProducerConsumerCollection<T> above, you can use it as the backing store of a BlockingCollection<T> to get an easier-to-use interface.
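
As a rough sketch of that wrapping step, the hypothetical helper below (BlockingWrapperSketch.AddThenDrain is my name, not part of any library) accepts any IProducerConsumerCollection<T>, wraps it in a BlockingCollection<T>, and drains it. The order in which items come back out is decided entirely by the backing store, so passing a priority queue gives priority order.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: wraps any backing store in a BlockingCollection<T>.
public static class BlockingWrapperSketch
{
    public static List<T> AddThenDrain<T>(IProducerConsumerCollection<T> store,
                                          IEnumerable<T> items)
    {
        using (var queue = new BlockingCollection<T>(store))
        {
            // Producer side: Add delegates to store.TryAdd.
            foreach (var item in items) queue.Add(item);
            queue.CompleteAdding();

            // Consumer side: yields items in whatever order store.TryTake
            // returns them, and ends once the store is empty.
            return queue.GetConsumingEnumerable().ToList();
        }
    }
}
```

With a ConcurrentStack<int> as a stand-in store, adding 1, 2, 3 drains as 3, 2, 1, which shows that the store controls the order. Two things to keep in mind when the store is the priority queue above: its TryAdd returns false for an item already in the set, which as far as I know makes BlockingCollection<T>.Add throw an InvalidOperationException, and ChangePriority has to be called on the underlying queue since BlockingCollection<T> does not expose it.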


I have implemented a priority FIFO task queue.
It is very simple and works like a priority ActionBlock.

Check it out at https://github.com/ido-ran/PriorityTaskQueue
