Emission order of an RxJS Observable created from chained promises

In an Angular 2 application, I want to build an Observable from chained promises. Since a promise provides a one-time result, the first step was to use Observable.fromPromise(), and then mergeAll() on the meta observable to handle each fromPromise as it completes. I found this SO question useful: RxJS: How to have one process of observing several observables?

Since the accepted answer to that question deals with simple events, I easily prepared my own solution using Observable.fromPromise(someComposedPromise) instead of Observable.fromEvent(someEvent). Unfortunately, although everything works for a single simple promise, a problem arises when the promise is composed of two promises, because of the order in which the promises are resolved.

For simplicity and to isolate the case, suppose we have an existing external DumbCache (what I would actually like to use is Ionic 2 LocalStorage, whose simplest variant looks similar to this):

    class DumbCache {
      cache = {};

      get(key) {
        return new Promise((resolve, reject) => {
          var value = this.cache[key];
          resolve(value);
        });
      }

      set(key, value) {
        return new Promise((resolve, reject) => {
          this.cache[key] = value;
          resolve();
        });
      }
    }

then the approach described above:

    class CacheValueObservable {
      private cache: DumbCache;

      constructor(private key: string) {
        this.cache = new DumbCache();
      }

      /*
       * meta observer to handle promises from observables with all results and errors
       * thanks to ReplaySubject(1) current value is available immediately after subscribing
       */
      private _valueSource$$ = new Rx.ReplaySubject(1);
      private _value$ = this._valueSource$$.mergeAll();

      public value$() {
        return this._value$;
      }

      public updateValue(value) {
        this._valueSource$$.next(
          Rx.Observable.fromPromise(
            this.cache.set(this.key, value)
              .then(() => this.cache.get(this.key))
          )
        );
      }
    }

now for the following code:

    let cacheValueObservable = new CacheValueObservable("TEST_KEY");
    cacheValueObservable.updateValue('VALUE 0');

    cacheValueObservable.value$().subscribe(
      val => {
        console.log('val:' + val);
      },
      val => console.log('err', val.stack),
      () => (console.log('complete'))
    );

    cacheValueObservable.updateValue('VALUE 1');
    cacheValueObservable.updateValue('VALUE 2');
    console.log('end');

result:

    starting...
    end
    val:VALUE 2
    val:VALUE 2
    val:VALUE 2

although I would like to achieve

    starting...
    end
    val:VALUE 0
    val:VALUE 1
    val:VALUE 2

full example here: http://jsbin.com/wiheki/edit?js,console

1 answer

While trying to write this question up clearly, I kept investigating and came to understand the problem better. The key point is that the first promise, this.cache.set(this.key, value), may be resolved immediately: its executor runs synchronously inside updateValue(), so the cache is changed right away. Observable.fromPromise gives no guarantee about the order in which the remaining promises of each chain are resolved.

The problem was caused by the fact that the last promise of each chain (the get) ran only after the first promise of the last chain (the set of VALUE 2) had already changed the state. Every chain therefore read VALUE 2.
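The effect can be reproduced without RxJS. The following is a minimal sketch, assuming only a typed copy of the DumbCache from the question; eagerChains is a hypothetical helper that starts every set().then(get) chain right away, which is roughly what updateValue() does when it passes an already created promise to fromPromise.

```typescript
// Typed copy of the DumbCache from the question.
class DumbCache {
  private cache: Record<string, string> = {};

  get(key: string): Promise<string> {
    return new Promise((resolve) => resolve(this.cache[key]));
  }

  set(key: string, value: string): Promise<void> {
    return new Promise((resolve) => {
      // The executor runs synchronously, inside the call to set().
      this.cache[key] = value;
      resolve();
    });
  }
}

// Hypothetical helper: every chain is started eagerly, one after another.
function eagerChains(values: string[]): Promise<string[]> {
  const cache = new DumbCache();
  const chains = values.map((value) =>
    cache.set("TEST_KEY", value).then(() => cache.get("TEST_KEY"))
  );
  // At this point all writes have happened, but no read has run yet:
  // the .then() callbacks are still queued.
  return Promise.all(chains);
}

// Every entry of the logged array is "VALUE 2".
eagerChains(["VALUE 0", "VALUE 1", "VALUE 2"]).then((seen) =>
  console.log(seen)
);
```

All three reads happen after the last write, so each chain sees the value written last.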

In the end, the solution looks quite simple in code, although it was not obvious. It consists of two key changes:

  • defer the creation of the initial promise until the mergeAll phase by using Observable.defer instead of Observable.fromPromise
  • limit (in fact disable) the merge concurrency by using mergeAll(1)

thus, the working solution is as follows:

    class CacheValueObservable {
      private cache: DumbCache;

      constructor(private key: string) {
        this.cache = new DumbCache();
      }

      /*
       * meta observer to handle promises from observables with all results and errors
       * thanks to ReplaySubject(1) current value is available immediately after subscribing
       */
      private _valueSource$$ = new Rx.ReplaySubject(1);
      // disable merge concurrency
      private _value$ = this._valueSource$$.mergeAll(1);

      public value$() {
        return this._value$;
      }

      public updateValue(value) {
        this._valueSource$$.next(
          // defer promise resolution to ensure they will be fully resolved
          // one by one thanks to no concurrency in mergeAll
          Rx.Observable.defer(() =>
            this.cache.set(this.key, value)
              .then(() => this.cache.get(this.key))
          )
        );
      }
    }
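To see why both changes matter, here is a rough plain-promise analogy. It is not RxJS and not the class above: a factory function stands in for Observable.defer, a sequential loop stands in for mergeAll(1), and deferredChains is a hypothetical name. It assumes a typed copy of the DumbCache from the question.

```typescript
// Typed copy of the DumbCache from the question.
class DumbCache {
  private cache: Record<string, string> = {};

  get(key: string): Promise<string> {
    return new Promise((resolve) => resolve(this.cache[key]));
  }

  set(key: string, value: string): Promise<void> {
    return new Promise((resolve) => {
      this.cache[key] = value;
      resolve();
    });
  }
}

// Hypothetical helper: chains are created lazily and run one at a time.
async function deferredChains(values: string[]): Promise<string[]> {
  const cache = new DumbCache();

  // Wrapping each chain in a function delays the write until the
  // function is called (the role played by defer).
  const factories = values.map(
    (value) => () =>
      cache.set("TEST_KEY", value).then(() => cache.get("TEST_KEY"))
  );

  const seen: string[] = [];
  for (const start of factories) {
    // The next write starts only after the previous read has finished
    // (the role played by mergeAll(1)).
    seen.push(await start());
  }
  return seen;
}

// Logs the values in the order they were submitted:
// "VALUE 0", "VALUE 1", "VALUE 2".
deferredChains(["VALUE 0", "VALUE 1", "VALUE 2"]).then((seen) =>
  console.log(seen)
);
```

Deferring alone would not be enough if all factories were invoked at once, and sequencing alone would not help if the writes had already run eagerly. The order is preserved only when both are combined.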

and here is a live example: http://jsbin.com/bikawo/edit?html,js,console
