Background
I have a number of RxJava Observables (either generated from Jersey clients, or stubs created with Observable.just(someObject)). Each of them should emit exactly one value. I have a component test that mocks out all the Jersey clients and uses Observable.just(someObject), and I see the same behavior there as when running the production code.
I have several classes that act on these observables, perform some calculations (and some side effects; I might turn those into direct return values later) and return empty Observable<Void> instances.
At one point, in one of these classes, I try to zip several of my source observables and then map the result, something like the code below:
public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());
}

// in UnifyingObject
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}
The calculation classes are then all combined and waited on:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
    .toBlocking()
    .lastOrDefault(null);
Problem
The problem is that processToNewObservable() is never executed. By process of elimination, I can see that getObservable1() is the trouble: if I replace it with Observable.just(null), everything executes as I would expect (but with a null value where I want the real one).
To reiterate, getObservable1() returns an Observable from a Jersey client in production code, but in my test that client is a Mockito mock returning Observable.just(someValue).
Investigation
If I convert getObservable1() to blocking and then wrap its first value in just(), everything again executes as I expect (but I do not want to introduce a blocking step):
Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
My first thought was that something else might be consuming the value emitted by my observable, so that zip saw it as already completed and decided that the result of zipping the sources should be an empty observable. I tried adding .cache() to every source observable I could think of as related, but this did not change the behavior.
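The hypothesis above rests on zip's pairing rule: it combines the n-th item of every source, so it emits no more items than its shortest source, and a source that completes without emitting makes the whole result empty. A minimal plain-Java sketch of that rule follows. It is a model only, not RxJava: lists stand in for observables, and `ZipModel` and `zipModel` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipModel {
    // Hypothetical helper: pairs items positionally, the way zip pairs emissions.
    // It stops as soon as either source has no more items.
    static <A, B, R> List<R> zipModel(List<A> first, List<B> second,
                                      BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combiner.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> oneValue = Arrays.asList("someValue");
        List<String> empty = Collections.emptyList();

        // Both sources hold one item: the combiner runs once.
        System.out.println(zipModel(oneValue, oneValue, (x, y) -> x + "+" + y));

        // One source is empty: the combiner never runs and the result is empty.
        System.out.println(zipModel(oneValue, empty, (x, y) -> x + "+" + y));
    }
}
```

In this model, anything chained after the zip (the equivalent of concatMap here) would have nothing to operate on in the second case, which matches the symptom of a downstream step that never runs.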
I also tried adding next / error / completed / finally handlers to getObservable1() (without converting it to blocking) just before the zip, but none of them were executed either:
getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
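One thing worth checking in the snippet above, if it is literal: RxJava operators such as doOnNext do not modify the Observable they are called on; they return a new Observable. A chain whose result is discarded, and which is never subscribed to, will not run its handlers, regardless of what the separately obtained getObservable1() passed to zip does. The toy sketch below models only that immutability. `ToySource` and `DiscardedChainDemo` are hypothetical stand-ins written for this illustration, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for an immutable single-value source; NOT the RxJava Observable.
final class ToySource<T> {
    private final T value;
    private final List<Consumer<T>> taps;

    ToySource(T value) {
        this(value, new ArrayList<>());
    }

    private ToySource(T value, List<Consumer<T>> taps) {
        this.value = value;
        this.taps = taps;
    }

    // Like an Rx operator: returns a NEW instance and leaves this one unchanged.
    ToySource<T> doOnNext(Consumer<T> tap) {
        List<Consumer<T>> copy = new ArrayList<>(taps);
        copy.add(tap);
        return new ToySource<>(value, copy);
    }

    // Runs only the taps registered on THIS instance.
    void subscribe() {
        for (Consumer<T> tap : taps) {
            tap.accept(value);
        }
    }
}

class DiscardedChainDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ToySource<String> source = new ToySource<>("someValue");

        source.doOnNext(log::add); // result discarded, as in the snippet above
        source.subscribe();        // the tap is not part of this instance
        System.out.println("discarded chain: " + log);

        ToySource<String> tapped = source.doOnNext(log::add);
        tapped.subscribe();        // the tap runs because the returned instance is used
        System.out.println("kept chain: " + log);
    }
}
```

Applied to the real code, this would suggest assigning the decorated observable to a variable and passing that same variable to Observable.zip, so the handlers sit on the instance that is actually subscribed to.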
Question
I am very new to RxJava, so I'm pretty sure I'm missing something fundamental. The question is: what silly mistake could I be making? If that is not obvious from what I have said so far, what can I do to help diagnose the problem?