Strilanc, given your concern about unwanted activity when the source stream is idle, you might be interested in this method of pacing events. I was not going to add it otherwise because, in my opinion, J. Lennon's implementation is quite reasonable (and much simpler), and the timer's performance cost will not hurt.
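One notable difference from the Sample approach: an event that arrives outside a cool-down period is emitted immediately rather than at the next sampling tick, and no timer runs outside a cool-down.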
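EDIT: Here is v3, which ensures that a change emitted at the end of a cool-down itself starts a new cool-down period, so the next change is still held back for the full duration.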
/// <summary>
/// Rate-limits the distinct values of <paramref name="source"/>.
/// Values are collected into windows of length <paramref name="duration"/>; a window is opened
/// by the first value that arrives while no window is open. Each window yields its first value
/// at once and, when it closes, the last of any later values it received. The yielded values
/// are then spaced so that each one is followed by at least <paramref name="duration"/> before
/// the next is emitted.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to rate-limit; consecutive duplicates are dropped first.</param>
/// <param name="duration">Window length and minimum spacing between emitted values.</param>
/// <param name="scheduler">Scheduler used for the window timers and the spacing delays.</param>
/// <returns>The rate-limited sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is null.
/// </exception>
public static IObservable<T> LimitRate<T>(
    this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
    // Fail fast at the call site: without these checks a null scheduler is only touched
    // inside the lambdas below, i.e. after the first element has already arrived.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return source.DistinctUntilChanged()
        // Constant key: every value lands in the single currently open group, and the
        // group is closed by a timer that starts when the group is created.
        .GroupByUntil(k => 0,
            g => Observable.Timer(duration, scheduler))
        // Per group: the first value right away, plus the last of the remaining values
        // (if any) once the group completes.
        .SelectMany(x => x.FirstAsync()
            .Merge(x.Skip(1)
                .TakeLast(1)))
        // Wrap each value in a sequence that emits it and then completes only after
        // `duration`, so the Concat below cannot start the next value any sooner.
        .Select(x => Observable.Return(x)
            .Concat(Observable.Empty<T>()
                .Delay(duration, scheduler)))
        .Concat();
}
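This implementation works by first using GroupByUntil to pack all events into a single group for the duration of the cool-down. The first event of the group is emitted immediately, and the last subsequent change (if any) is emitted when the group expires.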
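Each resulting event is then projected into a stream whose OnCompleted is delayed by the cool-down period, and these streams are concatenated. This prevents events from being emitted any closer together than the cool-down, but otherwise they are emitted as soon as possible.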
Here are the unit tests (updated for v3), which you can run using the NuGet packages rx-testing and nunit:
/// <summary>
/// Virtual-time tests for <c>LimitRate</c>. Every test feeds a cold observable through
/// <c>LimitRate</c> with a 100-tick duration and compares the recorded notifications.
/// </summary>
public class LimitRateTests : ReactiveTest
{
    // Values spaced further apart than the duration come out at their original times.
    [Test]
    public void SlowerThanRateIsUnchanged()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));

        var observed = RunLimited(clock, input);

        observed.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));
    }

    // Within a window only the first and the latest value are reported (5 is dropped).
    [Test]
    public void FasterThanRateIsSampled()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(140, 5),
            OnNext(150, 2),
            OnNext(300, 3),
            OnNext(350, 4));

        var observed = RunLimited(clock, input);

        observed.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));
    }

    // A value that never changes is reported exactly once.
    [Test]
    public void DuplicatesAreOmitted()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(300, 1),
            OnNext(350, 1));

        var observed = RunLimited(clock, input);

        observed.Messages.AssertEqual(
            OnNext(100, 1));
    }

    // The value released at 200 starts its own cool-down, so 3 (arriving at 205) waits until 300.
    [Test]
    public void CoolResetsCorrectly()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 2),
            OnNext(205, 3));

        var observed = RunLimited(clock, input);

        observed.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));
    }

    // Combination of a duplicate, well-spaced values and one value held back to 850.
    [Test]
    public void MixedPacingWorks()
    {
        var clock = new TestScheduler();
        var input = clock.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(825, 5));

        var observed = RunLimited(clock, input);

        observed.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(850, 5));
    }

    // Shared act step: subscribes a fresh recording observer to the rate-limited input,
    // runs the scheduler, and hands the observer back for assertions.
    private static ITestableObserver<int> RunLimited(
        TestScheduler clock, ITestableObservable<int> input)
    {
        var sink = clock.CreateObserver<int>();
        input.LimitRate(TimeSpan.FromTicks(100), clock).Subscribe(sink);
        clock.Start();
        return sink;
    }
}