Here is a fairly compact solution. Your post is a bit ambiguous about whether the duration will be reset immediately after receiving a certain value or not - that's why I presented two solutions to both interpretations.
Variation 1 - The difference between the values does not reset the timer
, - "" ( ) - , "a", "b" ,"a", "a". , GroupByUntil, :
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.GroupByUntil(k => k,
_ => Observable.Timer(duration, scheduler))
.SelectMany(y => y.FirstAsync());
}
- Variation 2b; , , . , , SuppressDuplicatesWithinWindow ...
2a - "" DO reset
- . Publish(). RefCount(), :
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
var sourcePub = source.Publish().RefCount();
return sourcePub.GroupByUntil(
k => k,
x => Observable.Timer(duration, scheduler)
.TakeUntil(
sourcePub.Where(i => ReferenceEquals(null, i)
? !ReferenceEquals(null, x.Key)
: !i.Equals(x.Key))))
.SelectMany(y => y.FirstAsync());
}
2b
, , , , 2a :
Observable.DistinctUntilChanged, . . , , reset.
, DistinctUntilChanged, IEqualityComparer. TimeStamp , , .
public static partial class ObservableExtensions
{
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.Timestamp(scheduler)
.DistinctUntilChanged(new Comparer<T>(duration))
.Select(ts => ts.Value);
}
private class Comparer<T> : IEqualityComparer<Timestamped<T>>
{
private readonly TimeSpan _duration;
public Comparer(TimeSpan duration)
{
_duration = duration;
}
public bool Equals(Timestamped<T> x, Timestamped<T> y)
{
if (y.Timestamp - x.Timestamp > _duration) return false;
return ReferenceEquals(x.Value, y.Value)
&& !ReferenceEquals(null,x.Value)
&& x.Value.Equals(y.Value);
}
public int GetHashCode(Timestamped<T> obj)
{
if (ReferenceEquals(null,obj.Value)) return obj.Timestamp.GetHashCode();
return obj.Value.GetHashCode() ^ obj.Timestamp.GetHashCode();
}
}
}
, ( nuget rx-testing nunit):
public class TestDistinct : ReactiveTest
{
[Test]
public void DuplicateWithinDurationIsSupressed()
{
var scheduler = new TestScheduler();
var source =scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void NonDuplicationWithinDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100,"a"),
OnNext(200,"b"));
}
[Test]
public void DuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "a"));
}
[Test]
public void NonDuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "b"));
}
[Test]
public void TestWithSeveralValues()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
}
[Test]
public void CanHandleNulls()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null),
OnNext(700, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null));
}
[Test]
public void TwoDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(150, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void TwoNullDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, (string)null),
OnNext(150, (string)null),
OnNext(200, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, (string)null));
}
}
, - 1 TestWithSeveralValues:
[Test]
public void TestWithSeveralValuesVariation1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(900, "a"));
}
:
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(700, (string)null));