How should I implement a “quiet period” when raising events?

I use the subscriber / notifier template to raise and consume events from my mid-level .Net in C #. Some events occur in bursts, for example, when data is saved from a batch program that imports a file. This performs a potentially long-term task, and I would like to avoid triggering the event several times per second by introducing a “quiet period”, as a result of which the event system waits until the event flow slows down the process of processing the event.

How do I do this when Publisher is actively involved in notifying subscribers? I do not want to wait until the event happens to check if there are others waiting for a quiet period ...

There is currently no host process for polling a subscription model. Should I give up the publication / signing template or is there a better way?

+5
source share
3 answers

Here is an approximate implementation that may indicate a direction. In my example, the task associated with the notification is to save the data object. When an object is saved, the Saved event. In addition to the simple Save method, I implemented the BeginSave and EndSave methods, as well as the overload Save, which works with these two to save packages. When EndSave is called, one BatchSaved event is fired.

, . , , , ... , . , , , .

EDIT: "" , , . , BatchSaved . , , , - .

class DataConcierge<T>
{
    // *************************
    // Simple save functionality
    // *************************

    public void Save(T dataObject)
    {
        // perform save logic

        this.OnSaved(dataObject);
    }

    public event DataObjectSaved<T> Saved;

    protected void OnSaved(T dataObject)
    {
        var saved = this.Saved;
        if (saved != null)
            saved(this, new DataObjectEventArgs<T>(dataObject));
    }

    // ************************
    // Batch save functionality
    // ************************

    Dictionary<BatchToken, List<T>> _BatchSavedDataObjects = new Dictionary<BatchToken, List<T>>();
    System.Threading.ReaderWriterLockSlim _BatchSavedDataObjectsLock = new System.Threading.ReaderWriterLockSlim();

    int _SavedObjectThreshold = 17; // if the number of objects being stored for a batch reaches this threshold, then those objects are to be cleared from the list.

    public BatchToken BeginSave()
    {
        // create a batch token to represent this batch
        BatchToken token = new BatchToken();

        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            _BatchSavedDataObjects.Add(token, new List<T>());
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
        return token;
    }

    public void EndSave(BatchToken token)
    {
        List<T> batchSavedDataObjects;
        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            this.OnBatchSaved(batchSavedDataObjects); // this causes a single BatchSaved event to be fired

            if (!_BatchSavedDataObjects.Remove(token))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
    }

    public void Save(BatchToken token, T dataObject)
    {
        List<T> batchSavedDataObjects;
        // the read lock prevents EndSave from executing before this Save method has a chance to finish executing
        _BatchSavedDataObjectsLock.EnterReadLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            // perform save logic

            this.OnBatchSaved(batchSavedDataObjects, dataObject);
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitReadLock();
        }
    }

    public event BatchDataObjectSaved<T> BatchSaved;

    protected void OnBatchSaved(List<T> batchSavedDataObjects)
    {
        lock (batchSavedDataObjects)
        {
            var batchSaved = this.BatchSaved;
            if (batchSaved != null)
                batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects));
        }
    }

    protected void OnBatchSaved(List<T> batchSavedDataObjects, T savedDataObject)
    {
        // add the data object to the list storing the data objects that have been saved for this batch
        lock (batchSavedDataObjects)
        {
            batchSavedDataObjects.Add(savedDataObject);

            // if the threshold has been reached
            if (_SavedObjectThreshold > 0 && batchSavedDataObjects.Count >= _SavedObjectThreshold)
            {
                // then raise the BatchSaved event with the data objects that we currently have
                var batchSaved = this.BatchSaved;
                if (batchSaved != null)
                    batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects.ToArray()));

                // and clear the list to ensure that we are not holding on to the data objects unnecessarily
                batchSavedDataObjects.Clear();
            }
        }
    }
}

class BatchToken
{
    static int _LastId = 0;
    static object _IdLock = new object();

    static int GetNextId()
    {
        lock (_IdLock)
        {
            return ++_LastId;
        }
    }

    public BatchToken()
    {
        this.Id = GetNextId();
    }

    public int Id { get; private set; }
}

class DataObjectEventArgs<T> : EventArgs
{
    public T DataObject { get; private set; }

    public DataObjectEventArgs(T dataObject)
    {
        this.DataObject = dataObject;
    }
}

delegate void DataObjectSaved<T>(object sender, DataObjectEventArgs<T> e);

class BatchDataObjectEventArgs<T> : EventArgs
{
    public IEnumerable<T> DataObjects { get; private set; }

    public BatchDataObjectEventArgs(IEnumerable<T> dataObjects)
    {
        this.DataObjects = dataObjects;
    }
}

delegate void BatchDataObjectSaved<T>(object sender, BatchDataObjectEventArgs<T> e);

. , .

separete events: Saved BatchSaved. , .

EDIT: , , .

EDIT: , ReaderWriterLockSlim, (.. ). , , , Save EndSave. , Endsave , . Save , , , .

, Save , EndSave. , , , , . , , ReaderWriterLockSlim Monitor, Save EndSave; . , Monitor - , .

EDIT:

.

    static void DataConcierge_Saved(object sender, DataObjectEventArgs<Program.Customer> e)
    {
        Console.WriteLine("DataConcierge<Customer>.Saved");
    }

    static void DataConcierge_BatchSaved(object sender, BatchDataObjectEventArgs<Program.Customer> e)
    {
        Console.WriteLine("DataConcierge<Customer>.BatchSaved: {0}", e.DataObjects.Count());
    }

    static void Main(string[] args)
    {
        DataConcierge<Customer> dc = new DataConcierge<Customer>();
        dc.Saved += new DataObjectSaved<Customer>(DataConcierge_Saved);
        dc.BatchSaved += new BatchDataObjectSaved<Customer>(DataConcierge_BatchSaved);

        var token = dc.BeginSave();
        try
        {
            for (int i = 0; i < 100; i++)
            {
                var c = new Customer();
                // ...
                dc.Save(token, c);
            }
        }
        finally
        {
            dc.EndSave(token);
        }
    }

:

DataConcierge < > .BatchSaved: 17

DataConcierge < > .BatchSaved: 17

DataConcierge < > .BatchSaved: 17

DataConcierge < > .BatchSaved: 17

DataConcierge < > .BatchSaved: 17

DataConcierge < > .BatchSaved: 15

17, 100 BatchSaved 6 .

+1

, , - , "". , . .

, - , , , - "", , . , ( 2 - ). , , , . , , - .

, ...

+1

, . , , , . , , ( ).

class ButtonClickBuffer
{
    public event EventHandler BufferedClick;

    public ButtonClickBuffer(Button button, int queueSize)
    {
        this.queueSize= queueSize;
        button.Click += this.button_Click;
    }

    private int queueSize;
    private List<EventArgs> queuedEvents = new List<EventArgs>();

    private void button_Click(object sender, EventArgs e)
    {
        queuedEvents.Add(e);
        if (queuedEvents.Count >= queueSize)
        {
            if (this.BufferedClick!= null)
            {
                foreach (var args in this.queuedEvents)
                {
                    this.BufferedClick(sender, args);
                }
                queuedEvents.Clear();
            }
        }
    }
}

, :

this.button1.Click += this.button1_Click;

, , :

ButtonClickBuffer buffer = new ButtonClickBuffer(this.button1, 5);
buffer.BufferedClick += this.button1_Click;

, , !

, , , , . , , . , - !

0
source

All Articles