, , , , , . , zipAndContinue, zip, , , . .
// .
function zipAndContinue() {
const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
const combined$ = Rx.Observable.combineLatest(observables);
const first$ = combined$.first();
const subsequent$ = combined$
.skip(1)
.bufferWithCount(arguments.length)
.flatMap(zipped)
.filter(xs => !xs.every(x => x === null));
return first$.concat(subsequent$)
}
:
function endWithNull(observable) {
return Rx.Observable.create(observer => {
return observable.subscribe({
onNext: x => observer.onNext(x),
onError: x => observer.onError(x),
onCompleted: () => {
observer.onNext(null);
observer.onCompleted();
}
})
})
}
function zipped(xs) {
const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);
const stillEmitting = Math.max.apply(null, nonNullCounts);
if (stillEmitting === 0) {
return Rx.Observable.empty();
}
return Rx.Observable.from(xs).skip(stillEmitting - 1);
}
:
const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);
zipAndContinue(one$, two$, three$)
.subscribe(x => console.log(x));
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
js- ( "", ): https://jsfiddle.net/ptx4g6wd/