RXJS: merge stream elements one at a time

I would like to interleave the elements of several streams, taking one element from each in turn:

// Logs each emitted value to the console.
var print = console.log.bind(console);

// Three finite sources of different lengths (3, 2 and 5 elements).
var s1 = Rx.Observable.fromArray([1, 1, 5]);
var s2 = Rx.Observable.fromArray([2, 9]);
var s3 = Rx.Observable.fromArray([3, 4, 6, 7, 8]);

// Desired behaviour: take one element from each source in turn, skipping
// sources that have run out. `alternate` is the function being asked for.
alternate(s1, s2, s3).subscribe(print); // 1, 2, 3, 1, 9, 4, 5, 6, 7, 8

What would the definition of the `alternate` function look like?

+4
source share
2 answers

Use zip and concatMap when working with observables that were created from arrays (as in your example), or zip and flatMap when working with observables that are inherently asynchronous.

// Step 1: group the n-th value of every source into one array per round.
var rounds = Rx.Observable.zip(s1, s2, s3, function (first, second, third) {
  return [first, second, third];
});

// Step 2: flatten each round back into individual values, preserving order.
var interleaved = rounds.concatMap(function (round) {
  return Rx.Observable.from(round);
});

interleaved.subscribe(print); // 1, 2, 3, 1, 9, 4

Note that this stops emitting as soon as one of the source observables completes. This is because zip is strictly "balanced": it waits until every source has a matching event before emitting. What you want is a slightly looser version of zip that keeps going when some of the sources have already completed.

+5

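If there is a value that never occurs in any of the sources (for example, undefined), you can pad every source with it, zip the padded sources, and then filter that value back out: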

// Local aliases for the Rx.Observable static helpers that `alternate` uses.
var concat = Rx.Observable.concat;
var repeat = Rx.Observable.repeat;
var zipArray = Rx.Observable.zipArray;
var fromArray = Rx.Observable.fromArray;

// Logs each emitted value to the console.
var print = console.log.bind(console);

// Three finite sources of different lengths (3, 2 and 5 elements).
var s1 = fromArray([1, 1, 5]);
var s2 = fromArray([2, 9]);
var s3 = fromArray([3, 4, 6, 7, 8]);

// `alternate` is written as a function declaration further down, so it is
// hoisted and may be called before its definition appears.
alternate(s1, s2, s3).subscribe(print);

/**
 * Interleaves the values of several observables: one value from each source
 * per round, in argument order, skipping sources that have already completed.
 * The result completes once every source is exhausted.
 *
 * Relies on `concat`, `repeat`, `zipArray` and `fromArray` being in scope.
 *
 * Limitation: `undefined` is used as the "source is finished" marker, so a
 * genuine `undefined` emitted by a source is dropped from the output.
 *
 * @param {...Object} sources Observables to interleave, passed as individual
 *     arguments.
 * @returns {Object} Observable emitting the interleaved values.
 */
function alternate() {
    // `Array.slice(arguments)` was a non-standard, Firefox-only array generic
    // and throws a TypeError elsewhere; use the portable prototype form.
    var sources = Array.prototype.slice.call(arguments).map(function (s) {
        // Pad each source with an endless tail of `undefined` so that the
        // zip below never stalls waiting on a source that has completed.
        return concat(s, repeat(undefined));
    });
    return zipArray(sources)
        .map(function (values) {
            // Drop the padding contributed by sources that have run out.
            return values.filter(function (x) {
                return x !== undefined;
            });
        })
        .takeWhile(function (values) {
            // A round made only of padding means every source is finished.
            return values.length > 0;
        })
        .concatMap(function (list) {
            return fromArray(list);
        });
}

ES6:

// Pull the Rx.Observable static helpers that `alternate` uses into local scope.
const {concat, repeat, zipArray, fromArray} = Rx.Observable;

// Logs each emitted value to the console.
const print = console.log.bind(console);

// Three finite sources of different lengths (3, 2 and 5 elements).
// None of these bindings is reassigned, so `const` matches the ES6 style
// already used for the destructuring above.
const s1 = fromArray([1, 1, 5]);
const s2 = fromArray([2, 9]);
const s3 = fromArray([3, 4, 6, 7, 8]);

alternate(s1, s2, s3).subscribe(print);

/**
 * Interleaves the values of the given observables: one value from each source
 * per round, in argument order, skipping sources that have already completed.
 *
 * Relies on `concat`, `repeat`, `zipArray` and `fromArray` being in scope.
 * `undefined` serves as the "source is finished" marker, so a real `undefined`
 * emitted by a source does not reach the output.
 *
 * @param {...Object} sources Observables to interleave.
 * @returns {Object} Observable emitting the interleaved values.
 */
function alternate(...sources) {
    const isPresent = (value) => value !== undefined;

    // Give every source an endless tail of `undefined` so zipping never stalls
    // on a source that has already completed.
    const padded = sources.map((source) => concat(source, repeat(undefined)));

    // One array per round, with the padding stripped out again.
    const rounds = zipArray(padded).map((round) => round.filter(isPresent));

    // A round that is empty after stripping means every source is exhausted.
    const liveRounds = rounds.takeWhile((round) => round.length > 0);

    return liveRounds.concatMap((round) => fromArray(round));
}
+3

All Articles