Which thread does unsubscribeOn use by default when we do not specify it but do specify a thread for subscribeOn?
By default, when none of subscribeOn / observeOn / unsubscribeOn is set, unsubscribeOn (like the others) defaults to the current thread.
If we set a thread for subscribeOn but none for observeOn / unsubscribeOn, then unsubscribeOn will use the same thread specified in subscribeOn.
If we call both subscribeOn and observeOn, but not unsubscribeOn, then unsubscribeOn will use the thread specified in observeOn.
If all three methods (subscribeOn, observeOn and unsubscribeOn) are set, then unsubscribeOn will use the thread specified in unsubscribeOn. In fact, unsubscription will happen on the thread specified in the unsubscribeOn method regardless of whether the other methods are set or not.
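The fallback order described above can be summed up in a small sketch. This is plain Java, not RxJava code: `UnsubscribeThreadRule` and `resolve` are hypothetical names made up for illustration, the strings are just labels standing in for schedulers, and the rule itself is only the behaviour observed in this answer, not something taken from the library source.

```java
// Hypothetical helper, not part of RxJava: it only encodes the fallback
// order described above so the four cases are easy to compare.
final class UnsubscribeThreadRule {

    /**
     * Returns a label for the thread on which unsubscription is expected to run.
     * Pass null for any operator that is not applied to the chain.
     */
    static String resolve(String subscribeOn, String observeOn, String unsubscribeOn) {
        if (unsubscribeOn != null) {
            return unsubscribeOn;   // an explicit unsubscribeOn always wins
        }
        if (observeOn != null) {
            return observeOn;       // otherwise fall back to observeOn
        }
        if (subscribeOn != null) {
            return subscribeOn;     // then to subscribeOn
        }
        return "current";           // nothing set: the calling thread
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, null));          // current
        System.out.println(resolve("io", null, null));          // io
        System.out.println(resolve("io", "main", null));        // main
        System.out.println(resolve("io", "main", "newThread")); // newThread
    }
}
```

The sample further down is the real check; this sketch is only a compact way to read the rules.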
Is it required to specify the thread on which we want unsubscription to happen even when it is the same thread as the one used in subscribeOn?
- As explained above, if unsubscribeOn is not set, unsubscription happens on the observeOn thread, if one is set. If not, it happens on the thread set by subscribeOn. So we do not need to set a different thread for unsubscribeOn unless we perform some lengthy task when unsubscribing. In most cases, or at least in my code, there is no such task, and therefore we do not need to set another thread.
Here is a sample I created that can be used to test the above on Android. Just comment out or modify the threads as required to check the various results.
    public void testRxThreads() {
        createThreadObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .unsubscribeOn(Schedulers.newThread())
                .subscribe(printResult());
    }

    private Observable<String> createThreadObservable() {
        return Observable.create(subscriber -> {
            subscriber.add(new Subscription() {
                @Override
                public void unsubscribe() {
                    System.out.println("UnSubscribe on Thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
                    // perform unsubscription
                }

                @Override
                public boolean isUnsubscribed() {
                    return false;
                }
            });
            subscriber.setProducer(n -> {
                System.out.println("Producer thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
            });
            subscriber.onNext("Item 1");
            subscriber.onNext("Item 2");
            subscriber.onCompleted();
        });
    }

    private Action1<String> printResult() {
        return result -> {
            System.out.println("Subscriber thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
            System.out.println("Result: " + result);
        };
    }
This produced the following output:
    Producer thread: 556 RxIoScheduler-2
    Subscriber thread: 1 main
    Result: Item 1
    Subscriber thread: 1 main
    Result: Item 2
    UnSubscribe on Thread: 557 RxNewThreadScheduler-1
Commenting out or removing unsubscribeOn gives the following:
    Subscriber thread: 1 main
    Result: Item 1
    Subscriber thread: 1 main
    Result: Item 2
    UnSubscribe on Thread: 1 main
Removing both the observeOn and unsubscribeOn calls gives the following:
    Producer thread: 563 RxIoScheduler-2
    Subscriber thread: 563 RxIoScheduler-2
    Result: Item 1
    Subscriber thread: 563 RxIoScheduler-2
    Result: Item 2
    UnSubscribe on Thread: 563 RxIoScheduler-2
Thanks to drhr for the initial explanation of the events. It helped me do more research and verify the results with the sample above.
achie