Angular 2: merging Observables

I am having trouble with some observables: I can't seem to get two of them to combine with each other. They work great on their own, but I need both values.

    db.glass.subscribe( ( glass: GlassData[] ): void => {
        console.log( glass ); // This prints
    });

    db.cassette_designs.subscribe( ( cassettes: CassetteData[] ): void => {
        console.log( cassettes ); // This prints
    });

Not being all that familiar with observables, the first thing I tried was nesting one inside the other, but the inner one doesn't seem to do anything.

    db.glass.subscribe( ( glass: GlassData[] ): void => {
        console.log( glass ); // This prints

        db.cassette_designs.subscribe( ( cassettes: CassetteData[] ): void => {
            console.log( cassettes ); // This doesn't
        });
    });

That seemed a little silly anyway, so I searched Google to see if there was a better way to combine observables, and it turns out there are several. I tried zip and forkJoin, since they looked the most like what I wanted to do, but nothing worked with them either.

    Observable.zip(
        db.cassette_designs,
        db.glass,
        ( cassettes: CassetteData[], glass: GlassData[] ): void => {
            console.log( cassettes ); // doesn't print
            console.log( glass ); // doesn't print
        }
    );

    Observable.forkJoin( [ db.cassette_designs, db.glass ] ).subscribe( ( data: any ): void => {
        console.log( data ); // doesn't print
    });

It may be something simple, such as me calling the functions incorrectly, but I would have expected some kind of warning or error at some point. tsc reports no problems with the code, and I get no messages in the developer console in Chrome or Firefox.

Update

I tried combineLatest, but it still doesn't display anything in the console. I must be missing something, but I'm not sure what. They work individually.

    Observable.combineLatest(
        db.cassette_designs,
        db.glass,
        ( cassettes: CassetteData[], glass: GlassData[] ): void => {
            console.log( cassettes ); // doesn't print
            console.log( glass ); // doesn't print
        }
    );

The observables are created as follows:

    ...
    public Listen( event: string ): Observable<Response> {
        return new Observable<Response>( ( subscriber: Subscriber<Response> ): Subscription => {
            const listen_func = ( res: Response ): void => subscriber.next( res );

            this._socket.on( event, listen_func );

            return new Subscription( (): void => this._socket.removeListener( event, listen_func ) );
        });
    }
    ...

Then, to actually get the observables, I listen for the response on the corresponding event, e.g.

    ...
    public cassette_designs: Observable<CassetteData[]>;
    ...
    this.cassette_designs = _socket.Listen( "get_cassette_designs" )
        .map( ( res: Response ) => res.data.data );
+7
angular typescript rxjs
6 answers

I managed to get combineLatest to work by actually subscribing to the combined observable.

I originally did this:

    Observable.combineLatest(
        db.cassette_designs,
        db.glass,
        ( cassettes: CassetteData[], glass: GlassData[] ): void => {
            console.log( cassettes );
            console.log( glass );
        }
    );

Now I do this:

    Observable.combineLatest( db.cassette_designs, db.glass ).subscribe( ( data: any[] ): void => {
        console.log( data );
        // cassettes - data[0]
        // glass - data[1]
    });
+9

To follow up on your findings:

1) An Observable is a lazily executed data type, which means it will not run anything in the pipeline until it is subscribed to. This also applies to the combining operators. zip, forkJoin, combineLatest and withLatestFrom all subscribe recursively to the Observables you pass them, but only once they themselves have been subscribed to.

Hence:

    var output = Observable.combineLatest(stream1, stream2, (x, y) => ({x, y}));

This will not actually do anything until you call output.subscribe(), at which point subscribe is also called on stream1 and stream2, and you get all the magic of Rx.
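To make the laziness concrete, here is a minimal sketch. It deliberately does not use RxJS: LazyStream and this two-argument combineLatest are hypothetical toy stand-ins written for illustration, and the sample glass and cassette values are made up. It only shows that the sources are not touched until the combined stream is subscribed to.

```typescript
// Toy stand-in for an Observable (hypothetical names, NOT the RxJS classes).
type Observer<T> = (value: T) => void;
type Unsubscribe = () => void;

class LazyStream<T> {
    // The producer function runs only when subscribe() is called.
    constructor(private producer: (observer: Observer<T>) => Unsubscribe) {}

    subscribe(observer: Observer<T>): Unsubscribe {
        return this.producer(observer);
    }
}

// Simplified combineLatest for two streams: emits once both have a value,
// then again whenever either one emits.
function combineLatest<A, B>(a: LazyStream<A>, b: LazyStream<B>): LazyStream<[A, B]> {
    return new LazyStream<[A, B]>((observer) => {
        let latestA: A | undefined;
        let latestB: B | undefined;
        let hasA = false;
        let hasB = false;
        const emit = (): void => {
            if (hasA && hasB) {
                observer([latestA as A, latestB as B]);
            }
        };
        // The sources are subscribed to here, i.e. only once the
        // combined stream itself is subscribed to.
        const unsubA = a.subscribe((value) => { latestA = value; hasA = true; emit(); });
        const unsubB = b.subscribe((value) => { latestB = value; hasB = true; emit(); });
        return () => { unsubA(); unsubB(); };
    });
}

// Made-up sample sources that count how often their producer runs.
let producerRuns = 0;
const glass = new LazyStream<string[]>((observer) => {
    producerRuns++;
    observer(["clear", "frosted"]);
    return () => {};
});
const cassettes = new LazyStream<number[]>((observer) => {
    producerRuns++;
    observer([1, 2]);
    return () => {};
});

// Building the combined stream does not run either producer.
const output = combineLatest(cassettes, glass);
const runsBeforeSubscribe = producerRuns;

// Subscribing to the combined stream subscribes to both sources.
const received: Array<[number[], string[]]> = [];
output.subscribe((pair) => received.push(pair));
const runsAfterSubscribe = producerRuns;
```

In this toy model, runsBeforeSubscribe stays at 0 and the producers only run once output.subscribe() is called, which mirrors why the code in the question printed nothing.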

2) A more minor point, but whenever you start writing your own creation methods, first check the documentation to see whether one already exists. There are static creation methods for Arrays, Promises, Node-style callbacks, and even standard event patterns.

So your Listen method could become:

    public Listen<R>(event: string): Observable<R> {
        return Observable.fromEvent(this._socket, event);
    }
+6

Hey, if you want a single stream (observable) that emits the combined data from two sources, check out combineLatest.

+1

Both Observable.zip and Observable.forkJoin should work. (Personally, I prefer "forkJoin": it returns an array in the same order in which you passed the observables, and you don't need as many arguments.)

Perhaps, if you create the observables manually (Observable.create ...), you simply forgot to call "complete" on the observers provided by the "create" method. forkJoin in particular emits only once every source observable has completed.
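The effect of a missing "complete" can be sketched with a toy model. None of the names below (ToyObserver, ToySource, toyForkJoin, httpLike, socketLike) come from RxJS; they are hypothetical and only imitate the rule that forkJoin waits for every source to complete. The assumption is that a socket listener like the one in the question never completes, which would explain the silence.

```typescript
// Toy model (hypothetical names, not the RxJS API): a source pushes values
// and may or may not signal completion.
interface ToyObserver<T> {
    next: (value: T) => void;
    complete: () => void;
}
type ToySource<T> = (observer: ToyObserver<T>) => void;

// Simplified forkJoin: remembers the last value of each source and reports
// the array of last values only after every source has completed.
function toyForkJoin<T>(sources: ToySource<T>[], onResult: (values: T[]) => void): void {
    const last: T[] = [];
    const hasValue: boolean[] = sources.map(() => false);
    let completed = 0;
    sources.forEach((source, index) => {
        source({
            next: (value) => { last[index] = value; hasValue[index] = true; },
            complete: () => {
                completed++;
                if (completed === sources.length && hasValue.every((seen) => seen)) {
                    onResult(last);
                }
            },
        });
    });
}

// Emits once and completes, like a single HTTP response.
const httpLike: ToySource<string> = (observer) => {
    observer.next("response");
    observer.complete();
};

// Emits but never completes, like a listener on a long-lived socket event.
const socketLike: ToySource<string> = (observer) => {
    observer.next("event payload");
};

const results: string[][] = [];

// Both sources complete, so one combined result is reported.
toyForkJoin([httpLike, httpLike], (values) => results.push(values));
const afterCompletingSources = results.length;

// One source never completes, so no further result is reported.
toyForkJoin([httpLike, socketLike], (values) => results.push(values));
const afterNeverCompletingSource = results.length;
```

If the sources are meant to stay open, as event listeners usually are, combineLatest or zip is probably the better fit, since neither waits for completion before emitting.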

+1

I used combineLatest. I needed three API calls, but I only wanted a single object combining the three responses.

I followed the formula mentioned in another answer to this same post:

    var output = Observable.combineLatest(stream1, stream2, (x, y) => ({x, y}));

The final code is:

    getGobalStats(): Observable<any> {
        let obs1 = this._http.get(this._config.getGlobalStatsUrl(), this._config.getOptions())
            .map((res: Response) => {
                return res.json().content;
            })
            .catch((error: any) => {
                console.error(error);
                // catch must return an Observable, so rethrow the error as one
                return Observable.throw(error);
            });

        let obs2 = this._http.get(this._config.getGlobalStatsUrl() + '?type=1', this._config.getOptions())
            .map((res: Response) => {
                return res.json().content;
            })
            .catch((error: any) => {
                console.error(error);
                return Observable.throw(error);
            });

        let obs3 = this._http.get(this._config.getGlobalStatsUrl() + '?type=3', this._config.getOptions())
            .map((res: Response) => {
                return res.json().content;
            })
            .catch((error: any) => {
                console.error(error);
                return Observable.throw(error);
            });

        // Emits one object once all three requests have produced a value
        return Observable.combineLatest(obs1, obs2, obs3, (res1, res2, res3) => {
            return { all: res1, running: res2, cycling: res3 };
        });
    }
0
    Observable.merge(
            Observable.fromEvent(canvasEl, 'mousedown'),
            Observable.fromEvent(canvasEl, 'touchstart'))
        .switchMap((e) => {
            return Observable.merge(
                    Observable.fromEvent(canvasEl, 'mousemove').takeUntil(Observable.fromEvent(canvasEl, 'mouseup')),
                    Observable.fromEvent(canvasEl, 'touchmove').takeUntil(Observable.fromEvent(canvasEl, 'touchend')),
                )
                .pairwise()
        })
        .subscribe(...
0