Change the interval of Rx operators?

This might be a dumb question, as I'm a little new to RX :)

I'm sampling an event (Rx for .NET 4.0):

    eventAsObservable
        .Sample(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

The problem is that the sampling interval needs to change on the fly. I suppose I could create a property that removes the existing handler and creates a new one whenever it changes, but that seems a little messy and more vulnerable to timing issues. Is there a way to simply change the interval?

Example: say someone is typing a string of characters. When a certain sequence is detected, you want to change the sampling interval without missing an event, and preferably without receiving the same event more than once.

+6
system.reactive
4 answers

I don't know of a way to change the existing sampling interval, but what you could do is sample at the highest frequency you need, and then filter with a Where clause that uses a variable you can change.

For example:

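    // Extension methods must be declared in a static class.
    public static class ObservableExtensions
    {
        public static IObservable<T> SampleEvery<T>(this IObservable<T> source, Func<int> multipleProvider)
        {
            int counter = 0;
            // Lets through every Nth element, where N is re-read on each element.
            Func<T, bool> predicate = ignored =>
            {
                counter++;
                if (counter >= multipleProvider())
                {
                    counter = 0;
                }
                return counter == 0;
            };
            return source.Where(predicate);
        }
    }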

You would call it this way:

    // Keep this somewhere you can change it
    int multiple = 1;

    eventAsObservable
        .Sample(TimeSpan.FromSeconds(1))
        .SampleEvery(() => multiple)
        .Timestamp()
        .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

Changing the value of multiple now changes the effective sampling rate. For example, setting it to 3 lets through one sample every three seconds.

This is a pretty ugly hack, but I think it should work.

+7

I know that this question has already been answered, but I thought that I would add a few more ways to solve this problem using Rx.

You can use Switch on a sequence of TimeSpan values:

    private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();

    sampleFrequencies
        .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
        .Switch()
        .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

    // To change:
    // sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));

Alternatively, it could also be solved with Defer, TakeUntil and Repeat (this one is a little crazy and is included as an exercise):

    private TimeSpan sampleFrequency = TimeSpan.FromSeconds(2);
    private Subject<Unit> frequencyChanged = new Subject<Unit>();

    Observable
        .Defer(() => eventAsObservable
            .Sample(Observable.Interval(sampleFrequency))
            .Timestamp()
            .TakeUntil(frequencyChanged))
        .Repeat()
        .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

    // To change:
    // sampleFrequency = TimeSpan.FromSeconds(5);
    // frequencyChanged.OnNext(new Unit());

+5

TL;DR: Create an observable with the ObservableFromIntervalFunctor helper, as shown below:

    void Main()
    {
        // Pick an initial period; it can be changed later.
        var intervalPeriod = TimeSpan.FromSeconds(1);

        // Create an observable using a functor that captures the interval period.
        var o = ObservableFromIntervalFunctor(() => intervalPeriod);

        // Log every value so we can visualize the observable.
        o.Subscribe(Console.WriteLine);

        // Sleep for a while so you can observe the observable.
        Thread.Sleep(TimeSpan.FromSeconds(5.0));

        // Changing the interval period takes effect on the next tick.
        intervalPeriod = TimeSpan.FromSeconds(0.3);
    }

    IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
    {
        return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
    }

Explanation: Observable.Generate has an overload that lets you specify, via a functor, when the next value will be generated. By passing a functor that captures a time-interval variable, you can change the period of the observable by changing the captured variable.
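
The snippet in this answer only produces a variable-rate ticker; it does not yet sample the original event. A minimal sketch of how the two might be combined is below, assuming the Sample overload that takes a second observable as the sampler. The names SampleWithVariablePeriod and VariableSampleDemo are hypothetical, and Observable.Interval stands in for eventAsObservable from the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

static class VariableSampleDemo
{
    // Hypothetical helper (not part of Rx): samples `source` each time a
    // Generate-based ticker fires. The period is re-read before every tick.
    public static IObservable<T> SampleWithVariablePeriod<T>(
        this IObservable<T> source, Func<TimeSpan> periodProvider)
    {
        var ticker = Observable.Generate(
            0L, _ => true, s => s + 1, s => s, _ => periodProvider());
        return source.Sample(ticker);
    }

    static void Main()
    {
        var period = TimeSpan.FromSeconds(1);

        // Stand-in for eventAsObservable: a value every 100 ms.
        var events = Observable.Interval(TimeSpan.FromMilliseconds(100));

        using (events.SampleWithVariablePeriod(() => period)
                     .Timestamp()
                     .Subscribe(x => Console.WriteLine("testing: " + x.Value)))
        {
            Thread.Sleep(TimeSpan.FromSeconds(3));

            // Picked up when the ticker schedules its next tick.
            period = TimeSpan.FromSeconds(0.3);
            Thread.Sleep(TimeSpan.FromSeconds(3));
        }
    }
}
```

Because the subscription is never torn down, no resubscription gap is introduced. The captured variable is read on a scheduler thread, so real code may want to guard it (for example with a lock), and a new period only applies once the tick that is already scheduled has fired.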


+2

Why don't you just subscribe twice?

    Observable.Merge(
        eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
        eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x))
    ).Subscribe(Console.WriteLine);

Or, if a search is only active based on some kind of prefix or qualifier, like Google Chrome's '?' operator:

    Observable.Merge(
        eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
        eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x)).SelectMany(x => doRemoteLookup(x))
    ).Subscribe(Console.WriteLine);

0