Why doesn't my RxJava Observable emit or complete unless it's blocking?

Background

I have a series of RxJava Observables (either generated from Jersey clients, or stubbed with Observable.just(someObject)). All of them should emit exactly one value. I have a component test that mocks all the Jersey clients and uses Observable.just(someObject), and I see the same behavior there as when running production code.

I have several classes that operate on these observables, perform some calculations (and some side effects; I might make them direct return values later) and return empty void observables.

At one point, in one of these classes, I try to zip several of my source observables and then map them, something like the below:

    public Observable<Void> doCalculation() {
        return Observable.zip(
            getObservable1(),
            getObservable2(),
            getObservable3(),
            UnifyingObject::new
        ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());
    }

    // in UnifyingObject
    public Observable<Void> processToNewObservable() {
        // ... do some calculation ...
        return Observable.empty();
    }

The calculation classes are then all combined and waited on:

    // Wait for rule computations to complete
    List<Observable<Void>> calculations = ...;
    Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

Problem

The problem is that processToNewObservable() is never executed. By process of elimination, I can see that getObservable1() is the trouble: if I replace it with Observable.just(null), everything executes as I would expect (but with a null value where I want a real one).

To reiterate, getObservable1() returns an Observable from a Jersey client in production code, but that client is a Mockito mock returning Observable.just(someValue) in my test.

Investigation

If I convert getObservable1() to blocking and then wrap the first value in just(), everything again executes as I would expect (but I don't want to introduce the blocking step):

    Observable.zip(
        Observable.just(getObservable1().toBlocking().first()),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());

My first thought was that perhaps something else was consuming the value emitted by my observable, and zip saw that it had already completed, and so decided that the result of zipping them should be an empty observable. I tried adding .cache() to every source observable that I think is related, but that did not change the behavior.

I also tried adding next / error / completed / finally handlers to getObservable1() (without converting it to blocking) just before the zip, but none of them executed either:

    getObservable1()
        .doOnNext(...)
        .doOnCompleted(...)
        .doOnError(...)
        .finallyDo(...);

    Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());

Question

I am very new to RxJava, so I'm pretty sure I'm missing something fundamental. The question is: what stupid thing might I be doing? If that's not obvious from what I've said so far, what can I do to help diagnose the problem?

3 answers

Note: I didn't find my answer here very satisfying, so I dug a little further and found a much smaller reproduction case, and I have asked a new question here: Why is my RxJava Observable emitting only the first user?


I've figured out at least part of my problem (and, with apologies to everyone who tried to answer, I don't think you had much of a chance given my explanation).

The various classes that perform these calculations were all returning Observable.empty() (as per processToNewObservable() in my original example). As far as I can tell, Observable.zip() doesn't subscribe to the Nth observable it is zipping until the (N-1)th observable has emitted a value.

My original example claimed it was getObservable1() that was misbehaving. That was slightly inaccurate: it was actually a later observable in the parameter list. As I understand it, the reason that making it blocking and then turning the value back into an Observable worked is that blocking and calling first() forced its execution, so I got the side effects I wanted.

If I change all my calculation classes to return Observable.just(null) instead, everything works: the final zip() of all the calculation classes' observables works through them all, so all the expected side effects happen.

Returning a null Void suggests I'm definitely doing something wrong design-wise, but at least that answers this particular question.
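
The difference between Observable.empty() and Observable.just(null) described above fits zip's documented pairing rule: it emits only as many combined items as the source that emits the fewest, so a source that completes without emitting leaves nothing to pair. One rough way to see this is a simplified, list-based model in plain Java. It is only a sketch and not RxJava's implementation; ZipModel and its zip method are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Simplified model of zip's pairing rule; NOT RxJava's implementation.
// ZipModel and zip are hypothetical names used only for this sketch.
class ZipModel {
    // Pairs items by position and stops as soon as either source runs out.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combiner.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("value");

        // Stand-in for Observable.empty(): no item, so nothing is paired.
        List<Integer> empty = Collections.emptyList();
        System.out.println(zip(values, empty, (v, n) -> v + "/" + n).size()); // prints 0

        // Stand-in for Observable.just(null): one (null) item, so one pair.
        List<Integer> justNull = Collections.singletonList(null);
        System.out.println(zip(values, justNull, (v, n) -> v + "/" + n).size()); // prints 1
    }
}
```

In this model the combiner never runs when one source is empty, which mirrors why a zip over sources that complete without emitting gives later operators nothing to act on.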


An Observable must emit in order to start the chain. You have to think of your pipeline as a declaration of what will happen when the Observable emits.

You didn't share what is actually being observed, but Observable.just() causes the Observable to emit the wrapped object immediately on subscription.
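
The idea of a pipeline as a declaration can be illustrated with an analogy in plain Java using java.util.stream rather than RxJava. This is only a sketch of the general laziness principle: building a chain with a side-effecting step returns a new pipeline and runs nothing until that pipeline is consumed. LazyPipelineDemo and decorate are hypothetical names, and streams differ from Observables in many other respects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream is used here, not RxJava.
// LazyPipelineDemo and decorate are hypothetical names for this sketch.
class LazyPipelineDemo {
    // Builds a pipeline with a side-effecting step but does not run it.
    static Stream<Integer> decorate(Stream<Integer> source, List<String> log) {
        return source.peek(x -> log.add("saw " + x));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Declared, then discarded: the callback never runs.
        decorate(Stream.of(1, 2, 3), log);
        System.out.println("after declaring: " + log);

        // Consuming the decorated pipeline is what triggers the callback.
        decorate(Stream.of(1, 2, 3), log).collect(Collectors.toList());
        System.out.println("after consuming: " + log);
    }
}
```

By the same principle, RxJava operators such as doOnNext return a new Observable, so handlers attached to a chain that is then discarded and never subscribed to will not fire.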


Based on the response in the comments, either one of the getObservable methods doesn't emit any value and just completes, or the Mockito mocking is doing something wrong. The following standalone example works for me. Could you check it out and start mutating it slowly to see where things break?

    Observable.zip(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        (a, b, c) -> new Integer[] { a, b, c })
    .concatMap(a -> Observable.from(a))
    .subscribe(System.out::println);