Rx: How to get the last item, even if onError was called?

I am using RxJava and I need to do 2 things:

  • Get the last item emitted from Observable
  • Determine whether onError was called, as opposed to onCompleted

I looked at using last and lastOrDefault (the latter is actually the behavior I need), but I could not work around onError hiding the last element. I would be fine with subscribing to the Observable twice, once to get the last value and once to get the completion status, but so far I have only managed to do this by creating my own Observer:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observer;

public class CacheLastObserver<T> implements Observer<T> {

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void onCompleted() {
        // Do nothing
    }

    @Override
    public void onError(Throwable e) {
        error.set(e);
    }

    @Override
    public void onNext(T message) {
        lastMessageReceived.set(message);
    }

    public Optional<T> getLastMessageReceived() {
        return Optional.ofNullable(lastMessageReceived.get());
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(error.get());
    }
}

A custom Observer that holds state does not feel like the Rx "functional way". Is there a better approach?

+4
4

Try this:

source.materialize().buffer(2).last()

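This emits a list of Notification objects whose final element is the terminal event (onError or onCompleted).

Note that buffer(2) groups items into non-overlapping pairs, so the last value appears in that list only when the source emitted an odd number of items; otherwise the list holds the terminal Notification alone.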
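
To make the grouping behavior concrete, here is a library-free sketch in plain Java. It does not use RxJava: the `materialize`, `lastChunkOfTwo`, and `lastTwo` helpers are hypothetical stand-ins that model `materialize()`, `buffer(2).last()`, and `takeLast(2).toList()` on ordinary lists, with events represented as strings. It only illustrates why taking the last two events may be more robust than pairing them up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LastTwoSketch {

    // Models materialize(): each value becomes "N(value)", followed by one terminal event.
    static List<String> materialize(List<Integer> values, String terminal) {
        List<String> events = new ArrayList<>();
        for (Integer v : values) {
            events.add("N(" + v + ")");
        }
        events.add(terminal);
        return events;
    }

    // Models buffer(2).last(): non-overlapping chunks of two, keeping the final chunk.
    static List<String> lastChunkOfTwo(List<String> events) {
        int start = ((events.size() - 1) / 2) * 2;
        return new ArrayList<>(events.subList(start, events.size()));
    }

    // Models takeLast(2).toList(): the final two events, or fewer if there are not two.
    static List<String> lastTwo(List<String> events) {
        int start = Math.max(0, events.size() - 2);
        return new ArrayList<>(events.subList(start, events.size()));
    }

    public static void main(String[] args) {
        List<String> odd = materialize(Arrays.asList(1, 2, 3), "E");
        List<String> even = materialize(Arrays.asList(1, 2), "E");
        System.out.println(lastChunkOfTwo(odd));  // [N(3), E]
        System.out.println(lastChunkOfTwo(even)); // [E]
        System.out.println(lastTwo(even));        // [N(2), E]
    }
}
```

In RxJava 1 terms, the last line corresponds to source.materialize().takeLast(2).toList(), which keeps the final value next to the terminal Notification regardless of how many items were emitted. An empty source still yields a single-element list, so the caller has to handle that case.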

+3

I used this approach to solve your problem.

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        Observable.just(1, 2, 3, 4, 5)
                .map(number -> {
                    if (number == 4)
                        throw new NullPointerException();
                    else
                        return number;
                })
                .onErrorResumeNext(t -> Observable.empty())
                .lastOrDefault(15)
                .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber));
    }
}

It prints onNext: 3

Hope this helps.

0

I solved it with:

source.materialize().withPrevious().last()

where withPrevious is defined as (Kotlin):

fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> =
    this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) }
        .skip(1)
        .map { it as Pair<T?, T> }
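
For readers who want to see what the scan/skip/map chain computes, here is a rough plain-Java model of the same pairing. It is a sketch on ordinary lists, not RxJava code: the `withPrevious` helper below is a hypothetical stand-in, events are represented as strings, and `Map.Entry` plays the role of Kotlin's `Pair`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class WithPreviousSketch {

    // Pairs each element with the one before it; the first element is paired with null.
    static <T> List<Map.Entry<T, T>> withPrevious(List<T> items) {
        List<Map.Entry<T, T>> pairs = new ArrayList<>();
        T previous = null;
        for (T current : items) {
            pairs.add(new SimpleEntry<>(previous, current));
            previous = current;
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Stand-in for source.materialize(): two values followed by an error event.
        List<String> events = Arrays.asList("N(1)", "N(2)", "E");
        List<Map.Entry<String, String>> pairs = withPrevious(events);
        // Equivalent of .last(): the final pair holds the last value and the terminal event.
        Map.Entry<String, String> last = pairs.get(pairs.size() - 1);
        System.out.println(last.getKey() + " -> " + last.getValue()); // N(2) -> E
    }
}
```

If the source emits no values at all, the only pair is (null, terminal event), which is why the first component of the pair has to be nullable.
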
0