Given an observable source, generated by polling a low-level device for state changes ...
    // observable source metacode:
    IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
        .Select(tick => new DeviceState(_device.ReadValue()))
        .DistinctUntilChanged();
... and a consumer that updates the UI ...
... I need to perform a custom action after x seconds of source "inactivity", without interrupting the subscription to the source. Something like this:
    // UI metacode:
    service.GetObservableDeviceStates()
        .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
        .Subscribe(state => viewModel.CurrentState = state.ToString());
What are the best practices? Possible solutions that come to mind (I'm an Rx noob):
- Buffer (even if it's not very readable);
- Playing around with this Timeout overload;
- Returning something special "service-side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it in the UI code:
    service.GetObservableDeviceStates()
        .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());
EDIT: as reported in the answer, the solution is:
    service.GetObservableDeviceStates()
        .Do(onNext)
        .Throttle(TimeSpan.FromSeconds(x))
        .Subscribe(onTimeout);
EDIT2 (warning)
If onNext and onTimeout update UI components, two ObserveOn(uiSynchronizationContext) calls are needed to avoid CrossThreadExceptions, since Throttle works on another thread!
    service.GetObservableDeviceStates()
        .ObserveOn(uiSynchronizationContext)
        .Do(onNext)
        .Throttle(TimeSpan.FromSeconds(x))
        .ObserveOn(uiSynchronizationContext)
        .Subscribe(onTimeout);
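For anyone who wants the DoOnTimeout shape from the question, here is a rough sketch of how the Do/Throttle idea could be wrapped into an extension method. DoOnTimeout is not part of Rx; the class name IdleObservableExtensions and the optional scheduler parameter are my own assumptions, and I have only reasoned through it, not used it in production.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class IdleObservableExtensions
{
    // Hypothetical helper (not part of Rx): forwards every notification of
    // `source` unchanged and runs `onTimeout` after each quiet period.
    public static IObservable<T> DoOnTimeout<T>(
        this IObservable<T> source,
        TimeSpan timeout,
        Action onTimeout,
        IScheduler scheduler = null)
    {
        return Observable.Create<T>(observer =>
        {
            // Publish shares a single subscription to the source between both branches.
            var shared = source.Publish();

            // Throttle emits a value only once `timeout` has elapsed without a newer one.
            var quiet = scheduler == null
                ? shared.Throttle(timeout)
                : shared.Throttle(timeout, scheduler);

            // Errors are swallowed here because the main branch already forwards them.
            var idleSubscription = quiet.Subscribe(_ => onTimeout(), _ => { });
            var mainSubscription = shared.Subscribe(observer);
            var connection = shared.Connect();

            return new CompositeDisposable(idleSubscription, mainSubscription, connection);
        });
    }
}
```

Two caveats: the timeout can only fire after the source has produced at least one value, and Throttle flushes a pending value when the source completes, so onTimeout may also run at completion. The threading warning above still applies: onTimeout runs on whatever scheduler Throttle uses, so UI updates need marshalling back to the UI thread.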
Notoriousxl