Polling a web service using Reactive Extensions and binding the latest x results

I am trying to use Reactive Extensions (Rx) for a task where it seems like a good fit: polling a web service at a fixed interval and displaying its latest x results.

I have a web service that sends me the status of the tool that I want to control. I would like to poll this tool at a fixed rate and display the last 20 polled statuses in a list.

So my list will look like a “moving window” over the results of the service.

I am developing a WPF application using Caliburn.Micro, but I do not think that matters much.

So far I have the following (just a sample application that I hacked together quickly; I am not going to do this in the ShellViewModel in a real application):

    public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell
    {
        private ObservableCollection<string> times;
        private string currentTime;

        public ShellViewModel()
        {
            times = new ObservableCollection<string>();

            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .SelectMany(x => this.GetCurrentDate().ToObservable())
                .ObserveOnDispatcher()
                .Subscribe(x =>
                {
                    this.CurrentTime = x;
                    this.times.Add(x);
                });
        }

        public IEnumerable<string> Times
        {
            get { return this.times; }
        }

        public string CurrentTime
        {
            get { return this.currentTime; }
            set
            {
                this.currentTime = value;
                this.NotifyOfPropertyChange(() => this.CurrentTime);
            }
        }

        private async Task<string> GetCurrentDate()
        {
            var client = new RestClient("http://www.timeapi.org");
            var request = new RestRequest("/utc/now.json");
            var response = await client.ExecuteGetTaskAsync(request);
            return response.Content;
        }
    }

In the view, I only have a label bound to the CurrentTime property and a list bound to the Times property.

I have two problems:

  • The list is not limited to 20 items, because I always add items to the ObservableCollection, and I cannot find a better way to do the data binding.
  • The interval does not behave the way I would like. If a request takes more than 1 second to run, two requests will execute in parallel, which I do not want. My goal is for the request to repeat indefinitely, but at a rate of no more than 1 request per second. If a request takes longer than 1 second, the next request should wait for it to complete and then start immediately.

Second edit:

The previous edit below was me being stupid and very confused. It fires events continuously because Interval is a continuous sequence that never completes. Brandon's solution is correct and works as expected.

Edit:

Based on Brandon's example, I tried to execute the following code in LINQPad:

    Observable
        .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10)))
        .Repeat()
        .Scan(new List<double>(), (list, item) => { list.Add(item); return list; })
        .Subscribe(x => Console.Out.WriteLine(x))

And I see that the console is written to every 2 seconds, not every 10. So Repeat did not wait for both observables to complete before repeating.
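For comparison, here is a minimal console sketch of the same experiment using Observable.Timer instead of Observable.Interval, assuming the System.Reactive NuGet package is referenced. Timer emits one value and then completes, so the merged sequence does complete and Repeat has something to wait for. The class name RepeatSketch and the Stopwatch-based output are illustrative only and not part of the original code.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;

static class RepeatSketch
{
    static void Main()
    {
        var clock = Stopwatch.StartNew();

        // Each Timer emits a single value after its due time and then completes,
        // so the merged sequence completes once the slower timer has fired.
        var round = Observable.Merge(
            Observable.Timer(TimeSpan.FromSeconds(2)),
            Observable.Timer(TimeSpan.FromSeconds(10)));

        round
            .Repeat()   // resubscribe only after the merged sequence completes
            .Take(4)    // two rounds, two ticks each
            .Do(_ => Console.WriteLine("tick at ~{0:F0}s", clock.Elapsed.TotalSeconds))
            .Wait();    // block until the four ticks have been printed
    }
}
```

With finite timers the ticks should arrive at roughly 2, 10, 12, and 20 seconds, which shows that Repeat starts the second round only after the 10-second timer has completed.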

2 answers

Try the following:

    // timer that completes after 1 second
    var intervalTimer = Observable
        .Empty<string>()
        .Delay(TimeSpan.FromSeconds(1));

    // queries one time whenever subscribed
    var query = Observable.FromAsync(GetCurrentDate);

    // query + interval timer which completes
    // only after both the query and the timer
    // have expired
    var intervalQuery = Observable.Merge(query, intervalTimer);

    // Re-issue the query whenever intervalQuery completes
    var queryLoop = intervalQuery.Repeat();

    // Keep the 20 most recent results
    // Note. Use an immutable list for this
    // https://www.nuget.org/packages/microsoft.bcl.immutable
    // otherwise you will have problems with
    // the list changing while an observer
    // is still observing it.
    var recentResults = queryLoop.Scan(
        ImmutableList.Create<string>(), // starts off empty
        (acc, item) =>
        {
            acc = acc.Add(item);
            if (acc.Count > 20)
            {
                acc = acc.RemoveAt(0);
            }

            return acc;
        });

    // store the results
    recentResults
        .ObserveOnDispatcher()
        .Subscribe(items =>
        {
            // new items are appended, so the latest result is the last element
            this.CurrentTime = items[items.Count - 1];
            this.RecentItems = items;
        });
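The loop above can be tried outside WPF with a small console sketch. It assumes the System.Reactive and System.Collections.Immutable packages are referenced. FakeQueryAsync is a hypothetical stand-in for the real web service call, and the window is shrunk to 3 items so the sliding behaviour shows up quickly; neither is part of the original answer.

```csharp
using System;
using System.Collections.Immutable;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class PollingSketch
{
    private static int counter;

    // Hypothetical stand-in for the real web service call.
    private static async Task<string> FakeQueryAsync()
    {
        await Task.Delay(TimeSpan.FromMilliseconds(300));
        return "status " + (++counter);
    }

    static void Main()
    {
        const int windowSize = 3; // the question uses 20

        // Emits nothing and completes after 1 second.
        var pause = Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1));

        // Runs the query once per subscription.
        var query = Observable.FromAsync(() => FakeQueryAsync());

        var windows = Observable.Merge(query, pause)
            .Repeat() // next round starts when both query and pause are done
            .Scan(ImmutableList<string>.Empty, (acc, item) =>
            {
                acc = acc.Add(item);
                return acc.Count > windowSize ? acc.RemoveAt(0) : acc;
            })
            .Take(5); // stop after five polls so the program ends

        windows
            .Do(w => Console.WriteLine(string.Join(", ", w)))
            .Wait(); // block until the five windows have been printed
    }
}
```

Each printed line should hold at most three statuses, with the oldest one dropped once a fourth arrives. Because the fake query finishes in under a second, a new poll should start about once per second. If the query took longer than the pause, the next poll would start as soon as the query completed.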

This should skip interval ticks while GetCurrentDate is still running.

    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .GroupByUntil(p => 1, p => GetCurrentDate().ToObservable().Do(x =>
        {
            this.CurrentTime = x;
            this.times.Add(x);
        }))
        .SelectMany(p => p.LastAsync())
        .Subscribe();