How to wait for Observerable onComplete () in flatMap ()

I have an Observable that reads data from a database. If the data is null , I need to get it from the network. Thus, I execute flatMap on the first Observable , check the result of the database operation, and if it is null , I run this other Observable to retrieve the data from the network.

Note. Observable have different Subscriber , because I have different post-processing depending on where the data comes from (such logic).

  Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<String>>() { @Override public Observable<SomeData> call(SomeData s) { if (s == null) { getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed } else { return Observable.just(s); } } }).subscribe(new SomeSubscirber()); 

Given that I am sending Observable.empty() to exclude data processing for SomeSubscriber , I have a hunch that my second Observable may not always be finished, because it might just be garbage collection. I guess I saw this during my tests.

At this point, I think I just need to wait until the Observable that is reading from the network is complete and then sends Observable.empty() . Can I do synchronous execution? But still I have the feeling that I'm doing it wrong.

+6
source share
1 answer

You can do any observable blocking with a .toBlocking shortcut (see full details https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators )

 Data d = getReadFromNetworkObservable() .toBlocking() .first() // or single() or singleOrDefault() // manipulate with data here 

The combination of cache with network data is described here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

And here: RxJava and cached data

+7
source

All Articles