Rx Framework: perform an action with a timeout without interrupting the original observable sequence

Given an observable source generated by polling the (changing) state of a low-level device ...

    // observable source metacode:
    IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
        .Select(tick => new DeviceState(_device.ReadValue()))
        .DistinctUntilChanged();

... and a consumer who updates the user interface ...

    // UI metacode:
    service.GetObservableDeviceStates()
        .Subscribe(state => viewModel.CurrentState = state.ToString());

... I need to perform a custom action after x seconds of source "inactivity", without interrupting the subscription to the source. Something like this:

    // UI metacode:
    service.GetObservableDeviceStates()
        .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
        .Subscribe(state => viewModel.CurrentState = state.ToString());

What are the best practices? Possible solutions that come to mind (I'm Rx noob):

  • Buffer (even though it is not very readable);
  • Playing around with this Timeout overload;
  • Returning something special "service side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it in the UI code:

    service.GetObservableDeviceStates()
        .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());

EDIT: as reported in the answer, the solution is:

    service.GetObservableDeviceStates()
        .Do(onNext)
        .Throttle(TimeSpan.FromSeconds(x))
        .Subscribe(onTimeout);

EDIT2 (warning)

If onNext and onTimeout update UI components, two ObserveOn(uiSynchronizationContext) calls are needed to avoid CrossThreadExceptions, since Throttle runs on a different thread!

    service.GetObservableDeviceStates()
        .ObserveOn(uiSynchronizationContext)
        .Do(onNext)
        .Throttle(TimeSpan.FromSeconds(x))
        .ObserveOn(uiSynchronizationContext)
        .Subscribe(onTimeout);
2 answers

Timeout is more or less meant for observables that represent a single asynchronous operation: for example, to return a default value or OnError if the observable has not notified you within a certain amount of time.

The operator you are looking for is Throttle, even though it may not seem like it at first. Throttle(p) gives you a stream that produces a value (the most recent one from the source) once the source stream has not produced a value for period p.

Parallel to your existing code, you can use source.Throttle(period).Do(...side effect).
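As a rough sketch of how the Throttle idea could be wrapped into the DoOnTimeout shape the question asks for: the extension method below is hypothetical (it is not part of Rx, and the class name IdleExtensions is made up). It assumes the System.Reactive package and shares a single subscription to the source through Publish, so the original values pass through untouched while a throttled branch only triggers the side effect.

```csharp
using System;
using System.Reactive.Linq;

public static class IdleExtensions
{
    // Hypothetical helper, not an Rx operator: passes every source value through
    // and invokes onTimeout once the source has been silent for `period`.
    public static IObservable<T> DoOnTimeout<T>(
        this IObservable<T> source, TimeSpan period, Action onTimeout)
    {
        // Publish(selector) subscribes to the source once and shares it
        // between the pass-through branch and the throttled branch.
        return source.Publish(shared =>
            shared.Merge(
                shared.Throttle(period)          // fires after `period` of silence
                      .Do(_ => onTimeout())      // the idle side effect
                      .IgnoreElements()));       // drop the throttled value itself
    }
}
```

Two caveats with this sketch: onTimeout runs on whatever thread Throttle's scheduler uses, so a UI update inside it still has to be marshalled to the UI thread; and, as far as I know, Throttle flushes its pending value when the source completes, so onTimeout may also run once at completion.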


I personally would avoid the Do method for this. It does make the code in this example pretty simple, but I find that once "Do" sneaks into a code base, you soon end up with spaghetti.

You could also consider using combinations of Amb, Timer, TakeUntil, Throttle, etc. to get the result you are looking for and still keep the monad intact. In simple terms, I assume you ideally want a sequence of status values coming through, without needing to put a timer in your own code (i.e., offload it to the service).

    public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod)
    {
        return Observable.Create<DeviceStatus>(
            o =>
            {
                var idle = Observable.Timer(silencePeriod).Select(_ => new DeviceStatus("Idle"));
                var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5))
                    .Select(tick => new DeviceStatus(_device.ReadValue()))
                    .DistinctUntilChanged()
                    .Publish();

                var subscription = (from status in polledStatus
                                    from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus))
                                    select cont)
                    .Subscribe(o);

                return new CompositeDisposable(subscription, polledStatus.Connect());
            });
    }

With this code, the service itself returns an Idle status value once the specified silence period has elapsed after a change.

This means that your user interface meta-code remains simple and the logic associated with DeviceStatus remains where it belongs.

    // UI metacode:
    service.GetObservableDeviceStates(TimeSpan.FromSeconds(2))
        .Subscribe(state => viewModel.CurrentState = state.ToString());