RxJS takeWhile but include last value

I have an RxJS5 pipeline that looks like

    Rx.Observable.from([2, 3, 4, 5, 6])
      .takeWhile((v) => v !== 4)

I want to stay subscribed until I see 4, but I also want that final 4 to be included in the result. So the example above should produce

 2, 3, 4 

However, according to the takeWhile documentation, the takeWhile operator is not inclusive. This means that when it encounters an element that fails the predicate, it completes the stream immediately without emitting that element. As a result, the above code actually outputs

 2, 3 
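To make the difference concrete, here is a minimal sketch that uses plain generators rather than RxJS. The helper names `takeWhileExclusive` and `takeWhileInclusive` are hypothetical and only illustrate the two behaviours; they are not RxJS APIs.

```javascript
// Plain-iterable sketch (no RxJS). Both helper names are made up for illustration.

// Mirrors the documented takeWhile behaviour: the failing value is dropped.
function* takeWhileExclusive(iterable, predicate) {
  for (const value of iterable) {
    if (!predicate(value)) return; // stop before emitting the failing value
    yield value;
  }
}

// The behaviour being asked for: the failing value is emitted, then the stream ends.
function* takeWhileInclusive(iterable, predicate) {
  for (const value of iterable) {
    yield value; // emit first, so the failing value is included
    if (!predicate(value)) return;
  }
}

const input = [2, 3, 4, 5, 6];
const exclusive = [...takeWhileExclusive(input, (v) => v !== 4)];
const inclusive = [...takeWhileInclusive(input, (v) => v !== 4)];

console.log(exclusive); // [ 2, 3 ]
console.log(inclusive); // [ 2, 3, 4 ]
```

The only difference is whether the predicate is checked before or after the value is emitted.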

So my question is: in what simple way can I achieve takeWhile but also emit the last element with RxJS?

+14
reactive-programming rxjs rxjs5
5 answers

There is already an open PR that adds an optional inclusive parameter to takeWhile: https://github.com/ReactiveX/rxjs/pull/4115

There are at least two possible workarounds:

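  1. Using concatMap():

     import { of } from 'rxjs';
     import { concatMap, takeWhile } from 'rxjs/operators';

     of('red', 'blue', 'green', 'orange').pipe(
       concatMap(color => {
         if (color === 'green') {
           // Emit the last value followed by a falsy sentinel.
           return of(color, null);
         }
         return of(color);
       }),
       // Stops at the null sentinel, after 'green' has been emitted.
       takeWhile(color => color),
     )

  2. Using multicast():

     import { of, ReplaySubject } from 'rxjs';
     import { multicast, takeWhile, concat, take } from 'rxjs/operators';

     of('red', 'blue', 'green', 'orange').pipe(
       multicast(
         () => new ReplaySubject(1),
         subject => subject.pipe(
           takeWhile((c) => c !== 'green'),
           // The ReplaySubject replays the value that failed the predicate.
           concat(subject.pipe(take(1))),
         )
       ),
     )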

I also used this operator, so I added it to my own set of additional RxJS 5 operators: https://github.com/martinsik/rxjs-extra#takewhileinclusive

This operator was also discussed in this RxJS 5 issue: https://github.com/ReactiveX/rxjs/issues/2420

January 2019: updated for RxJS 6

+16

If your comparison is such that you know exactly what the last element will be (for example, when using !==), you can simply append it yourself:

    Rx.Observable.from([2, 3, 4, 5, 6])
      .takeWhile((v) => v !== 4)
      .concat(Rx.Observable.of(4))
      .subscribe(console.log)

+3

I ran into the same problem: I needed the last element to be included, so I decided to keep a reference to the subscription and unsubscribe inside the onNext callback once the condition was met. Using your sample code, this would be:

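    const subscription = Observable.of('red', 'blue', 'green', 'orange')
      .subscribe(color => {
        // Do something with the value here
        if (color === 'green') {
          // Assumes the source emits asynchronously; a synchronous source such as
          // Observable.of reaches this line before `subscription` is assigned.
          subscription.unsubscribe()
        }
      })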

This worked for me because it also caused the source observable to stop emitting, which is what I needed in my scenario. I realize that I am not using the takeWhile operator, but the main goal is achieved without any workarounds or additional code. I am not a fan of making things work in ways they were not designed for. The disadvantages of this approach are:

  • If any other observers are subscribed, the source will continue to emit.
  • For some reason, onCompleted is not called when the last observer unsubscribes, although I have verified that the source does stop emitting.
+2

You can use endWith(value), which (unlike a lot of RxJS code) is nicely self-documenting.

    const example = source.pipe(
      takeWhile(val => val != 4),
      endWith(4)
    );

PS. Also note that takeUntil does not accept a predicate, so you cannot use that operator to solve this problem. Its signature is completely different.

Official docs: https://rxjs-dev.firebaseapp.com/api/operators/endWith

https://stackblitz.com/edit/typescript-pvuawt

+1

In my case, I could not predict what the final value would be. I also wanted a solution built only from common, simple operators, and I wanted something reusable, so I could not rely on the values being truthy. The only thing I could think of was to define my own operator as follows:

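    import { pipe, from } from 'rxjs';
    import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

    export function doWhile<T>(shouldContinue: (a: T) => boolean) {
      return pipe(
        // Emit each value twice: once as data, once as a marker carrying the predicate result.
        switchMap((data: T) => from([
          { data, continue: true },
          { data, continue: shouldContinue(data), exclude: true }
        ])),
        // Stop at the first marker whose predicate failed; its data message has already passed.
        takeWhile(message => message.continue),
        // Drop the markers and unwrap the original values.
        filter(message => !message.exclude),
        map(message => message.data)
      );
    }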

This is a little strange, but it works for me, and I can import and use it.
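To see why the message-pair trick keeps the last value, the same steps can be traced on a plain array. This is only a sketch without RxJS; `traceDoWhile` is a hypothetical name, and the takeWhile step is emulated with findIndex and slice.

```javascript
// Plain-array trace of the message-pair idea (no RxJS). `traceDoWhile` is a made-up helper.
function traceDoWhile(values, shouldContinue) {
  // Step 1: expand every value into a data message and a marker message.
  const messages = values.flatMap((data) => [
    { data, continue: true },
    { data, continue: shouldContinue(data), exclude: true },
  ]);

  // Step 2: emulate takeWhile by cutting at the first message with continue === false.
  // That message is always a marker, so the matching data message sits just before the cut.
  const stopAt = messages.findIndex((m) => !m.continue);
  const taken = stopAt === -1 ? messages : messages.slice(0, stopAt);

  // Step 3: drop the markers and unwrap the original values.
  return taken.filter((m) => !m.exclude).map((m) => m.data);
}

const result = traceDoWhile([2, 3, 4, 5, 6], (v) => v !== 4);
console.log(result); // [ 2, 3, 4 ]
```

Because the cut always lands on a marker, the value that failed the predicate has already gone through as a data message, and it does not matter whether that value is truthy.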

0