Buffer for processing elements

I have an event that fires regularly. Suppose that processing an event takes about 1 s. Instead of waiting 1 s for each received event, I want to accumulate events until the previous processing run has completed, and then process all the event data that arrived during that run:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In the example above, processing starts as soon as e1 happens. While that processing takes place, two more events arrive. They should be stored so that when p(e1) (the processing of e1) is finished, e2 and e3 are processed together.

This process is similar to a rolling build: a changeset is checked in, the build server starts building, and once the build is finished, all changesets that were checked in during the build are processed.

How do I do this with Rx?

I tried using Buffer in conjunction with the open and close selector, but I cannot figure it out correctly. Any examples or direction appreciated!

Assume that the input stream is a Subject<int>.

I tried something like the following, but I am completely lost.

// bc is assumed to be a Subject<bool>: true marks the start of processing, false the end.
var observer1 = input
    .Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
    .Subscribe(ev =>
    {
        bc.OnNext(true);
        String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString()))).Dump();
        Thread.Sleep(300);
        bc.OnNext(false);
    });

This is non-trivial. Fortunately, @DaveSexton has already done the hard work: you want BufferIntrospective from the Rxx library.

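The reason this is hard is that IObserver<T> has no built-in way to signal back-pressure, other than the subtlety of blocking inside the OnXXX calls. The Observable has to pay attention to the Observer, and you need to introduce concurrency to manage the buffering.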

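Also note that, if you have multiple subscribers, they will receive different buffers, because what each one gets depends on both the rate of source events and its own consumption rate.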

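Another approach is simply to add every event to a thread-safe queue in your OnNext handler, and have a separate task drain the queue in a loop. BufferIntrospective is probably cleaner, though.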
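The queue-based alternative can be sketched roughly as follows, using only the base class library. This is a minimal sketch, not code from Rx or Rxx: the class name `BatchingProcessor<T>` and its `Add` method are hypothetical names chosen for illustration. A worker blocks until one item is available, drains whatever else has piled up, and hands the whole batch to the callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx or Rxx): items added while a batch is
// being processed are collected and delivered together as the next batch.
public sealed class BatchingProcessor<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Task _worker;

    public BatchingProcessor(Action<IList<T>> process)
    {
        _worker = Task.Factory.StartNew(() =>
        {
            // Blocks until an item arrives; the loop ends after CompleteAdding()
            // once the queue is empty.
            foreach (var first in _queue.GetConsumingEnumerable())
            {
                var batch = new List<T> { first };
                // Drain everything that accumulated in the meantime (non-blocking).
                while (_queue.TryTake(out var next)) batch.Add(next);
                // Runs on the worker thread; new items queue up while this runs.
                process(batch);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Thread-safe; can be called from an OnNext handler.
    public void Add(T item) => _queue.Add(item);

    // Stops accepting items, waits for the remaining batches to be processed.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Wait();
        _queue.Dispose();
    }
}
```

With Rx, this would be wired up with something like `input.Subscribe(processor.Add)`. The sketch has no error handling: if the callback throws, the worker stops and `Dispose` rethrows the failure.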

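I had a little play, and this toy implementation seems to work. Rxx will have error handling that this toy code lacks, so treat it as a demonstration only. The key is introducing concurrency with ObserveOn.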

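// Toy sketch. As an extension method it must live in a static class, and it
// needs the System.Reactive namespaces (Linq, Subjects, Concurrency, Disposables).
public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<IList<TSource>>(o => {
        // Fires each time the observer has finished handling a buffer.
        Subject<Unit> feedback = new Subject<Unit>();
        // Share a single subscription to the source between the two pipelines below.
        var sourcePub = source.Publish().RefCount();
        // Close the current buffer on every feedback signal. ObserveOn delivers
        // buffers on the scheduler, so the source keeps filling the next one.
        var sub = sourcePub.Buffer(
            () => feedback).ObserveOn(scheduler).Subscribe(@event =>
            {
                o.OnNext(@event);               // returns once the observer has handled the buffer
                feedback.OnNext(Unit.Default);  // then ask for whatever accumulated meanwhile
            },
            o.OnError,
            o.OnCompleted);
        // Kick things off: the first element closes the first buffer.
        var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
        return new CompositeDisposable(sub, start);
    });
}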

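You can demo this with the following sample: with events every 0.2 s, the first subscriber (1 s per buffer) gets buffers of about 5 items, and the second (2 s per buffer) gets about 10.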

It uses LINQPad's Dump method to display the contents of each buffer.

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});
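The buffer sizes to expect from this demo can be estimated without Rx by a small deterministic simulation. The sketch below is only an idealized model (no scheduling jitter, events arriving exactly when a run ends count as arrived), and `BatchSimulation.BatchSizes` is a hypothetical helper, not part of any library. It takes sorted arrival times and a fixed processing time, both in milliseconds, and returns the size of each batch.

```csharp
using System;
using System.Collections.Generic;

public static class BatchSimulation
{
    // Hypothetical helper: arrivalsMs must be sorted in ascending order.
    public static List<int> BatchSizes(IReadOnlyList<int> arrivalsMs, int processingMs)
    {
        var sizes = new List<int>();
        int next = 0;    // index of the first event not yet processed
        int freeAt = 0;  // time at which the consumer becomes idle again

        while (next < arrivalsMs.Count)
        {
            // If the consumer is idle, it waits for the next event to arrive.
            int start = Math.Max(freeAt, arrivalsMs[next]);

            // Everything that has arrived by the start time forms one batch.
            int count = 0;
            while (next < arrivalsMs.Count && arrivalsMs[next] <= start)
            {
                next++;
                count++;
            }

            sizes.Add(count);
            freeAt = start + processingMs;
        }
        return sizes;
    }
}
```

For 30 events spaced 200 ms apart (the first at 200 ms), a 1000 ms processing time gives batch sizes 1, 5, 5, 5, 5, 5, 4, and a 2000 ms processing time gives 1, 10, 10, 9. Real runs will vary slightly with timing.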