Chaining Retrofit services with RxJava support

I'm having a problem chaining observables using Retrofit's RxJava support. Perhaps I misunderstand how to use it; otherwise it may be a bug in Retrofit. I hope someone here can help me understand what is happening. Edit: I am using the MockRestAdapter for these responses - this may be relevant, as I can see that the RxSupport implementations differ slightly.

This is a fake banking app. It tries to complete a transfer, and once the transfer has completed it should make an accounts request to update the account values. This is essentially just an excuse to try out flatMap. Unfortunately, the following code does not work; none of the subscribers are ever notified:

Case 1: chaining two Retrofit-produced observables

Transfer service (note: returns a Retrofit observable):

    @FormUrlEncoded
    @POST("/user/transactions/")
    public Observable<TransferResponse> transfer(
            @Field("session_id") String sessionId,
            @Field("from_account_number") String fromAccountNumber,
            @Field("to_account_number") String toAccountNumber,
            @Field("amount") String amount);

Accounts service (note: returns a Retrofit observable):

    @FormUrlEncoded
    @POST("/user/accounts")
    public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

Chaining the two Retrofit observables together:

    transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override
                public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Case 2: creating my own observable and chaining it with the Retrofit one

If I ignore the built-in Rx support in Retrofit for the flat-mapped call, it works perfectly! All subscribers are notified. See below:

New accounts service (note: does not return an observable):

    @FormUrlEncoded
    @POST("/user/accounts")
    public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

Creating my own observable and emitting the items myself:

    transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override
                public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override
                        public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts =
                                    accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Any help would be greatly appreciated!

1 answer

Answer: yes, you should be able to chain observables from Retrofit. There seems to be a bug in the private class MockRestAdapter$MockRxSupport, in createMockObservable. The way scheduling is done with respect to subscribing the subscriber to the observable looks wrong: the subscription happens only after the HttpExecutor thread itself has been started. I believe the original flow, which comes from your Schedulers.io() thread, has completed and unsubscribed before the Observable returned by mockHandler.invokeSync can be subscribed to. Hopefully this explanation makes some sense if you look at the code in the retrofit-mock module.
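To make the suspected ordering concrete, here is a toy model in plain JDK code. It is not Retrofit or RxJava code: ToySubscriber and LateSubscribeDemo are hypothetical names, and the latches force the ordering described above rather than letting it arise on its own. It only shows why an item emitted after the subscriber has unsubscribed never reaches it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy stand-in for a subscriber (hypothetical; NOT rx.Subscriber).
class ToySubscriber {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final List<String> received =
            Collections.synchronizedList(new ArrayList<String>());

    void unsubscribe() {
        unsubscribed.set(true);
    }

    // Items arriving after unsubscribe() are silently dropped.
    void onNext(String item) {
        if (!unsubscribed.get()) {
            received.add(item);
        }
    }

    List<String> received() {
        return new ArrayList<String>(received);
    }
}

class LateSubscribeDemo {
    // The inner work is handed to a separate executor; the outer flow
    // finishes and unsubscribes before the inner work emits.
    static List<String> runWithLateEmission() {
        final ToySubscriber subscriber = new ToySubscriber();
        final CountDownLatch outerFinished = new CountDownLatch(1);
        final CountDownLatch innerFinished = new CountDownLatch(1);
        ExecutorService httpExecutor = Executors.newSingleThreadExecutor();
        try {
            httpExecutor.execute(() -> {
                try {
                    outerFinished.await();          // force the suspected ordering
                    subscriber.onNext("accounts");  // arrives too late
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    innerFinished.countDown();
                }
            });
            subscriber.unsubscribe();               // outer flow is done first
            outerFinished.countDown();
            innerFinished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            httpExecutor.shutdown();
        }
        return subscriber.received();
    }

    // The same work done on the calling thread, before the flow finishes.
    static List<String> runOnCallingThread() {
        ToySubscriber subscriber = new ToySubscriber();
        subscriber.onNext("accounts");
        subscriber.unsubscribe();
        return subscriber.received();
    }

    public static void main(String[] args) {
        System.out.println("late emission:  " + runWithLateEmission());
        System.out.println("calling thread: " + runOnCallingThread());
    }
}
```

In this model the late emission is dropped, while the same emission made on the calling thread is delivered. Whether the real MockRxSupport fails for exactly this reason is my reading of the code, not something the model proves.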

As a workaround with the current code when using retrofit-mock, you can replace the default internal executor with your own ImmediateExecutor implementation. At least when testing with mocks, this gives you a single-threaded flow that runs on the thread supplied by your own scheduler (Schedulers.io() in your code).

    // ImmediateExecutor.java
    import java.util.concurrent.Executor;

    // Runs each task synchronously on the calling thread.
    public class ImmediateExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Create your RestAdapter with your ImmediateExecutor
    // (first argument: HTTP executor; second: callback executor)
    RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();
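A quick way to check what the ImmediateExecutor changes is to record which thread actually runs a task. The sketch below uses only the JDK; ExecutorThreadDemo and threadUsedBy are hypothetical names for illustration, and ImmediateExecutor is repeated (non-public) so the block compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Same behavior as the ImmediateExecutor above: run the task inline.
class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

class ExecutorThreadDemo {
    // Returns the name of the thread on which the executor ran the task.
    static String threadUsedBy(Executor executor) {
        final AtomicReference<String> name = new AtomicReference<String>();
        final CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait in case the task runs on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("caller:    " + Thread.currentThread().getName());
            System.out.println("immediate: " + threadUsedBy(new ImmediateExecutor()));
            System.out.println("pool:      " + threadUsedBy(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The immediate executor reports the caller's own thread, while the pool reports a separate worker thread. That is the property the workaround relies on: no hand-off to a second thread.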

As for fixing the problem in the source, you can also include the retrofit-mock project as source in your own project and change the MockRestAdapter$MockRxSupport createMockObservable method using the code below. I tested your use case and it fixes the problem.

--- MockRestAdapter.java $ MockRxSupport ----

    Observable createMockObservable(final MockHandler mockHandler,
            final RestMethodInfo methodInfo,
            final RequestInterceptor interceptor,
            final Object[] args) {
        return Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(final Subscriber<? super Object> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) return;
                    Observable observable =
                            (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
                    // Note: subscribeOn returns a new Observable and the result is
                    // not used here, so this line has no effect as written.
                    observable.subscribeOn(Schedulers.from(httpExecutor));
                    //noinspection unchecked
                    observable.subscribe(subscriber);
                } catch (RetrofitError e) {
                    subscriber.onError(errorHandler.handleError(e));
                } catch (Throwable e) {
                    subscriber.onError(e);
                }
            }
        });
    }

I have opened an issue with the Retrofit project here; we will see if they accept it.
