bufferClosingSelector is a function that is called each time a new buffer opens. It returns an Observable, and the buffer closes when that Observable produces a value.
For example,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) works like the regular time-based Buffer(TimeSpan) overload.
If you want to close the buffer based on the weight of its contents, apply Scan to the sequence and decide on the aggregated state.
For example, source.Scan((a, c) => a + c).SkipWhile(a => a < 100) gives you a sequence that starts producing values once the running total of the source reaches 100.
You can use Amb to race these two closing conditions, so the buffer closes on whichever fires first:
source.Buffer(() => Observable.Amb(Observable.Timer(TimeSpan.FromSeconds(1)), source.Scan((a, c) => a + c).SkipWhile(a => a < 100)))
Any combination of operators works here, as long as it produces a value at the moment the buffer should close.
Note: The value produced by the closing selector does not matter; only the notification itself is used. So, to combine sources of different types with Amb, simply project each of them to System.Reactive.Unit.
Observable.Amb(stream1.Select(_ => Unit.Default), stream2.Select(_ => Unit.Default))
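Putting the pieces together, here is a minimal sketch of a buffer that closes after one second or once its contents sum to 100, whichever comes first. It assumes the System.Reactive package; the Subject<int> source and the pushed values are hypothetical, chosen only for illustration. Both closing conditions are projected to Unit so that Amb can combine the long-valued Timer with the int-valued Scan.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        // Hypothetical hot source; each value doubles as its own "weight".
        var source = new Subject<int>();

        var buffers = source.Buffer(() =>
            Observable.Amb(
                // Condition 1: one second has passed since this buffer opened.
                Observable.Timer(TimeSpan.FromSeconds(1))
                          .Select(_ => Unit.Default),
                // Condition 2: the values seen since this buffer opened
                // add up to 100 or more.
                source.Scan((a, c) => a + c)
                      .SkipWhile(a => a < 100)
                      .Select(_ => Unit.Default)));

        // Print each buffer as it is emitted.
        using (buffers.Subscribe(b => Console.WriteLine($"[{string.Join(", ", b)}]")))
        {
            source.OnNext(60);
            source.OnNext(50);
            source.OnNext(10);
            source.OnCompleted();
        }
    }
}
```

Because the closing selector subscribes to source again for every buffer, this sketch relies on the source being hot, so the running total restarts from the values arriving after each buffer opens. With a cold source, each of those subscriptions would re-run the source from the beginning, so you would likely want to share it first (for example with Publish).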
Asti