Non-blocking concurrent collection?

System.Collections.Concurrent in .NET 4 has several new collections that work very well in multithreaded environments. However, they are a bit limited: either they block until an element becomes available, or they return immediately with default(T) (the TryXXX methods).
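For concreteness, here is a minimal sketch of those two behaviors on a BlockingCollection<T> (the variable names are mine):

```csharp
using System;
using System.Collections.Concurrent;

class Demo
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();

        // TryTake returns immediately: false, with the out parameter
        // set to default(T), when the collection is empty.
        int item;
        bool got = queue.TryTake(out item);
        Console.WriteLine($"{got} {item}");   // False 0

        queue.Add(42);
        // Take blocks the calling thread until an item is available;
        // here one is already present, so it returns at once.
        Console.WriteLine(queue.Take());      // 42
    }
}
```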

I need a collection that is thread-safe but, instead of blocking the calling thread, uses a callback to tell me that at least one item is available.

My current solution is to use a BlockingCollection, but to fetch the next element via the APM with a delegate. In other words, I create a delegate for a method that calls Take on the collection, and I execute that delegate using BeginInvoke.
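A rough sketch of that workaround (names are mine; newer runtimes no longer support delegate BeginInvoke, so Task.Run stands in here for pushing the blocking Take onto another thread):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ApmTakeSketch
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();
        queue.Add(7);

        // The blocking Take runs on a worker thread; the continuation
        // plays the role of the APM callback, firing when the item
        // arrives instead of blocking the caller.
        Task<int> pending = Task.Run(() => queue.Take());
        pending.ContinueWith(t => Console.WriteLine($"Got {t.Result}")).Wait();
    }
}
```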

Unfortunately, this forces me to keep a lot of state in my class. Worse, the class is not thread-safe: it can only be used by one thread at a time. I would be trading away maintainability, which I would rather not do.

I know that there are libraries that do what I am doing here quite simply (I believe the Reactive Framework is one of them), but I would like to achieve my goal without adding any references beyond version 4 of the .NET Framework.

Are there any better patterns, not requiring external references, that I can use to achieve my goal?


TL;DR:

Are there any patterns that satisfy the requirement:

"I need to tell the collection that I am ready for the next item, and for the collection to call back when this next item arrives, without locking the threads."

Tags: c#, collections, multithreading, nonblocking
2 answers

I think I have two possible solutions. I'm not really happy with either, but at least they provide a reasonable alternative to the APM approach.

The first does not meet your requirement of not blocking a thread, but I think it is pretty elegant: you can register callbacks and they will be invoked in round-robin fashion, yet you still have the option to call Take or TryTake as usual on the BlockingCollection. Registered callbacks are invoked one at a time as items become available; it is essentially a notification mechanism for the collection. The best part of this approach is that calls to Take do not get starved the way they do in my second solution.

    public class NotifyingBlockingCollection<T> : BlockingCollection<T>
    {
        private Thread m_Notifier;
        private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

        public NotifyingBlockingCollection()
        {
            m_Notifier = new Thread(Notify);
            m_Notifier.IsBackground = true;
            m_Notifier.Start();
        }

        private void Notify()
        {
            while (true)
            {
                Action<T> callback = m_Callbacks.Take();
                T item = Take();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
        }

        public void RegisterForTake(Action<T> callback)
        {
            m_Callbacks.Add(callback);
        }
    }
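Usage might look like the following sketch. The class is copied in trimmed form, with Task.Run standing in for delegate BeginInvoke (which newer runtimes don't support); the logic is otherwise unchanged, and the scenario names are mine:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Trimmed copy of the class above; Task.Run replaces delegate BeginInvoke.
public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private readonly BlockingCollection<Action<T>> m_Callbacks =
        new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        var notifier = new Thread(Notify) { IsBackground = true };
        notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            // Wait for a registered callback, then wait for an item,
            // then dispatch the pair to the thread pool.
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            Task.Run(() => callback(item));
        }
    }

    public void RegisterForTake(Action<T> callback) => m_Callbacks.Add(callback);
}

class Usage
{
    static void Main()
    {
        var collection = new NotifyingBlockingCollection<string>();
        using (var done = new ManualResetEventSlim())
        {
            collection.RegisterForTake(s =>
            {
                Console.WriteLine($"received {s}");
                done.Set();
            });
            collection.Add("hello"); // the callback fires once an item exists
            done.Wait();
        }
    }
}
```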

The second meets your requirement of not blocking a thread. Notice how it hands the callback off to the thread pool. I did this because I figured that if it executed synchronously, the lock would be held longer, making Add and RegisterForTake bottlenecks. I have looked at it carefully and I don't think it can livelock (where both an item and a callback are available, but the callback is never executed), but you may want to examine it yourself to verify. The only problem here is that calls to Take would get starved, since callbacks always take precedence.

    public class NotifyingBlockingCollection<T>
    {
        private BlockingCollection<T> m_Items = new BlockingCollection<T>();
        private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

        public NotifyingBlockingCollection()
        {
        }

        public void Add(T item)
        {
            lock (m_Callbacks)
            {
                if (m_Callbacks.Count > 0)
                {
                    Action<T> callback = m_Callbacks.Dequeue();
                    callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
                }
                else
                {
                    m_Items.Add(item);
                }
            }
        }

        public T Take()
        {
            return m_Items.Take();
        }

        public void RegisterForTake(Action<T> callback)
        {
            lock (m_Callbacks)
            {
                T item;
                if (m_Items.TryTake(out item))
                {
                    callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
                }
                else
                {
                    m_Callbacks.Enqueue(callback);
                }
            }
        }
    }
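A usage sketch for the opposite ordering, where the item arrives before anyone registers. The class is copied in trimmed form with Task.Run standing in for delegate BeginInvoke (unsupported on newer runtimes); the scenario names are mine:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Trimmed copy of the second class; Task.Run replaces delegate BeginInvoke.
public class NotifyingBlockingCollection<T>
{
    private readonly BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private readonly Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                // A callback is waiting: hand it the item on the thread pool.
                Action<T> callback = m_Callbacks.Dequeue();
                Task.Run(() => callback(item));
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take() => m_Items.Take();

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
                Task.Run(() => callback(item)); // an item is already waiting
            else
                m_Callbacks.Enqueue(callback);
        }
    }
}

class Usage
{
    static void Main()
    {
        var collection = new NotifyingBlockingCollection<int>();
        collection.Add(1); // no callback registered yet, so the item is queued

        using (var done = new ManualResetEventSlim())
        {
            // An item is already present, so this callback fires immediately.
            collection.RegisterForTake(n =>
            {
                Console.WriteLine($"received {n}");
                done.Set();
            });
            done.Wait();
        }
    }
}
```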

How about something like this? (The naming could probably use some work. And note that this has not been tested.)

    public class CallbackCollection<T>
    {
        // Synchronization object to prevent race conditions.
        private object _SyncObject = new object();

        // A queue for callbacks that are waiting for items.
        private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

        // A queue for items that are waiting for callbacks.
        private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

        public void Add(T item)
        {
            Action<T> callback;
            lock (_SyncObject)
            {
                // Try to get a callback. If no callback is available,
                // then enqueue the item to wait for the next callback
                // and return.
                if (!_Callbacks.TryDequeue(out callback))
                {
                    _Items.Enqueue(item);
                    return;
                }
            }
            ExecuteCallback(callback, item);
        }

        public void TakeAndCallback(Action<T> callback)
        {
            T item;
            lock (_SyncObject)
            {
                // Try to get an item. If no item is available, then
                // enqueue the callback to wait for the next item
                // and return.
                if (!_Items.TryDequeue(out item))
                {
                    _Callbacks.Enqueue(callback);
                    return;
                }
            }
            ExecuteCallback(callback, item);
        }

        private void ExecuteCallback(Action<T> callback, T item)
        {
            // Use a new Task to execute the callback so that we don't
            // execute it on the current thread.
            Task.Factory.StartNew(() => callback.Invoke(item));
        }
    }
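Usage could look like this sketch (a compact copy of the class above with ExecuteCallback inlined; the scenario names are mine):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Compact copy of CallbackCollection with ExecuteCallback inlined.
public class CallbackCollection<T>
{
    private readonly object _SyncObject = new object();
    private readonly ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();
    private readonly ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item); // no callback waiting; park the item
                return;
            }
        }
        Task.Factory.StartNew(() => callback(item));
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock (_SyncObject)
        {
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback); // no item yet; park the callback
                return;
            }
        }
        Task.Factory.StartNew(() => callback(item));
    }
}

class Usage
{
    static void Main()
    {
        var collection = new CallbackCollection<string>();
        using (var done = new ManualResetEventSlim())
        {
            // Register first: the callback is parked until an item arrives.
            collection.TakeAndCallback(s =>
            {
                Console.WriteLine($"got {s}");
                done.Set();
            });
            collection.Add("ping"); // matches the waiting callback
            done.Wait();
        }
    }
}
```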
