Can Observable.Buffer close on anything other than time?

I searched for examples of how to use Observable.Buffer in Rx, but I cannot find anything more substantial than boilerplate samples that buffer by time.

There seems to be an overload that takes a "bufferClosingSelector", but I can't wrap my head around it.

What I'm trying to do is create a sequence that is buffered by time or by "accumulation". Consider a stream of requests in which each request has some weight: I don't want to process more than x accumulated weight at a time, and if not enough weight has accumulated, I just want whatever arrived in the last timeframe (the regular Buffer behaviour).

1 answer

bufferClosingSelector is a function that is called each time a new buffer opens. It returns an Observable, and the buffer is closed as soon as that Observable produces a value.

For example,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) works like the regular Buffer(time) overload.
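To show that the closing signal does not have to be a clock, here is a minimal sketch in which the buffer is closed by a manually triggered signal. The names ManualFlushDemo, Run and flush are made up for illustration and are not part of Rx; the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ManualFlushDemo
{
    public static List<IList<int>> Run()
    {
        var source = new Subject<int>();
        var flush = new Subject<Unit>();          // hypothetical "close the buffer now" signal
        var batches = new List<IList<int>>();

        // The selector is invoked each time a buffer opens; here it always
        // returns the same hot signal, so every flush closes the current buffer.
        using (source.Buffer(() => flush).Subscribe(b => batches.Add(b)))
        {
            source.OnNext(1);
            source.OnNext(2);
            flush.OnNext(Unit.Default);           // closes the buffer holding 1 and 2
            source.OnNext(3);
            flush.OnNext(Unit.Default);           // closes the buffer holding 3
            source.OnCompleted();                 // whatever is still buffered is flushed on completion
        }

        return batches;
    }
}
```

Any observable can play the role of flush here: a timer, another event stream, or a condition derived from the source itself.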

If you want to weigh the sequence, you can apply Scan to it and then decide based on the aggregated state.

For example, source.Scan((a,c) => a + c).SkipWhile(a => a < 100) gives you a sequence that produces its first value once the running total of the original sequence reaches 100.

You can use Amb to race these two closing conditions and see which one fires first:
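As a rough illustration of what the Scan and SkipWhile pair emits, the sketch below runs it over a fixed list of weights. RunningTotalDemo and TotalsAtOrAbove are hypothetical names chosen for this example, and the System.Reactive package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RunningTotalDemo
{
    // Returns every running total from the point where the total first
    // reaches the threshold. Only the first of these matters when the
    // sequence is used as a buffer-closing signal.
    public static IList<int> TotalsAtOrAbove(IEnumerable<int> weights, int threshold)
    {
        return weights.ToObservable()
                      .Scan((acc, w) => acc + w)              // running total
                      .SkipWhile(total => total < threshold)  // silent until the threshold is reached
                      .ToList()
                      .Wait();                                // block until the finite sequence completes
    }
}
```

With the weights 40, 30, 50, 10 and a threshold of 100, the running totals are 40, 70, 120, 130, so the first value to pass through is 120.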

  source.Buffer(() => Observable.Amb(
      Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default),
      source.Scan((a,c) => a + c).SkipWhile(a => a < 100).Select(_ => Unit.Default)))

You can use any combination of combinators, as long as it produces a value at the moment you want the buffer to be closed.

Note: The value emitted by the closing selector does not matter; only the notification itself does. Therefore, to combine sources of different types with Amb, simply project each of them to System.Reactive.Unit.

 Observable.Amb(stream1.Select(_ => Unit.Default), stream2.Select(_ => Unit.Default))
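Putting the pieces together for the weighted-request case from the question, one possible sketch is shown below. BufferByWeightOrTime, weight, maxWeight and maxWait are hypothetical names, not part of Rx. The sketch also wraps the source in Publish, which is an addition to the answer above: it lets the buffer and the weight counter share a single subscription, so a cold source is not replayed every time a buffer opens.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class WeightedBuffer
{
    // Closes the current buffer when either maxWait elapses or the weights
    // accumulated since the buffer opened reach maxWeight, whichever is first.
    public static IObservable<IList<T>> BufferByWeightOrTime<T>(
        this IObservable<T> source,
        Func<T, int> weight,
        int maxWeight,
        TimeSpan maxWait)
    {
        return source.Publish(shared =>
            shared.Buffer(() => Observable.Amb(
                // Time-based close, projected to Unit so both branches share a type.
                Observable.Timer(maxWait).Select(_ => Unit.Default),
                // Weight-based close: the running total restarts with every new buffer
                // because the selector builds a fresh Scan each time it is called.
                shared.Select(weight)
                      .Scan((acc, w) => acc + w)
                      .SkipWhile(total => total < maxWeight)
                      .Select(_ => Unit.Default))));
    }
}
```

A call might look like requests.BufferByWeightOrTime(r => r.Weight, 100, TimeSpan.FromSeconds(1)), where requests and Weight are placeholders. Note that the element that pushes the total over the limit is included in the batch it closes, so a batch can exceed maxWeight; keeping batches strictly under the limit would need a different closing condition.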