Why does the Rx buffer continuously execute the method when the buffer contains no elements?

I have an Rx Observable that acts as a buffer. Right now it runs the method passed to Subscribe when it has received 10 items or after 100 milliseconds, whichever comes first.

I noticed that my method is called every 100 ms, even when there are no elements in the buffer, which surprised me. It is simple enough to return from my method immediately when it receives no elements from the buffer, but it seemed strange to me that the buffer keeps firing in the background.

Why is this? What is your best recommendation for me to handle this? I'm a complete Rx newbie, so maybe I'm doing something weird. Here is a simplified version of my code:

    private readonly IMyServer server;
    private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;

    public MyBufferClass(IMyServer server, IScheduler scheduler)
    {
        this.server = server;
        this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
        // Emit a batch after 10 items or 100 ms, whichever comes first.
        this.serverRequests
            .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
            .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
    }

    public void GetSingleItemFromServer(int id, Action<MyData> callback)
    {
        this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
    }

    public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
    {
        // Skip the empty batches that Buffer emits on every timer tick.
        if (idsWithCallbacks.IsNullOrEmpty())
            return;
        this.server.GetMultipleItems(idsWithCallbacks);
    }

In my tests, if I call GetSingleItemFromServer 5 times and then advance my test scheduler by 1000 ms, I expected GetMultipleItemsFromServer to be called only once, but it is called 10 times.

+5
c# system.reactive
2 answers

In situations like this, an elegant solution is to use the Where operator right after the buffer to filter out any empty results. Something like this:

    stream
        .Buffer(...)
        .Where(x => x.Any())
        .Subscribe(x => {...}, ex => {...});

As to why Buffer acts this way, I think it is better for it to emit an empty collection and let the consumer choose what to do with it than to swallow it internally and deny the consumer that choice.
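To make the difference concrete, here is a minimal sketch of the scenario from the question, run in virtual time. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; `BufferDemo` and `CountCalls` are hypothetical names used only for illustration, and the requests are plain `int` ids rather than the key/callback pairs from the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class BufferDemo
{
    // Counts how often the subscriber runs when 5 items are pushed and
    // virtual time is then advanced by 1000 ms (hypothetical helper).
    public static int CountCalls(bool filterEmpty)
    {
        var scheduler = new TestScheduler();
        var requests = new Subject<int>();
        var calls = 0;

        // Same shape as the question: close a batch after 10 items or 100 ms.
        var buffered = requests.Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler);
        if (filterEmpty)
        {
            // Drop the empty lists that Buffer emits on idle timer ticks.
            buffered = buffered.Where(batch => batch.Count > 0);
        }

        using (buffered.Subscribe(batch => calls++))
        {
            for (var id = 0; id < 5; id++)
            {
                requests.OnNext(id);
            }
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1000).Ticks);
        }

        return calls;
    }
}
```

Without the filter this should reproduce the ten calls reported in the question (one batch of five items followed by nine empty ones); with the filter only the single non-empty batch reaches the subscriber.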

On a separate note, I would not put the server call inside the Subscribe block. I think it is best to make any asynchronous operations part of the composition of the Rx stream itself, and to limit the "Subscribe" action to lightweight operations that deal with the end result, such as updating the user interface or logging success or failure. Something like:

    (from request in serverRequests
         .Buffer(TimeSpan.FromMinutes(1))
         .Where(x => x.Any())
     // Observable.Start takes a delegate, so wrap the call in a lambda.
     from response in Observable.Start(() => server.GetMultipleItems(...))
     select response)
        .Subscribe(x => {}, ex => {});

Benefits of this include:

- You can apply further Rx operators to the server call, such as Timeout(), Retry(), Catch(), etc.

- You can handle any pipeline errors in the error overload of Subscribe().

- You can schedule the pipeline and the Subscribe action independently using SubscribeOn() / ObserveOn().
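A rough sketch of what such a pipeline could look like is below. It is not the asker's code: `PipelineSketch`, `FetchBatch` and `Run` are hypothetical stand-ins, the batches are count-based, and the call runs on `Scheduler.Immediate` so that the example stays synchronous and deterministic. Because `Observable.Start` runs its delegate once and caches the result, the call is wrapped in `Observable.Defer` so that `Retry` actually re-invokes it on failure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PipelineSketch
{
    // Hypothetical stand-in for server.GetMultipleItems: one string per id.
    public static IList<string> FetchBatch(IList<int> ids) =>
        ids.Select(id => "item-" + id).ToList();

    public static List<string> Run()
    {
        var requests = new Subject<int>();
        var results = new List<string>();
        var errors = new List<Exception>();

        var subscription = requests
            .Buffer(2)                               // count-based only to keep the demo deterministic
            .Where(batch => batch.Count > 0)         // skip empty batches
            .SelectMany(batch =>
                // Defer creates a fresh Start per subscription, so Retry re-runs the call.
                Observable.Defer(() =>
                        Observable.Start(() => FetchBatch(batch), Scheduler.Immediate))
                    .Retry(3))
            .Subscribe(
                response => results.AddRange(response),  // light work only: collect results
                ex => errors.Add(ex));                   // any pipeline error ends up here

        requests.OnNext(1);
        requests.OnNext(2);
        requests.OnNext(3);
        requests.OnCompleted();                      // flushes the partial batch [3]
        subscription.Dispose();

        return results;
    }
}
```

In real code the call would normally run on a background scheduler, and operators such as Timeout() or Catch() could be chained next to Retry() in the same place.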

+4

Perhaps try it like this:

    public MyBufferClass(IMyServer server, IScheduler scheduler)
    {
        this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
        this.serverRequests
            .GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000)))
            .SelectMany(x => x.ToArray())
            .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
    }

This does not give you empty results.

As for why .Buffer(...) behaves this way: that is simply how it was designed. Nothing more complicated.

+6