Throttle IObservable based on value

I have one IObservable<String>.

I am trying to detect (and handle) the case where the same line is notified several times in quick succession.

I want a filter / stream / observable such that, if the same line is notified more than once within 250 ms, it is notified only once.

Not sure where to start.

+4
source share
2 answers

Here is a fairly compact solution. Your post is a bit ambiguous about whether the duration should be reset when a different value arrives or not - that's why I present solutions for both interpretations.

Variation 1 - A different value does not reset the timer

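In this variation, once a value has been emitted, repeats of that same value are suppressed until the duration has elapsed, no matter which other values arrive in between - for example, in the sequence "a", "b", "a", "a". To do this, use GroupByUntil, like this: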

    /// <summary>
    /// Variation 1: emits a value, then suppresses repeats of that same value
    /// until <paramref name="duration"/> has elapsed since the repeat-window for
    /// that value was opened. Other values arriving in between do not affect
    /// the window.
    /// </summary>
    /// <param name="source">The sequence to filter.</param>
    /// <param name="duration">Lifetime of each per-value group.</param>
    /// <param name="scheduler">
    /// Scheduler used for the group timers; <c>null</c> selects <c>Scheduler.Default</c>.
    /// </param>
    /// <returns>The first element of every per-value group.</returns>
    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        var timerScheduler = scheduler ?? Scheduler.Default;

        // One group per distinct value; each group is closed by its own timer.
        var perValueWindows = source.GroupByUntil(
            value => value,
            window => Observable.Timer(duration, timerScheduler));

        // Only the element that opened a group is forwarded.
        return perValueWindows.SelectMany(window => window.FirstAsync());
    }

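See also Variation 2b below. Since Rx already has an operator called DistinctUntilChanged, a more descriptive name for this one might be, for example, SuppressDuplicatesWithinWindow ...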

Variation 2a - Different values DO reset the timer

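Here the arrival of a different value ends the current window early. The source is used twice, so it is shared with Publish().RefCount(), like this: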

/// <summary>
/// Variation 2a: emits a value, then suppresses repeats of that same value
/// until either <paramref name="duration"/> elapses or a different value
/// arrives, whichever happens first.
/// </summary>
/// <param name="source">The sequence to filter.</param>
/// <param name="duration">Maximum lifetime of each per-value group.</param>
/// <param name="scheduler">
/// Scheduler used for the group timers; <c>null</c> selects <c>Scheduler.Default</c>.
/// </param>
/// <returns>The first element of every per-value group.</returns>
public static IObservable<T> DistinctUntilChanged<T>(
    this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
    var timerScheduler = scheduler ?? Scheduler.Default;

    // The source feeds both the grouping and the "different value seen"
    // signal, so it is shared rather than subscribed to independently.
    var shared = source.Publish().RefCount();

    // Null-aware "candidate is not the group's key" check.
    Func<T, T, bool> differsFrom = (candidate, key) =>
    {
        if (ReferenceEquals(null, candidate))
        {
            return !ReferenceEquals(null, key);
        }

        return !candidate.Equals(key);
    };

    var windows = shared.GroupByUntil(
        value => value,
        window =>
        {
            var otherValueArrived = shared.Where(item => differsFrom(item, window.Key));

            // The group closes on the timer, or earlier when another value shows up.
            return Observable.Timer(duration, timerScheduler)
                             .TakeUntil(otherValueArrived);
        });

    return windows.SelectMany(window => window.FirstAsync());
}

Variation 2b

Alternatively, here is a different implementation with the same reset-on-a-different-value behaviour as 2a:

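It builds on the built-in Observable.DistinctUntilChanged operator, which compares each element only with the last one it let through. As a result, a different value always passes and, in effect, causes a reset.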

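To do this, it uses the overload of DistinctUntilChanged that accepts an IEqualityComparer. Each element is wrapped with Timestamp so that the comparer can take into account, besides the values themselves, how far apart in time the two elements are.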

public static partial class ObservableExtensions
{
    /// <summary>
    /// Variation 2b: suppresses an element when it has the same value as the
    /// last element that was let through and arrived no more than
    /// <paramref name="duration"/> after it. An element with a different value
    /// is always let through.
    /// </summary>
    /// <param name="source">The sequence to filter.</param>
    /// <param name="duration">
    /// Maximum distance in time at which an equal value still counts as a duplicate.
    /// </param>
    /// <param name="scheduler">
    /// Scheduler that supplies the timestamps; <c>null</c> selects <c>Scheduler.Default</c>.
    /// </param>
    /// <returns>The source values with near-in-time consecutive duplicates removed.</returns>
    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        // Timestamp every element so the comparer can see how far apart two
        // elements are, then strip the timestamp again before handing values on.
        return source.Timestamp(scheduler)
                     .DistinctUntilChanged(new Comparer<T>(duration))
                     .Select(ts => ts.Value);
    }

    /// <summary>
    /// Treats two timestamped elements as equal when their values are equal and
    /// the second is stamped no more than the configured duration after the first.
    /// </summary>
    private class Comparer<T> : IEqualityComparer<Timestamped<T>>
    {
        private readonly TimeSpan _duration;

        public Comparer(TimeSpan duration)
        {
            _duration = duration;
        }

        /// <summary>
        /// Returns <c>true</c> when <paramref name="y"/> duplicates <paramref name="x"/>.
        /// </summary>
        /// <remarks>
        /// Assumes the caller passes the earlier element as <paramref name="x"/>
        /// and the newer one as <paramref name="y"/> (verify against the
        /// DistinctUntilChanged implementation in use).
        /// </remarks>
        public bool Equals(Timestamped<T> x, Timestamped<T> y)
        {
            if (y.Timestamp - x.Timestamp > _duration) return false;

            // Value equality, not reference identity: equal strings held in
            // different instances, boxed value types and pairs of nulls must
            // all count as duplicates.
            return EqualityComparer<T>.Default.Equals(x.Value, y.Value);
        }

        /// <summary>
        /// Hashes on the value only. The timestamp is left out because
        /// <see cref="Equals(Timestamped{T}, Timestamped{T})"/> can report two
        /// elements with different timestamps as equal.
        /// </summary>
        public int GetHashCode(Timestamped<T> obj)
        {
            if (ReferenceEquals(null, obj.Value)) return 0;
            return EqualityComparer<T>.Default.GetHashCode(obj.Value);
        }
    }
}

Here are the tests I used (they need the NuGet packages rx-testing and nunit):

public class TestDistinct : ReactiveTest
{
    // Every scenario uses the same suppression window and virtual run length.
    private static readonly TimeSpan Window = TimeSpan.FromTicks(250);
    private const long RunLength = 1000;

    /// <summary>
    /// Plays <paramref name="input"/> as a cold observable through
    /// DistinctUntilChanged on a fresh TestScheduler, advances virtual time by
    /// <see cref="RunLength"/> ticks and returns the recording observer.
    /// </summary>
    private static ITestableObserver<string> Record(
        params Recorded<System.Reactive.Notification<string>>[] input)
    {
        var clock = new TestScheduler();
        var sink = clock.CreateObserver<string>();

        clock.CreateColdObservable(input)
             .DistinctUntilChanged(Window, clock)
             .Subscribe(sink);

        clock.AdvanceBy(RunLength);

        return sink;
    }

    [Test]
    public void DuplicateWithinDurationIsSupressed()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(200, "a"));

        observed.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void NonDuplicationWithinDurationIsNotSupressed()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(200, "b"));

        observed.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(200, "b"));
    }

    [Test]
    public void DuplicateAfterDurationIsNotSupressed()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(400, "a"));

        observed.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "a"));
    }

    [Test]
    public void NonDuplicateAfterDurationIsNotSupressed()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(400, "b"));

        observed.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "b"));
    }

    [Test]
    public void TestWithSeveralValues()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        // Only the repeated "a" at 200 is dropped.
        observed.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));
    }

    [Test]
    public void CanHandleNulls()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null),
            OnNext(700, (string)null));

        // The null at 700 repeats the null at 600 and is dropped.
        observed.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null));
    }

    [Test]
    public void TwoDuplicatesWithinDurationAreSupressed()
    {
        var observed = Record(
            OnNext(100, "a"),
            OnNext(150, "a"),
            OnNext(200, "a"));

        observed.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void TwoNullDuplicatesWithinDurationAreSupressed()
    {
        var observed = Record(
            OnNext(100, (string)null),
            OnNext(150, (string)null),
            OnNext(200, (string)null));

        observed.Messages.AssertEqual(
            OnNext(100, (string)null));
    }
}

Note that with Variation 1 the TestWithSeveralValues test changes to:

    // Same input as TestWithSeveralValues, but the second "b" (at 450) is not
    // expected in the output.
    [Test]
    public void TestWithSeveralValuesVariation1()
    {
        var clock = new TestScheduler();
        var sink = clock.CreateObserver<string>();
        var window = TimeSpan.FromTicks(250);

        var input = clock.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        input.DistinctUntilChanged(window, clock)
             .Subscribe(sink);

        clock.AdvanceBy(1000);

        sink.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(900, "a"));
    }

And the expected result of the CanHandleNulls test changes to:

        // Expected messages for the CanHandleNulls input. Compared with the
        // assertion in the CanHandleNulls test, only the time of the final
        // null differs (700 instead of 600).
        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(700, (string)null)); /* This line changes */
+8

Observable.Throttle

, .

Edit

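OK, on reflection, Throttle on its own does not do what the OP asked for. So how about splitting the stream by value, and then throttling and subscribing to each part separately? (F# has a split function for this, but as far as I know there is no built-in equivalent in C#.)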

So here is an implementation of such a split:

public static class Extension
{
    /// <summary>
    /// Splits <paramref name="source"/> into one sequence per distinct key and
    /// hands each new per-key sequence to <paramref name="subscribe"/> once,
    /// when its key is first seen. Every occurrence of a key (including the
    /// first) is pushed into that key's sequence.
    /// </summary>
    /// <param name="source">The sequence to split.</param>
    /// <param name="keySelector">Maps each element to the key it belongs to.</param>
    /// <param name="subscribe">
    /// Called once per distinct key with that key's sequence; it is expected to
    /// attach its own observers before returning.
    /// </param>
    /// <returns>
    /// A disposable that unsubscribes from <paramref name="source"/> and
    /// disposes every per-key subject, detaching the observers attached via
    /// <paramref name="subscribe"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IDisposable SplitSubscribe<T, TKey>(
        this IObservable<T> source, 
        Func<T, TKey> keySelector, 
        Action<IObservable<TKey>> subscribe)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (keySelector == null) throw new ArgumentNullException("keySelector");
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        // maintain a list of Observables, one for each key (TKey)
        var observables = new ConcurrentDictionary<TKey, Subject<TKey>>();

        // function to create a new Subject
        Func<TKey, Subject<TKey>> createSubject = key =>
        {
            Console.WriteLine("Added for " + key);
            var retval = new Subject<TKey>();
            // Let the consumer attach first so it also observes the first key.
            subscribe(retval);
            retval.OnNext(key);
            return retval;
        };

        // function to update an existing Subject
        Func<TKey, Subject<TKey>, Subject<TKey>> updateSubject = (key, existing) =>
        {
            Console.WriteLine("Updated for " + key);
            existing.OnNext(key);
            return existing;
        };

        var sourceSubscription = source.Subscribe(next =>
        {
            var key = keySelector(next);
            observables.AddOrUpdate(key, createSubject, updateSubject);
        });

        // Tear-down: stop the feed first, then release the per-key subjects so
        // the observers attached through `subscribe` are no longer held.
        return System.Reactive.Disposables.Disposable.Create(() =>
        {
            sourceSubscription.Dispose();

            foreach (var subject in observables.Values)
            {
                subject.Dispose();
            }

            observables.Clear();
        });
    }

    /// <summary>
    /// Special case of SplitSubscribe in which each element is its own key.
    /// </summary>
    /// <param name="source">The sequence to split.</param>
    /// <param name="subscribe">Called once per distinct element value with that value's sequence.</param>
    /// <returns>The disposable returned by the keyed overload.</returns>
    public static IDisposable SplitSubscribe<T>(
        this IObservable<T> source, 
        Action<IObservable<T>> subscribe)
    {
        return source.SplitSubscribe(item => item, subscribe);
    }
}

Example usage:

// Sample input containing repeated values.
IObservable<string> dummyObservable =
    new[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();

// Throttle each per-value sequence separately and print whatever gets through.
dummyObservable.SplitSubscribe(perValue =>
{
    perValue.Throttle(TimeSpan.FromMilliseconds(250))
            .Subscribe(Console.WriteLine);
});

(Sample output)

Added for a
Added for b
Updated for a
Updated for b
Updated for b
Added for c
Updated for a
a
c
b
0

All Articles