RxJava Observable: get notified on the first emission

I have three Observables that I combine with combineLatest:

    Observable<String> o1 = Observable.just("1");
    Observable<String> o2 = Observable.just("2");
    Observable<String> o3 = Observable.just("3");

    Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
        @Override
        public Object call(String s, String s2, String s3) {
            return null;
        }
    });

I want to be notified of the first emission of one of the observables without ignoring its later emissions, which, I think, is what the first operator would do. Is there a convenient operator for this, something like (example):

    o1.doOnFirst(new Func1<String, Void>() {
        @Override
        public Void call(String s) {
            return null;
        }
    })
+10
4 answers

I think you can get a practical doOnFirst with a simple take if you are handling a stream:

    public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
        return source.take(1).doOnNext(action).concatWith(source);
    }

Thus, the action is attached only to the first element.

This can be modified to handle observables that are not backed by streams (sources that replay their items to every subscriber) by adding skip to skip the item that was already taken:

    public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
        return source.take(1).doOnNext(action).concatWith(source.skip(1));
    }
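
To see why the skip matters for a replayable source, here is a minimal sketch that mimics the two variants with java.util.stream instead of RxJava. It is only an analogy: limit, peek, Stream.concat and skip stand in for take, doOnNext, concatWith and skip, and the class and method names (DoOnFirstStreamSketch, withoutSkip, withSkip) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DoOnFirstStreamSketch {

    // Analogue of source.take(1).doOnNext(action).concatWith(source):
    // the replayable source is traversed twice, so the first element repeats.
    static <T> List<T> withoutSkip(Supplier<Stream<T>> source, Consumer<T> action) {
        return Stream.concat(source.get().limit(1).peek(action), source.get())
                .collect(Collectors.toList());
    }

    // Analogue of source.take(1).doOnNext(action).concatWith(source.skip(1)):
    // the second traversal drops the element that was already handled.
    static <T> List<T> withSkip(Supplier<Stream<T>> source, Consumer<T> action) {
        return Stream.concat(source.get().limit(1).peek(action), source.get().skip(1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("1", "2", "3");
        List<String> seen = new ArrayList<>(); // records every call of the action

        System.out.println(withoutSkip(items::stream, seen::add)); // [1, 1, 2, 3]
        System.out.println(withSkip(items::stream, seen::add));    // [1, 2, 3]
        System.out.println(seen);                                  // [1, 1]
    }
}
```

In both variants the action runs exactly once per call; only the version with skip avoids emitting the first item twice.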
+12

There are several solutions that I can think of. The first is an ugly but simple hack using doOnNext. Just add a boolean field to the Action1 indicating whether the first element has been received. When the first element arrives, do whatever you want and flip the boolean. For instance:

    Observable.just("1").doOnNext(new Action1<String>() {
        boolean first = true;

        @Override
        public void call(String t) {
            if (first) {
                // Do something
                first = false;
            }
        }
    });

The second is to subscribe twice to the observable you want to track, using publish() or share(), with one of the subscriptions going through first() (which of the two you pick depends on whether you want to connect to the published observable manually). As a result, you get two separate observables that emit the same items, except that the one going through first() stops after the first emission:

    ConnectableObservable<String> o1 = Observable.just("1").publish();

    o1.first().subscribe(System.out::println); // Subscribed only to the first item
    o1.subscribe(System.out::println);         // Subscribed to all items

    o1.connect(); // Connect both subscribers
+5

Using rxjava-extras:

    observable
        .compose(Transformers.doOnFirst(System.out::println))

It is tested, and under the covers it just uses a per-subscription counter in an operator. Note that the per-subscription part is important, since there are many use cases where an observable instance is used more than once and we want the doOnFirst operator to apply every time.

The source code is available in the rxjava-extras project.
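
To illustrate why the state has to be per-subscription, here is a minimal plain-Java sketch. It is not the rxjava-extras implementation: the helper firstOnly and the class PerSubscriptionFirst are hypothetical names, and a "subscription" is simulated by passing a consumer to List.forEach.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PerSubscriptionFirst {

    // Returns a consumer that runs onFirst only for the first value it sees.
    // Every call creates a fresh flag, so state is not shared between consumers.
    static <T> Consumer<T> firstOnly(Consumer<T> onFirst) {
        AtomicBoolean first = new AtomicBoolean(true);
        return value -> {
            if (first.compareAndSet(true, false)) {
                onFirst.accept(value);
            }
        };
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("1", "2", "3");
        List<String> log = new ArrayList<>();

        // Shared state: one flag reused by two simulated subscriptions.
        Consumer<String> shared = firstOnly(v -> log.add("shared " + v));
        items.forEach(shared);
        items.forEach(shared); // the flag is already flipped, so nothing is logged

        // Per-subscription state: a fresh flag for each simulated subscription.
        items.forEach(firstOnly(v -> log.add("fresh " + v)));
        items.forEach(firstOnly(v -> log.add("fresh " + v)));

        System.out.println(log); // [shared 1, fresh 1, fresh 1]
    }
}
```

With the shared flag the action fires for the first pass only, which is the pitfall of keeping the boolean in a single reused callback; creating the state anew for each pass makes the action fire every time.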

+4

For convenience, I created these extension functions for Flowable and Observable.
Note that with doOnFirst() the action is called before the first element is emitted, while doAfterFirst() first emits the first element and then executes the action.

    fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
        take(1)
            .doOnNext { onFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
        take(1)
            .doOnNext { onFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
        take(1)
            .doAfterNext { afterFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
        take(1)
            .doAfterNext { afterFirstAction.invoke(it) }
            .concatWith(skip(1))

Usage is simple:

    Flowable.fromArray(1, 2, 3)
        .doOnFirst { System.err.println("First $it") }
        .subscribe { println(it) }

Output:

    // First 1
    // 1
    // 2
    // 3

And:

    Flowable.fromArray(1, 2, 3)
        .doAfterFirst { System.err.println("First $it") }
        .subscribe { println(it) }

Output:

    // 1
    // First 1
    // 2
    // 3
+1
