I have a live data stream and a second stream that delimits which parts of the live data belong together. When someone subscribes to the live data stream, I would like to replay the live data to them. However, I do not want to remember all of the live data, only the part since the last time the other stream emitted a value.
There is an existing issue that would solve my problem, as it describes a replay operator that does exactly what I want (or at least I think so).
Is there an easy way to do this at the moment? Is there a better way than something like the following?
/// <summary>
/// Connectable observable over a stream of items and a stream of delimiters.
/// While connected, it records the items seen since the most recent delimiter;
/// each new subscriber is first handed the recorded items and is then
/// subscribed to the item stream itself.
/// </summary>
/// <typeparam name="TItem">Type of the items that are cached and replayed.</typeparam>
/// <typeparam name="TDelimiter">Type of the delimiter values; only their arrival is used, not their content.</typeparam>
private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
// Items recorded since the last delimiter. The list instance is also used as
// the lock object for every read and write of the cache.
private readonly List<TItem> cached = new List<TItem>();
// Stream whose every value causes the cache to be cleared.
private readonly IObservable<TDelimiter> delimitersObservable;
// Stream of items that is cached while connected and handed to subscribers.
private readonly IObservable<TItem> itemsObservable;
/// <summary>
/// Creates the observable from the item stream and the delimiter stream.
/// Nothing is subscribed here; the sources are only stored.
/// </summary>
/// <param name="itemsObservable">Stream of items to cache and forward.</param>
/// <param name="delimitersObservable">Stream whose values reset the cache.</param>
/// <exception cref="ArgumentNullException">Either argument is null.</exception>
public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
{
    // Fail at construction time instead of with a NullReferenceException on
    // the first Subscribe/Connect call, far away from the faulty caller.
    if (itemsObservable == null)
    {
        throw new ArgumentNullException("itemsObservable");
    }
    if (delimitersObservable == null)
    {
        throw new ArgumentNullException("delimitersObservable");
    }

    this.itemsObservable = itemsObservable;
    this.delimitersObservable = delimitersObservable;
}
/// <summary>
/// Hands the currently cached items to <paramref name="observer"/> and then
/// subscribes it to the item stream.
/// </summary>
/// <param name="observer">Observer that receives the cached items followed by the live items.</param>
/// <returns>The subscription to the item stream.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public IDisposable Subscribe(IObserver<TItem> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }

    // Copy the cache while holding the lock, but invoke the observer only
    // after releasing it: observer code is foreign code and may be slow, or
    // may re-entrantly cause an item/delimiter to be pushed on this thread,
    // which would modify the list in the middle of the enumeration.
    TItem[] snapshot;
    lock (cached)
    {
        snapshot = cached.ToArray();
    }

    foreach (TItem item in snapshot)
    {
        observer.OnNext(item);
    }

    // NOTE(review): an item emitted after the snapshot was taken but before
    // the subscription below is established may not reach this observer.
    // Closing that gap would require routing live items through this class
    // rather than subscribing to the source directly - confirm whether the
    // callers need that guarantee.
    return itemsObservable.Subscribe(observer);
}
/// <summary>
/// Starts caching: subscribes to the delimiter stream (each value clears the
/// cache) and to the item stream (each value is appended to the cache).
/// </summary>
/// <returns>
/// A disposable that disposes the item subscription, then the delimiter
/// subscription, and finally clears the cache.
/// </returns>
public IDisposable Connect()
{
    // A delimiter starts a fresh segment; its value is not used.
    Action<TDelimiter> resetOnDelimiter = ignoredDelimiter =>
    {
        lock (cached)
        {
            cached.Clear();
        }
    };

    // Every item is remembered until the next delimiter arrives.
    Action<TItem> remember = item =>
    {
        lock (cached)
        {
            cached.Add(item);
        }
    };

    // Same order as before: delimiters are hooked up first, items second.
    IDisposable delimiterSubscription = delimitersObservable.Subscribe(resetOnDelimiter);
    IDisposable itemSubscription = itemsObservable.Subscribe(remember);

    Action disconnect = () =>
    {
        itemSubscription.Dispose();
        delimiterSubscription.Dispose();
        lock (cached)
        {
            cached.Clear();
        }
    };

    return Disposable.Create(disconnect);
}
/// <summary>
/// Wraps the two streams in a <c>ReplayWithLimitObservable</c>.
/// </summary>
/// <param name="items">Stream of items passed to the wrapper as its item source.</param>
/// <param name="delimiters">Stream passed to the wrapper as its delimiter source.</param>
/// <returns>The newly created connectable observable.</returns>
public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
    var replaying = new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
    return replaying;
}
source
share