For convenience, I created these extension functions for Flowable and Observable.
Note that with doOnFirst() the action is called before the first element is emitted, while doAfterFirst() first emits the first element and then executes the action.

    fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
        take(1)
            .doOnNext { onFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
        take(1)
            .doOnNext { onFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
        take(1)
            .doAfterNext { afterFirstAction.invoke(it) }
            .concatWith(skip(1))

    fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
        take(1)
            .doAfterNext { afterFirstAction.invoke(it) }
            .concatWith(skip(1))

Usage is simple:

    Flowable.fromArray(1, 2, 3)
        .doOnFirst { System.err.println("First $it") }
        .subscribe { println(it) }

Output:

    // First 1
    // 1
    // 2
    // 3

And with doAfterFirst():

    Flowable.fromArray(1, 2, 3)
        .doAfterFirst { System.err.println("First $it") }
        .subscribe { println(it) }

Output:

    // 1
    // First 1
    // 2
    // 3
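
One caveat about the extensions above: take(1) and skip(1) each subscribe to the upstream, so concatWith ends up subscribing to the source twice. That is harmless for a cold, cheap source like fromArray, but it can repeat side effects (for example a network call) or miss items on a hot source. Below is a minimal sketch of a single-subscription variant. The name doOnFirstOnce is hypothetical, and the import assumes RxJava 2 (for RxJava 3 the package would be io.reactivex.rxjava3.core).

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical alternative to doOnFirst(): subscribes to the upstream only once.
// T is bounded by Any because RxJava streams do not carry null items.
fun <T : Any> Observable<T>.doOnFirstOnce(onFirstAction: (T) -> Unit): Observable<T> =
    // defer() creates a fresh flag for every subscriber, so each subscription
    // sees its own "first" element.
    Observable.defer {
        val isFirst = AtomicBoolean(true)
        this.doOnNext { item ->
            // Only the first item flips the flag and triggers the action.
            if (isFirst.compareAndSet(true, false)) onFirstAction(item)
        }
    }
```

It is used the same way as doOnFirst(), e.g. Observable.fromArray(1, 2, 3).doOnFirstOnce { System.err.println("First $it") }. The same pattern should carry over to Flowable with Flowable.defer.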