Proper Use of Retrofit + RxJava combLatest

I want to make 2 network calls asynchronously - I use Retrofit + RxJava to accomplish this. This logic consists of a simple Runner class to test the solution. NOTE. This applies mainly to server-side RxJava.

My code is as follows:

public static void main(String[] args) throws Exception { Api api = ...; Observable.combineLatest( api.getStates(), api.getCmsContent(), new Func2<List<States>, CmsContent, String>() { @Override public String call(List<State> states, CmsContent content) { ... return "PLACEHOLDER"; } }) .observeOn(Schedulers.immediate()) .subscribeOn(Schedulers.immediate()) .subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("COMPLETED"); } @Override public void onError(Throwable e) { System.out.println("ERROR: " + e.getMessage()); } @Override public void onNext(String s) { // I don't care what returned here } }); } 

Three questions:

  • Is Observable.combineLatest best operator to use when you want to make multiple REST calls asynchronously and continue when all calls are completed?
  • The implementation of My Func2 currently returns a String . After making two API calls, I will process the results in the Func2#call() method. I don’t care what comes back - there should be a better way to handle this, right, am I right?
  • API calls are correctly executed with the above code. But when the program starts, the main method does not match the correct Process finished with exit code 0 . What can lead to code freezing?

UPDATE - 2015-05-14

Based on the recommendation, I changed the logic to the following:

 public static void main(String[] args) throws Exception { Api api = ...; Observable.zip( api.getStates(), api.getCmsContent(), new Func2<List<States>, CmsContent, Boolean>() { @Override public Boolean call(List<State> states, CmsContent content) { // process data return true; } }) .subscribeOn(Schedulers.io()) .toBlocking() .first(); } 

This seems like the solution I was looking for. I am going to use it for some time to see if I encountered any problems.

+7
retrofit rx-java
source share
3 answers

1) If you know that you will have one value for both paths, it is the same as zip .

2) What would you like to do? You will get a couple of values ​​in your Func2 , and if you care about what happens with onNext , return the value of your choice.

3) Schedulers.immediate() not a real scheduler in a sense, and is very prone to deadlock scenarios with a single pool. You really don't need to use it. If you want to block the main thread until async is complete, use toBlocking().first() , for example.

+5
source share

1) It is best to use zip() . The combination of the latter is good if one of the two (or more) apis returns “slower” different results / it has a caching entity.

2) Fun2 makes it easy to combine results. It is better (by architecture) to process the result either in onNext () or onError (). You can use the simple class Pair<T,Y> to pass the results from Func2 to onNext ().

3) Nothing wrong. The result, as said, should be processed in onNext (), and not in onComplete. According to the Restore source code , the results are only transmitted (of course, in different ways) to onNext ().

I hope for help.

+1
source share

I understand that I am late for a year, but the editing published by OP in 2015-05-14 does not meet its original requirement:

I want to make 2 network calls asynchronously

  • The observed getStates and getCmsContent will NOT run simultaneously if they are not individually subscribed to separate threads. This is a key point omitted in his message, and none of the previous answers have provoked him.

     Observable.fromCallable(() -> doStuff()) .subscribeOn(Schedulers.computation()); 

As @akarnokd said, in case both streams have single values, zip and combineLatest behave similarly. The merge function will be blocked until both getStates and getCmsContent return, but, as shown above, each of them is executed simultaneously on separate threads.

  1. Another solution depends on the ability to combine List<States> and CmsContent as they become available. Given its code, it is obvious that the “data keeper” is some kind (not shown), because the returned value is Boolean , not the combined data. In the following case, forEach is executed simultaneously.

     Observable.just(api.getStates(), api.getCmsContent()) // subscribe on separate thread as shown previously .flatMap(this::buildObservable) .toBlocking() // executes concurrently .forEach(item -> { // merge into "data holder" }); 

Of course, in this code there is a problem not strict typing so that the choice is made.

+1
source share

All Articles