How to buffer a burst of events for fewer resulting actions

I want to reduce several events in one slow motion. After some kind of trigger, I expect a few more similar triggers to come, but I prefer not to repeat the delays I received. The action is waiting to give a chance to complete before the explosion.
Question: how can I do this in an elegant reusable way?
So far, I have used the property to mark the event and triggered the action with a delay, as shown below:

public  void SomeMethod()
    {
        SomeFlag = true; //this will intentionally return to the caller before completing the resulting buffered actions.
    }
    private bool someFlag;
    public bool SomeFlag
    {
        get { return someFlag; }
        set
        {
            if (someFlag != value)
            {
                someFlag = value;
                if (value)
                    SomeDelayedMethod(5000);
            }
        }
    }

    public async void SomeDelayedMethod(int delay)
    {
        //some bufferred work.
        await Task.Delay(delay);
        SomeFlag = false;
    }

- , ... , - , , ( (, )). )

 public void SerializeAccountsToConfig()
    {
        if (!alreadyFlagged)
        {
            alreadyFlagged = true;
            SerializeDelayed(5000, Serialize);
        }
    }
    public async void SerializeDelayed(int delay, Action whatToDo)
    {
        await Task.Delay(delay);
        whatToDo();
    }

    private bool alreadyFlagged;
    private void Serialize()
    {
        //some buferred work.
        //string json = JsonConvert.SerializeObject(Accounts, Formatting.Indented);
        //Settings1.Default.Accounts = json;
        //Settings1.Default.Save();
        alreadyFlagged = false;
    }
+4
3

, , .
:

public DelayedSingleAction<Account> SendMailD;

( , ):

SendMailD = new DelayedSingleAction<Account>(SendMail, AccountRef, 5000);

SendMailD.PerformAction();

- , "". :

public int SendMail(Account A)
    {}

    public class DelayedSingleAction<T>
{
    private readonly Func<T, int> actionOnObj;
    private T tInstance;
    private readonly long millisecondsDelay;
    private long _syncValue = 1;

    public DelayedSingleAction(Func<T, int> ActionOnObj, T TInstance, long MillisecondsDelay)
    {
        actionOnObj = ActionOnObj;
        tInstance = TInstance;
        millisecondsDelay = MillisecondsDelay;
    }

    private Task _waitingTask = null;

    private void DoActionAndClearTask(Task _)
    {
        Console.WriteLine(string.Format("{0:h:mm:ss.fff} DelayedSingleAction Resetting SyncObject: Thread {1} for {2}", DateTime.Now, System.Threading.Thread.CurrentThread.ManagedThreadId, tInstance));
        Interlocked.Exchange(ref _syncValue, 1);
        actionOnObj(tInstance);
    }

    public void PerformAction()
    {
        if (Interlocked.Exchange(ref _syncValue, 0) == 1)
        {
            Console.WriteLine(string.Format("{0:h:mm:ss.fff} DelayedSingleAction Starting the timer: Thread {1} for {2}", DateTime.Now, System.Threading.Thread.CurrentThread.ManagedThreadId, tInstance));
            _waitingTask = Task.Delay(TimeSpan.FromMilliseconds(millisecondsDelay)).ContinueWith(DoActionAndClearTask);
        }
    }

    public Task Complete()
    {
        return _waitingTask ?? Task.FromResult(0);
    }
}
0

- .

DelayedSingleAction, , . , , , , , , .

public class DelayedSingleAction
{
    private readonly Action _action;
    private readonly long _millisecondsDelay;
    private long _syncValue = 1;
    public DelayedSingleAction(Action action, long millisecondsDelay)
    {
        _action = action;
        _millisecondsDelay = millisecondsDelay;
    }

    private Task _waitingTask = null;
    private void DoActionAndClearTask(Task _)
    {
        Interlocked.Exchange(ref _syncValue, 1);
        _action();
    }

    public void PerformAction()
    {
        if (Interlocked.Exchange(ref _syncValue, 0) == 1)
        {
            _waitingTask = Task.Delay(TimeSpan.FromMilliseconds(_millisecondsDelay))
                               .ContinueWith(DoActionAndClearTask);
        }
    }

    public Task Complete()
    {
        return _waitingTask ?? Task.FromResult(0);
    }
}

. dotnetfiddle , .

https://dotnetfiddle.net/el14wZ

+4

RX :

static void Main(string[] args)
{
    // event source
    var burstEvents = Observable.Interval(TimeSpan.FromMilliseconds(50));

    var subscription = burstEvents

        .Buffer(TimeSpan.FromSeconds(3)) // collect events 3 seconds
        //.Buffer(50) // or collect 50 events

        .Subscribe(events =>
        {
            //Console.WriteLine(events.First()); // take only first event

            // or process event collection
            foreach (var e in events)
                Console.Write(e + " ");
            Console.WriteLine();
        });

    Console.ReadLine();
    return;
}
+2

All Articles