RXJS: aggregate rejection

My use case is as follows: I get events that sometimes happen in packages. If a splash occurs, I only need to process it once. Debounce does this.

However, debounce only gives me the last element of the package, but I need to know about all the elements in the package to aggregate them (using a flat map).

This can be done with a temporary window or a buffer, however, these are fixed intervals, so the buffer / window timeout can occur in the middle of the packet, therefore splitting the packet into 2 parts for processing instead of 1.

So what I would like is something like

. . event: a . . -> a . . . . . .event: b .event: c .event: d . .-> b,c,d . . . . .event : e . . -> e . 
+8
rxjs reactivex
source share
2 answers

This can be achieved with buffer , passing the released stream as a close selector, for example:

 var s = Rx.Observable.of('a') .merge(Rx.Observable.of('b').delay(100)) .merge(Rx.Observable.of('c').delay(150)) .merge(Rx.Observable.of('d').delay(200)) .merge(Rx.Observable.of('e').delay(300)) .share() ; s.buffer(s.debounce(75)).subscribe(x => console.log(x)); 

Here's the runnable version: https://jsbin.com/wusasi/edit?js,console,output

+11
source share

Perhaps you are looking for bufferWithTimeOrCount

from the page:

 /* Hitting the count buffer first */ var source = Rx.Observable.interval(100) .bufferWithTimeOrCount(500, 3) .take(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x.toString()); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); // => Next: 0,1,2 // => Next: 3,4,5 // => Next: 6,7,8 // => Completed 
-one
source share

All Articles