-. , @DaveSexton . BufferIntrospective Rxx. .
, , , IObserver<T> - OnXXX. Observer, concurrency .
, , , , .
- OnNext , . BufferIntrospective, , .
, , , . Rxx , , , . concurrency .
public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
this IObservable<TSource> source,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<IList<TSource>>(o => {
Subject<Unit> feedback = new Subject<Unit>();
var sourcePub = source.Publish().RefCount();
var sub = sourcePub.Buffer(
() => feedback).ObserveOn(scheduler).Subscribe(@event =>
{
o.OnNext(@event);
feedback.OnNext(Unit.Default);
},
o.OnError,
o.OnCompleted);
var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
return new CompositeDisposable(sub, start);
});
}
, , 5, - 10.
LINQPad Dump, .
var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);
var buffered = xs.BufferIntrospective();
buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});
buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});