Pass through Rx items with a cooldown, switching to sampling when they come too fast

I am looking for an Rx method that takes an observable and puts the latest item on a "cooldown": when items arrive more slowly than the cooldown they are simply forwarded, but when they arrive faster you get only the most recent value after each cooldown period.

Put differently, I want to switch to sampling with a period t when the items are separated by less than t (and switch back when they spread out again).

It is a lot like Observable.Throttle, except that the timer is not reset when a new item arrives.

The application I have in mind is sending "latest value" updates over the network. I do not want to report a value unless it has changed, and I do not want to spam a rapidly changing value so much that it crowds out other data.

Is there a standard method that does what I need?

+4
4 answers

Strilanc, given your concern about unwanted activity when the source stream is quiet, you might be interested in this way of pacing events. I was not going to add it otherwise, because in my opinion J. Lennon's implementation is perfectly reasonable (and much simpler), and the timer's performance cost will not hurt.

EDIT: the version below is v3.

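    public static IObservable<T> LimitRate<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        return source
            .DistinctUntilChanged()
            // Open a group on the first item and close it after `duration`.
            .GroupByUntil(k => 0, g => Observable.Timer(duration, scheduler))
            // From each group keep the first item and, if more arrived, the last one.
            .SelectMany(x => x.FirstAsync().Merge(x.Skip(1).TakeLast(1)))
            // Follow every item with a silent gap of `duration`...
            .Select(x => Observable.Return(x)
                .Concat(Observable.Empty<T>().Delay(duration, scheduler)))
            // ...and play the items back in order, one at a time.
            .Concat();
    }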


Tests (for v3), using the nuget packages rx-testing and nunit:

public class LimitRateTests : ReactiveTest
{
    [Test]
    public void SlowerThanRateIsUnchanged()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));
    }

    [Test]
    public void FasterThanRateIsSampled()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(140, 5),
            OnNext(150, 2),
            OnNext(300, 3),
            OnNext(350, 4));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));
    }

    [Test]
    public void DuplicatesAreOmitted()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(300, 1),
            OnNext(350, 1));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1));
    }

    [Test]
    public void CoolResetsCorrectly()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 2),
            OnNext(205, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));
    }

    [Test]
    public void MixedPacingWorks()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(825, 5));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(850, 5));
    }
}
+4

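You could combine Observable.DistinctUntilChanged and Observable.Sample.

Observable.DistinctUntilChanged

Suppresses consecutive duplicate values, so a value is passed on only when it differs from the previous one. (http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html)

Observable.Sample

Sample takes the latest value in each interval of the given TimeSpan. (http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample)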
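As a minimal sketch of this combination, the two operators can simply be chained. The names `SampleDistinctExample` and `SampleDistinct` are made up for illustration and are not part of Rx. Note that Sample emits only on its own timer ticks, so an item that arrives after a long quiet period is still held until the next tick rather than forwarded immediately, which is not quite what the question asks for.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SampleDistinctExample
{
    // Hypothetical helper: drop consecutive repeated values, then emit at most
    // one value per `period` (the latest one seen since the previous tick).
    public static IObservable<T> SampleDistinct<T>(
        this IObservable<T> source, TimeSpan period, IScheduler scheduler)
    {
        return source
            .DistinctUntilChanged()
            .Sample(period, scheduler);
    }
}
```

Passing the scheduler explicitly keeps the operator testable with a TestScheduler, in the same style as the tests in the first answer.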

+2

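Here is another approach, split into 2 methods.

The first, SampleImmediate, works like Sample, except that it also emits the first value immediately. Materialize/Dematerialize together with DistinctUntilChanged drop the duplicate that can appear when Take(1) and Sample deliver the same notification. Merge combines Take(1) with Sample to give the "immediate first value" behavior. Publish and Connect share a single subscription to the source. GroupBy and SelectMany wrap the source in a single group, and Create ties the connection and the subscription together so that both are disposed at once.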

public static IObservable<T> SampleImmediate<T>(this IObservable<T> source, TimeSpan dueTime)
{
    return source
        .GroupBy(x => 0)
        .SelectMany(group =>
        {
            return Observable.Create<T>(o =>
            {
                var connectable = group.Materialize().Publish();

                var sub = Observable.Merge(
                        connectable.Sample(dueTime),
                        connectable.Take(1)
                    )
                    .DistinctUntilChanged()
                    .Dematerialize()
                    .Subscribe(o);

                return new CompositeDisposable(connectable.Connect(), sub);
            });
        });
}

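SampleImmediate is then used to build Cooldown. GroupByUntil closes the current group once Throttle fires, that is, once the source has been quiet for dueTime, and the next item opens a new group with a fresh SampleImmediate.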

public static IObservable<T> Cooldown<T>(this IObservable<T> source, TimeSpan dueTime)
{
    return source
        .GroupByUntil(x => 0, group => group.Throttle(dueTime))
        .SelectMany(group => group.SampleImmediate(dueTime));
}


+2

I am not using Rx itself but its Objective-C counterpart (ReactiveCocoa).

Here is the implementation:

-(RACSignal*)cooldown:(NSTimeInterval)cooldownPeriod onScheduler:(RACScheduler *)scheduler {
    need(cooldownPeriod >= 0);
    need(!isnan(cooldownPeriod));
    need(scheduler != nil);
    need(scheduler != RACScheduler.immediateScheduler);

    force(cooldownPeriod != 0); //todo: bother with no-cooldown case?
    force(!isinf(cooldownPeriod)); //todo: bother with infinite case?

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        need(subscriber != nil);

        NSObject* lock = [NSObject new];
        __block bool isCoolingDown = false;
        __block bool hasDelayedValue = false;
        __block id delayedValue = nil;
        __block RACDisposable *cooldownDisposer = nil;
        void (^onCanSendValue)(void) = ^{
            @synchronized (lock) {
                // check that we were actually cooling down
                // (e.g. what if the system thrashed before we could dispose the running-down timer, causing a redundant call?)
                if (!isCoolingDown) {
                    return;
                }

                // if no values arrived during the cooldown, we do nothing and can stop the timer for now
                if (!hasDelayedValue) {
                    isCoolingDown = false;
                    [cooldownDisposer dispose];
                    return;
                }

                // forward latest value
                id valueToSend = delayedValue;
                hasDelayedValue = false;
                delayedValue = nil;
                // todo: can this be avoided?
                // holding a lock while triggering arbitrary actions can introduce subtle deadlock cases...
                [subscriber sendNext:valueToSend];
            }
        };
        void (^preemptivelyEndCooldown)(void) = ^{
            // forward latest value AND ALSO force cooldown to run out (disposing timer)
            onCanSendValue();
            onCanSendValue();
        };

        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            bool didStartCooldown;
            @synchronized (lock) {
                hasDelayedValue = true;
                delayedValue = x;
                didStartCooldown = !isCoolingDown;
                isCoolingDown = true;
            }

            if (didStartCooldown) {
                // first item gets sent right away
                onCanSendValue();
                // coming items have to wait for the timer to run down
                cooldownDisposer = [[RACSignal interval:cooldownPeriod onScheduler:scheduler]
                                    subscribeNext:^(id _) { onCanSendValue(); }];
            }
        } error:^(NSError *error) {
            preemptivelyEndCooldown();
            [subscriber sendError:error];
        } completed:^{
            preemptivelyEndCooldown();
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            @synchronized (lock) {
                isCoolingDown = false;
                [cooldownDisposer dispose];
            }
        }];
    }] setNameWithFormat:@"[%@ cooldown:%@]", self.name, @(cooldownPeriod)];
}

It should translate almost directly to .NET Rx. It stops doing any work when items stop arriving, and it forwards items as quickly as possible while respecting the cooldown.
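To make that translation concrete, here is a rough sketch of how the same state machine might look in Rx.NET. It is an illustration under stated assumptions, not a verified port: the names `CooldownSketch` and `CooldownDirect` are invented, the source is assumed to deliver its notifications serially, and, like the original, it calls the observer while holding the lock.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CooldownSketch
{
    // Hypothetical name; mirrors the ReactiveCocoa cooldown:onScheduler: above.
    public static IObservable<T> CooldownDirect<T>(
        this IObservable<T> source, TimeSpan cooldownPeriod, IScheduler scheduler)
    {
        if (cooldownPeriod <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(cooldownPeriod));

        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var isCoolingDown = false;
            var hasDelayedValue = false;
            var delayedValue = default(T);
            var timer = new SerialDisposable();

            // Runs on the first arrival and on every timer tick.
            Action onCanSendValue = () =>
            {
                lock (gate)
                {
                    if (!isCoolingDown) return;
                    if (!hasDelayedValue)
                    {
                        // Nothing arrived during the cooldown: stop the timer.
                        isCoolingDown = false;
                        timer.Disposable = Disposable.Empty;
                        return;
                    }
                    var value = delayedValue;
                    hasDelayedValue = false;
                    delayedValue = default(T);
                    observer.OnNext(value); // still inside the lock, as in the original
                }
            };

            // Forward any pending value, then force the cooldown to end.
            Action endCooldownNow = () => { onCanSendValue(); onCanSendValue(); };

            var subscription = source.Subscribe(
                x =>
                {
                    bool startedCooldown;
                    lock (gate)
                    {
                        hasDelayedValue = true;
                        delayedValue = x;
                        startedCooldown = !isCoolingDown;
                        isCoolingDown = true;
                    }
                    if (startedCooldown)
                    {
                        onCanSendValue(); // the first item goes out right away
                        timer.Disposable = Observable
                            .Interval(cooldownPeriod, scheduler)
                            .Subscribe(_ => onCanSendValue());
                    }
                },
                error => { endCooldownNow(); observer.OnError(error); },
                () => { endCooldownNow(); observer.OnCompleted(); });

            return new CompositeDisposable(subscription, timer);
        });
    }
}
```

Unlike LimitRate in the first answer, this sketch does not filter duplicates; prepending DistinctUntilChanged() to the source would add that.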

0