Queue Operator for RxJS

Is there an operator in RxJS that would let me buffer elements and release them one at a time whenever a signal observable emits? It resembles bufferWhen, but instead of flushing the entire buffer on each signal, it would release a fixed number of elements per signal. Ideally, the value emitted by the signal observable could even set how many elements are released.

    Input observable:  >--a--b--c--d--|
    Signal observable: >------1---1-1-|
    Count in buffer:   !--1--21-2-121-|
    Output observable: >------a---b-c-|
2 answers

Yes, you can use zip to do what you want:

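    // Written against the RxJS 5 global bundle (Rx.min.js).
    const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
    const signal = new Rx.Subject();
    // zip pairs the n-th input value with the n-th signal; keep only the input.
    const output = Rx.Observable.zip(input, signal, (i, s) => i);
    output.subscribe(value => console.log(value));
    // Each signal releases one buffered value.
    signal.next(1);
    signal.next(1);
    signal.next(1);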

In fact, zip is used as an example in this GitHub issue, which relates to buffering.
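To see why zip behaves like a queue here, it can help to model its pairing logic outside RxJS. The following is only a rough plain-JavaScript sketch, not how RxJS implements zip; `createZipQueue` and its method names are hypothetical and exist just for this illustration. It replays the marble diagram from the question.

```javascript
// Plain-JavaScript model of zip's pairing behaviour (not RxJS itself).
// createZipQueue is a hypothetical helper used only for illustration.
function createZipQueue(onOutput) {
  const inputs = [];   // values waiting for a signal
  const signals = [];  // signals waiting for a value

  function drain() {
    // Emit one value for every input/signal pair that can be matched.
    while (inputs.length > 0 && signals.length > 0) {
      const value = inputs.shift();
      signals.shift();
      onOutput(value);
    }
  }

  return {
    input(value) { inputs.push(value); drain(); },
    signal(s) { signals.push(s); drain(); },
    pending() { return inputs.length; }
  };
}

// Replay the marble diagram from the question.
const released = [];
const queue = createZipQueue(value => released.push(value));
queue.input("a");
queue.input("b");
queue.signal(1);   // releases "a"
queue.input("c");
queue.signal(1);   // releases "b"
queue.input("d");
queue.signal(1);   // releases "c"
console.log(released, queue.pending()); // released: a, b, c; "d" is still waiting
```

One thing the model makes visible: signals are queued too, so a signal that arrives while the buffer is empty is not lost. It releases the next input value as soon as that value arrives.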

If you want to use the value emitted by the signal to determine how many buffered values should be emitted, you can do something like this:

    const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
    const signal = new Rx.Subject();
    const output = Rx.Observable.zip(
      input,
      // Expand each signal value into that many items, one per value to release.
      signal.concatMap(count => Rx.Observable.range(0, count)),
      (i, s) => i
    );
    output.subscribe(value => console.log(value));
    signal.next(1);
    signal.next(2);
    signal.next(1);
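Another way to think about the count-based variant is as a credit counter: each signal adds that many credits, and each credit releases one buffered value. The sketch below is a plain-JavaScript model under that assumption, not RxJS code; `createCreditQueue` is a hypothetical name. As in the snippet above, all five input values are buffered before the first signal arrives.

```javascript
// Plain-JavaScript model of "release N values per signal" (not RxJS itself).
// createCreditQueue is a hypothetical helper used only for illustration.
function createCreditQueue(onOutput) {
  const buffer = [];  // values waiting to be released
  let credits = 0;    // how many values may still be released

  function drain() {
    // Spend one credit per released value.
    while (buffer.length > 0 && credits > 0) {
      credits -= 1;
      onOutput(buffer.shift());
    }
  }

  return {
    input(value) { buffer.push(value); drain(); },
    signal(count) { credits += count; drain(); },
    pending() { return buffer.length; }
  };
}

const emitted = [];
const creditQueue = createCreditQueue(value => emitted.push(value));
["a", "b", "c", "d", "e"].forEach(value => creditQueue.input(value));
creditQueue.signal(1);  // releases "a"
creditQueue.signal(2);  // releases "b" and "c"
creditQueue.signal(1);  // releases "d"
console.log(emitted, creditQueue.pending()); // emitted: a, b, c, d; "e" is still waiting
```

Unused credits carry over in this model, which mirrors how zip keeps the expanded signal items until an input value arrives to pair with them.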

window can be used to split the timeline, and takeLast can be used to hold back each window's values until the next signal closes that window.

    let signal = Rx.Observable.interval(1000).take(4);
    let input = Rx.Observable.interval(300).take(10).share();

    let output = input
      .do(value => console.log(`input = ${value}`))
      .window(signal)
      .do(() => console.log(`*** signal : end OLD and start NEW subObservable`))
      .mergeMap(subObservable => {
        return subObservable.takeLast(100);
      })
      .share();

    output.subscribe(value => console.log(` output = ${value}`));

    Rx.Observable.merge(input.mapTo(1), output.mapTo(-1))
      .scan((count, diff) => {
        return count + diff;
      }, 0)
      .subscribe(count => console.log(` count = ${count}`));

Result:

    22:28:37.971 *** signal : end OLD and start NEW subObservable
    22:28:38.289 input = 0
    22:28:38.292 count = 1
    22:28:38.575 input = 1
    22:28:38.576 count = 2
    22:28:38.914 input = 2
    22:28:38.915 count = 3
    <signal received>
    22:28:38.977 output = 0
    22:28:38.979 count = 2
    22:28:38.980 output = 1
    22:28:38.982 count = 1
    22:28:38.984 output = 2
    22:28:38.986 count = 0
    22:28:38.988 *** signal : end OLD and start NEW subObservable
    22:28:39.175 input = 3
    22:28:39.176 count = 1
    22:28:39.475 input = 4
    22:28:39.478 count = 2
    22:28:39.779 input = 5
    22:28:39.780 count = 3
    <signal received>
    22:28:39.984 output = 3
    22:28:39.985 count = 2
    22:28:39.986 output = 4
    22:28:39.988 count = 1
    22:28:39.989 output = 5
    22:28:39.990 count = 0
    22:28:39.992 *** signal : end OLD and start NEW subObservable
    22:28:40.075 input = 6
    22:28:40.077 count = 1
    22:28:40.377 input = 7
    22:28:40.378 count = 2
    22:28:40.678 input = 8
    22:28:40.680 count = 3
    22:28:40.987 input = 9
    22:28:40.990 count = 4
    <input completed>
    22:28:40.992 output = 6
    22:28:40.993 count = 3
    22:28:40.995 output = 7
    22:28:40.996 count = 2
    22:28:40.998 output = 8
    22:28:40.999 count = 1
    22:28:41.006 output = 9
    22:28:41.007 count = 0
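The log shows that each signal flushes everything collected in the current window, and that completion of the input flushes the rest. A rough plain-JavaScript model of that window-plus-takeLast behaviour might look like the following; it is a sketch rather than RxJS code, and `createWindowFlusher` and its `limit` parameter are hypothetical names chosen for this illustration.

```javascript
// Plain-JavaScript model of window(signal) + takeLast(limit) (not RxJS itself).
// createWindowFlusher is a hypothetical helper used only for illustration.
function createWindowFlusher(limit, onOutput) {
  let current = [];  // values collected in the current window

  function flush() {
    // Like takeLast(limit): only the last `limit` values of the window survive.
    const kept = limit > 0 ? current.slice(-limit) : [];
    current = [];
    kept.forEach(value => onOutput(value));
  }

  return {
    input(value) { current.push(value); },
    signal() { flush(); },    // a signal closes the current window
    complete() { flush(); }   // completion closes the last window
  };
}

// Same shape as the log: three values, signal, three values, signal, four values, completion.
const flushed = [];
const flusher = createWindowFlusher(100, value => flushed.push(value));
[0, 1, 2].forEach(value => flusher.input(value));
flusher.signal();
[3, 4, 5].forEach(value => flusher.input(value));
flusher.signal();
[6, 7, 8, 9].forEach(value => flusher.input(value));
flusher.complete();
console.log(flushed); // 0 through 9, flushed in groups of 3, 3 and 4

// With a small limit, older values in a window are dropped.
const capped = [];
const smallFlusher = createWindowFlusher(2, value => capped.push(value));
[0, 1, 2].forEach(value => smallFlusher.input(value));
smallFlusher.signal();
console.log(capped); // 1 and 2; 0 was dropped
```

The second run illustrates one consequence of the approach: a window holding more values than the takeLast argument loses its oldest values, so the argument (100 in the snippet above) acts as a cap on the buffer size.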