How to handle RxAndroid errors in the main thread

I am new to RxJava / Android and am surprised that my onError lambda is sometimes called on the main thread and sometimes not, although I use .observeOn(AndroidSchedulers.mainThread()).

Example 1: onError on the main thread
This works as expected: onError is called on the main thread.

    Observable.error(new RuntimeException("RTE"))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {
                Log.e(TAG, "onNext(" + s + ")-thread: " + Thread.currentThread().getName());
            }, throwable -> {
                Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
            });

log output:

 onError()-thread: main 

Example 2: onError NOT on the main thread

    Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    // Emits one item and never completes, so the timeout below fires.
                    subscriber.onNext("one and only");
                }
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .timeout(1, TimeUnit.SECONDS)
            .subscribe(s -> {
                Log.e(TAG, "onNext(" + s + ")-thread: " + Thread.currentThread().getName());
            }, throwable -> {
                Log.e(TAG, "onError()-thread: " + Thread.currentThread().getName());
            });

The output looks something like this:

    onNext(one and only)-thread: main
    onError()-thread: RxComputationScheduler-4

I thought that after calling observeOn(AndroidSchedulers.mainThread()), ALL emissions would be delivered on the main thread.

So I have the following questions:

  • I could not find documentation that states on which thread onError is called under which circumstances. Does anyone have a link?
  • Of course, I want to show some indication of errors in the GUI, so how can I make sure onError is ALWAYS called on the main thread?
2 answers

I just found out that I only need to change the order of the calls. When I call observeOn after timeout, it works as expected:

    .timeout(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())

log output:

    onNext(one and only)-thread: main
    onError()-thread: main

The reason is that observeOn only affects the operators below it in the chain, and only until another operator switches threads again. In the example above, timeout() switches to a computation thread.
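The ordering effect can be illustrated without RxJava at all. The sketch below is a toy model in plain Java, not the library's implementation: the class ObserveOnOrderDemo, the method errorThread and the thread names "main-like" and "computation-like" are all made up for illustration. A timer stage raises an error on its own thread, and a thread hop only takes effect if it sits downstream of that stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class ObserveOnOrderDemo {

    /** Runs a toy "timeout" stage and returns the name of the thread that received its error. */
    static String errorThread(boolean observeOnAfterTimeout) {
        // Hypothetical stand-ins for the main thread and the computation scheduler.
        ExecutorService mainLike =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        ScheduledExecutorService computationLike =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "computation-like"));
        CompletableFuture<String> seenOn = new CompletableFuture<>();
        try {
            // Toy onError callback: records the thread it is called on.
            Consumer<Throwable> onError =
                    t -> seenOn.complete(Thread.currentThread().getName());

            // Toy observeOn: re-dispatches the signal to the main-like thread.
            Consumer<Throwable> hopToMain = t -> mainLike.execute(() -> onError.accept(t));

            // If the hop is downstream of the timer, the error passes through it.
            // If the hop is upstream, the error never reaches it.
            Consumer<Throwable> downstream = observeOnAfterTimeout ? hopToMain : onError;

            // Toy timeout: raises its error from its own scheduler thread.
            computationLike.schedule(
                    () -> downstream.accept(new TimeoutException("toy timeout")),
                    50, TimeUnit.MILLISECONDS);

            return seenOn.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mainLike.shutdownNow();
            computationLike.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("hop before timeout: " + errorThread(false)); // computation-like
        System.out.println("hop after timeout:  " + errorThread(true));  // main-like
    }
}
```

In this model the hop behaves like observeOn in the two examples from the question: placed before the timer stage it is bypassed by the timer's error, placed after it the error is moved to the main-like thread.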

Please note that subscribeOn works differently. It does not matter where in the chain you call it. You should call it only once (if you call it several times, the first one wins: see "Multiple subscribeOn" in this Blog).
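Why the first subscribeOn wins can also be sketched with a toy model in plain Java. This is an analogy under stated assumptions, not RxJava code: the class SubscribeOnOrderDemo, the method sourceThread and the thread names "first" and "second" are hypothetical. Each toy subscribeOn simply subscribes to its upstream on its own thread, so the one closest to the source decides where the source runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscribeOnOrderDemo {

    /** Returns the name of the thread on which the toy source ends up running. */
    static String sourceThread() {
        ExecutorService first =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        CompletableFuture<String> seenOn = new CompletableFuture<>();
        try {
            // Toy source: records the thread that runs its subscription logic.
            Runnable source = () -> seenOn.complete(Thread.currentThread().getName());

            // Toy subscribeOn closest to the source: subscribes to the source on "first".
            Runnable subscribeOnFirst = () -> first.execute(source);

            // Toy subscribeOn further down the chain: subscribes to its upstream on "second".
            Runnable subscribeOnSecond = () -> second.execute(subscribeOnFirst);

            // Subscribing starts at the bottom of the chain and travels upstream.
            subscribeOnSecond.run();
            return seenOn.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdownNow();
            second.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("source runs on: " + sourceThread()); // first
    }
}
```

The later hop still happens, but it is immediately overridden by the hop nearer the source, which is why adding more subscribeOn calls further down has no visible effect on where the source runs.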

Here is a good blog post with more details: RxJava - Understanding observeOn() and subscribeOn()


Because .timeout(1, TimeUnit.SECONDS) operates on Schedulers.computation() by default.
