Is there a synchronization primitive like a "one-element-sized asynchronous task buffer"?

Often in UI development I handle events in such a way that when an event arrives, I start processing it immediately, but if a processing operation is already in progress, I wait for it to complete before processing another event. If more than one event arrives before the operation completes, I process only the last one.

The way I usually do it, my processing method has a loop, and in the event handler I check a field that indicates whether I'm currently processing something; if I am, I put the current event arguments into another field, which is essentially a one-element-sized buffer. When the current processing pass completes, I check whether there is another event to process, and I loop until there is none.

This now feels repetitive and perhaps not the most elegant way to do it, although it has been working fine for me. I have two questions:

  • Does this pattern have a name?
  • Is there a reusable synchronization primitive that can do this for me?

It would be a nice addition to the set of asynchronous coordination primitives by Stephen Toub that I have included in my toolkit.

+3
3 answers

So first, let's look at the case you described, where the method is always used from a UI thread, or some other synchronization context. The Run method can itself be async so that all the marshaling through the synchronization context is handled for us.

If we're already running, we just store the action as the next one. If we're not, we indicate that we are now running, await the action, and then keep awaiting the next action until there is no next action. We ensure that, whenever we finish, we indicate that we're no longer running:

    public class EventThrottler
    {
        private Func<Task> next = null;
        private bool isRunning = false;

        public async void Run(Func<Task> action)
        {
            if (isRunning)
                next = action;
            else
            {
                isRunning = true;
                try
                {
                    await action();
                    while (next != null)
                    {
                        var nextCopy = next;
                        next = null;
                        await nextCopy();
                    }
                }
                finally
                {
                    isRunning = false;
                }
            }
        }

        private static Lazy<EventThrottler> defaultInstance =
            new Lazy<EventThrottler>(() => new EventThrottler());

        public static EventThrottler Default
        {
            get { return defaultInstance.Value; }
        }
    }

Since the class will, at least usually, be used exclusively from the UI thread, there will usually be only one instance, so I added a convenience property for a default instance; but since it might still make sense to have more than one in a program, I didn't make it a singleton.

Run accepts a Func<Task> with the idea that it will generally be an async lambda. It might look like this:

    public class Foo
    {
        public void SomeEventHandler(object sender, EventArgs args)
        {
            EventThrottler.Default.Run(async () =>
            {
                await Task.Delay(1000);
                // do other stuff
            });
        }
    }

Okay, so, just to be thorough, here is a version that handles the case where the event handlers are called from different threads. I know you said you believe they will all be called from the UI thread, but I generalized it a bit. This means protecting all access to the instance's fields in a lock block, but not actually executing the given function inside the lock. That last part is important not just for performance, to ensure we don't block threads that are merely setting the next field, but also to avoid issues with that action itself calling Run, so that it doesn't need to deal with reentrancy or possible deadlocks. The pattern is to do the bookkeeping inside the lock and then, based on conditions determined inside the lock, set local variables that indicate what should be done after the lock is released.

    public class EventThrottlerMultiThreaded
    {
        private object key = new object();
        private Func<Task> next = null;
        private bool isRunning = false;

        public void Run(Func<Task> action)
        {
            bool shouldStartRunning = false;
            lock (key)
            {
                if (isRunning)
                    next = action;
                else
                {
                    isRunning = true;
                    shouldStartRunning = true;
                }
            }

            Action<Task> continuation = null;
            continuation = task =>
            {
                Func<Task> nextCopy = null;
                lock (key)
                {
                    if (next != null)
                    {
                        nextCopy = next;
                        next = null;
                    }
                    else
                        isRunning = false;
                }
                if (nextCopy != null)
                    nextCopy().ContinueWith(continuation);
            };

            if (shouldStartRunning)
                action().ContinueWith(continuation);
        }
    }
+2

Does this pattern have a name?

What you describe sounds like a trampoline combined with a collapsing queue. A trampoline is basically a loop that iteratively executes queued work items. An example is the CurrentThreadScheduler in Reactive Extensions. When an item is scheduled on CurrentThreadScheduler , the work item is added to a thread-local queue, after which one of the following happens:

  • If the trampoline is already running (i.e. the current thread is already processing the thread-local queue), then the call to Schedule() returns immediately.
  • If the trampoline is not running (i.e. no work items are running or queued on the current thread), then the current thread begins processing the items in the thread-local queue until it is empty, at which point the call to Schedule() returns.

A collapsing queue accumulates items to be processed, with the added twist that if an equivalent item is already in the queue, that item is simply replaced by the new one (with the result that only the last of the equivalent items remains in the queue, as opposed to both). The idea is to avoid processing stale events. Consider a consumer of market data (e.g. stock ticks). If you receive several updates for a frequently traded security, each update makes the previous ones obsolete. There is probably no point in processing an earlier tick for a given security when a newer tick for the same security has already arrived. Thus, a collapsing queue is appropriate.
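To make the idea concrete, here is a rough sketch of a collapsing queue in Python (the class name and the key-function interface are my own illustration, not from any library or from the answer above):

```python
from collections import OrderedDict

class CollapsingQueue:
    """FIFO queue where a newly enqueued item replaces any equivalent
    item already in the queue, so only the newest version is processed."""

    def __init__(self, key=lambda item: item):
        self._key = key              # maps an item to its equivalence key
        self._items = OrderedDict()  # insertion-ordered: key -> item

    def put(self, item):
        # Assigning to an existing key replaces the value but keeps the
        # key's original position, so ordering between distinct keys holds.
        self._items[self._key(item)] = item

    def get(self):
        _, item = self._items.popitem(last=False)  # pop the oldest key first
        return item

    def __len__(self):
        return len(self._items)

# Market-data example: a later tick for MSFT collapses onto the earlier one.
q = CollapsingQueue(key=lambda tick: tick[0])
q.put(("MSFT", 101.0))
q.put(("AAPL", 180.0))
q.put(("MSFT", 101.5))
print(len(q))   # 2 -- the stale MSFT tick was replaced, not appended
print(q.get())  # ('MSFT', 101.5)
```

Note that replacement keeps the original queue position of the key, so fairness between distinct securities is preserved while stale ticks are dropped.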

In your scenario, you have a trampoline processing a collapsing queue for which all incoming events are considered equivalent. That gives an effective maximum queue size of 1, since each item added to a non-empty queue evicts the existing item.

Is there a reusable synchronization primitive that can do this for me?

I don’t know of an existing solution that meets your needs, but you could certainly build a generalized trampoline or event loop that supports pluggable scheduling strategies. The default strategy could use a standard queue, while other strategies could use a priority queue or a collapsing queue.
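As a sketch of what such a generalized trampoline might look like, here is a minimal Python version (names and the queue interface are my own assumptions: any object with append()/popleft() can be plugged in, e.g. a collapsing queue instead of the default FIFO deque):

```python
from collections import deque

class Trampoline:
    """Minimal trampoline: the call that finds the queue idle drains it;
    re-entrant schedule() calls from inside work items just enqueue.
    The queue strategy is pluggable via the constructor."""

    def __init__(self, queue=None):
        self._queue = queue if queue is not None else deque()
        self._running = False

    def schedule(self, work):
        self._queue.append(work)
        if self._running:
            return               # already draining: the loop below will run it
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()  # may call schedule() re-entrantly
        finally:
            self._running = False

# Work scheduled from inside a running item runs after the current item
# returns, instead of recursing into it immediately.
t = Trampoline()
order = []

def outer():
    order.append("outer-start")
    t.schedule(lambda: order.append("inner"))  # deferred, not run inline
    order.append("outer-end")

t.schedule(outer)
print(order)   # ['outer-start', 'outer-end', 'inner']
```

This mirrors the CurrentThreadScheduler behavior described above: the first schedule() call on an idle trampoline drains the queue to empty, while nested calls return immediately after enqueuing.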

+2

What you are describing sounds a lot like how the TPL Dataflow BroadcastBlock works: it always remembers only the last item you sent to it. If you combine it with an ActionBlock that executes your action and has capacity for only the one item it is currently processing, you get what you want (the method needs a better name):

    // returns a send delegate
    private static Action<T> CreateProcessor<T>(Action<T> executedAction)
    {
        var broadcastBlock = new BroadcastBlock<T>(null);
        var actionBlock = new ActionBlock<T>(
            executedAction,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
        broadcastBlock.LinkTo(actionBlock);
        return item => broadcastBlock.Post(item);
    }

Usage might look something like this:

    var processor = CreateProcessor<int>(
        i =>
        {
            Console.WriteLine(i);
            Thread.Sleep(i);
        });

    processor(100);
    processor(1);
    processor(2);

Output (the 1 is overwritten by the 2 while 100 is still being processed, so it is never printed):

    100
    2
+1
