Combining an unknown number of observables using RxJS

I'm currently trying to create a small project to demonstrate reactive programming using RxJS. The goal is to show my colleagues that this thing exists and that it's worth exploring. I have no experience with the framework, which complicates things.

I am trying to extend another demo of mine to use RxJS. It is not a very complicated demo: I can add any number of small forms, each of which produces a number calculated by a small formula, and there is a button that sums up the values of all the forms.

Getting the formula to calculate inside the forms is easy, but I figured I could go further.

I want the summation to be performed automatically through a combined observable. The only solution I came up with was something like this:

    // Dummy observables
    var s1 = Rx.Observable.interval(100);
    var s2 = Rx.Observable.interval(200);

    // Combine the observables
    var m = s1.combineLatest(s2, function (x, y) { return x + y; });

    // Subscribe to the combined observable
    var sub = m.subscribe(function (x) { console.log(x); });

    // A new observable is created
    var s3 = Rx.Observable.interval(300);

    // Now I need to update all my subscriptions, which is a pain.
    m = m.combineLatest(s3, function (x, y) { return x + y; });
    sub.dispose();
    sub = m.subscribe(function (x) { console.log(x); });

I figured I could have another observable notify my subscribers to update themselves, since having to know how every subscriber works would make the whole architecture pointless. But that sounds like overkill for a task like this, and I don't just mean the demo: I can't really imagine an everyday real-world case where such an architecture would be cleaner than simply watching for any change and getting the calculated values "actively" from my forms. I would probably do the active retrieval and summation of the values inside the module that handles the forms, and expose the "m" observable for the outside world to subscribe to, pushing my values into it from inside the module.

Would this be the right approach? I think so: the forms belong to my module, so I should have full control over what happens to them. Still, I am really curious what more experienced people think about it.
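The module-owned approach described above can be sketched without any library. This is only an illustration of the idea, not RxJS code: `createFormsModule`, `addForm`, `setValue`, and `subscribe` are hypothetical names, and the hand-rolled listener list stands in for whatever subject-like object the real module would expose as "m".

```javascript
// Hypothetical sketch: the module owns the forms, sums their values itself,
// and pushes each new sum to outside subscribers.
function createFormsModule() {
  const values = [];     // latest value of each form, indexed by form id
  const listeners = [];  // outside-world subscribers to the running sum

  function publish() {
    const sum = values.reduce((acc, v) => acc + v, 0);
    // Copy the list so a listener may unsubscribe while we iterate.
    listeners.slice().forEach((listener) => listener(sum));
  }

  return {
    // Register a new form (starting at 0) and return its id.
    addForm() {
      values.push(0);
      publish();
      return values.length - 1;
    },
    // Called by a form whenever its formula yields a new value.
    setValue(id, value) {
      values[id] = value;
      publish();
    },
    // Subscribe to the sum; returns an unsubscribe function.
    subscribe(listener) {
      listeners.push(listener);
      return () => {
        const i = listeners.indexOf(listener);
        if (i !== -1) listeners.splice(i, 1);
      };
    },
  };
}

// Usage: subscribers never need to know how many forms exist.
const forms = createFormsModule();
forms.subscribe((sum) => console.log('sum:', sum));
const first = forms.addForm();   // logs "sum: 0"
const second = forms.addForm();  // logs "sum: 0"
forms.setValue(first, 2);        // logs "sum: 2"
forms.setValue(second, 5);       // logs "sum: 7"
```

The design choice here is that subscribers are attached once and never re-subscribed when a form is added, which is exactly the pain point in the snippet from the question.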

+4
4 answers

I don't think you will find an operator that directly does what you need.

There is nothing wrong with creating your own operator:

    // `source` (the receiver below) is an observable of observables of form data
    Rx.Observable.prototype.combineLatestObservable = function (resultSelector) {
      var source = this;
      return Rx.Observable.create(function (obs) {
        var disposable = new Rx.SerialDisposable();
        var sources = [];
        var outer = source.subscribe(
          function (x) {
            // Update the set of observables
            sources.push(x);
            // This will dispose of the previous subscription first,
            // then subscribe to the new set.
            disposable.setDisposable(
              Rx.Observable.combineLatest(sources, resultSelector).subscribe(obs));
          },
          function (e) { obs.onError(e); },
          function () { obs.onCompleted(); });
        // Dispose of both the outer and the current inner subscription
        return new Rx.CompositeDisposable(outer, disposable);
      }).share();
    };

Or if you want to do this with operators:

    // Have to use arguments since we don't know how many values we will have
    function sums() {
      var sum = 0;
      for (var i = 0, len = arguments.length; i < len; ++i) {
        sum += arguments[i];
      }
      return sum;
    }

    source
      // Capture the latest set of Observables
      .scan([], function (acc, x) {
        acc.push(x);
        return acc;
      })
      // Dispose of the previous set and subscribe to the new set
      .flatMapLatest(function (arr) {
        return Rx.Observable.combineLatest(arr, sums);
      })
      // Don't know how many subscribers you have but probably want to keep from
      // recreating this stream for each
      .share();
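To make the "dispose of the previous set, then subscribe to the new set" behaviour concrete without depending on a particular RxJS version, here is a library-free sketch of the same idea. Everything in it is an assumption for illustration: a "source" is modelled as a plain function `(onValue) => unsubscribe`, `createValueSource` and `createDynamicCombiner` are hypothetical names, and errors and completion are not modelled at all.

```javascript
// A stand-in source that replays its current value on subscribe,
// like a form field that always has a value.
function createValueSource(initial) {
  let current = initial;
  const observers = new Set();
  const source = (onValue) => {
    observers.add(onValue);
    onValue(current); // replay the latest value immediately
    return () => observers.delete(onValue);
  };
  source.set = (value) => {
    current = value;
    // Copy so that observers may unsubscribe while we iterate.
    [...observers].forEach((observer) => observer(value));
  };
  return source;
}

function createDynamicCombiner(resultSelector, onResult) {
  const sources = [];
  let unsubscribeAll = () => {};

  return {
    add(source) {
      sources.push(source); // update the set of sources
      unsubscribeAll();     // drop the previous combined subscription
      const latest = new Array(sources.length);
      const seen = new Array(sources.length).fill(false);
      const unsubscribers = sources.map((src, i) =>
        src((value) => {
          latest[i] = value;
          seen[i] = true;
          // Like combineLatest: stay silent until every source has a value.
          if (seen.every(Boolean)) onResult(resultSelector(...latest));
        })
      );
      unsubscribeAll = () => unsubscribers.forEach((u) => u());
    },
  };
}

// Usage
const sumAll = (...xs) => xs.reduce((acc, x) => acc + x, 0);
const formA = createValueSource(1);
const formB = createValueSource(10);
const combiner = createDynamicCombiner(sumAll, (sum) => console.log(sum));
combiner.add(formA); // logs 1
combiner.add(formB); // logs 11
formA.set(2);        // logs 12
formB.set(20);       // logs 22
```

Because the whole set is re-subscribed on every addition, a source that does not replay its latest value would leave the combination silent until it emits again; that trade-off applies to the approach in general, not only to this sketch.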
+1

I use this snippet to combine an unknown number of observables:

    someArrayOfIdsToPerformRequest: [];

    this.isLoading = true;
    const mergedRequests: Array<Promise<JSON>> = [];
    this.someArrayOfIdsToPerformRequest.forEach((item) => {
      mergedRequests.push(
        this.postsService.remove({id: item.id}) // returns observable
          .toPromise());
    });
    Promise.all(mergedRequests)
      .then(() => {
        this.isLoading = false;
        this.loadPosts();
      });

Hope this helps someone!

+1

If you look at the source of combineLatest, you will see that it expects a variable number of Observables, not just one. With that in mind, you can do something like this:

    // Whatever Observables you want...
    const a = Rx.Observable.of(1, 2, 3)
    const b = Rx.Observable.interval(100).take(5)
    const c = Rx.Observable.of(2)

    Rx.Observable.combineLatest(a, b, c) // notice, I didn't define a resultSelector
      .do(numbers => console.log(numbers)) // you get 3 element arrays
      .map(numbers => numbers.reduce((sum, n) => sum + n, 0))
      .subscribe(sum => console.log(sum))

The result of this with my example would be:

    [3, 0, 2] // array printed in the do block
    5         // the sum of the array printed from the subscription
    [3, 1, 2]
    6
    [3, 2, 2]
    7
    [3, 3, 2]
    8
    [3, 4, 2]
    9

If you want to call this with an array of Observables, you can use Function#apply:

 Rx.Observable.combineLatest.apply(Rx.Observable, [a, b, c]) 

I hope I didn't misunderstand what you are asking.

0

Here is one way to do this, written in TypeScript:

    let newObservable = arrayOfObservables.reduce((previousValue, currentValue) => {
      if (previousValue) {
        return previousValue.merge(currentValue);
      }
      return currentValue;
    }, null);
-1