What stream is called unSubscribeOn? Should we call it?

Which stream is unsubscribeOn by default when we do not specify it, but still specify the stream for subscribeOn? Should we indicate the stream that we want to not subscribe, even if it is the same stream as the one used in subscribeOn?

Or do the bottom two passages do the same for un-subscription?

Option 1:

mSubscription.add(myAppProvider .getSomeData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .unsubscribeOn(Schedulers.io()) .subscribe(data -> handleData(data), throwable -> handleError(throwable) )); 

Option 2:

 mSubscription.add(myAppProvider .getSomeData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(data -> handleData(data), throwable -> handleError(throwable) )); 

I looked at the Rx-Java docs , but they only explain subscribeOn, but nothing about unSubscribeOn

+7
java android rx-java rx-android
source share
3 answers

Without subscribeOn (and without observeOn ) your actions to ban subscriptions will be performed in any thread that was launched by the subscription.

With subscribeOn your unsubscribe actions will be performed in the Scheduler specified in subscribeOn .

With observeOn your observeOn actions will be performed in the Scheduler specified in observeOn (overriding the Scheduler specified by subscribeOn ).

Here is an example. As suggested there, this is useful when the subscription itself includes lengthy operations that you want to run on another thread.

If you run your test code:

 Observable<Object> source = Observable.using( () -> { System.out.println("Subscribed on " + Thread.currentThread().getId()); return Arrays.asList(1,2); }, (ints) -> { System.out.println("Producing on " + Thread.currentThread().getId()); return Observable.from(ints); }, (ints) -> { System.out.println("Unubscribed on " + Thread.currentThread().getId()); } ); source .unsubscribeOn(Schedulers.newThread()) .subscribe(System.out::println); 

You should see the expected result:

  Subscribed on 1 Producing on 1 1 2 Unubscribed on 11 

If you delete this unsubscribeOn line, you will see:

  Unsubscribed on 1 
+3
source share

These fragments have different types of behavior.

In the general case, unsubscription moves along a sequence of statements and can be initiated by any thread (just call subscriber.unsubscribe() from any thread). Without the presence of the unsubscribeOn operator, the unsubscribeOn action is likely to terminate the action in the stream from which it was called. unsubscribeOn provides more precise control over the stream used to unsubscribe before it.

+3
source share

Which stream is unsubscribeOn by default when we do not specify it but still specify the stream for subscribeOn?

  • By default, when none of subscribeOn / observOn / unsubscribeOn is set, then unsubscribeOn (as well as others) defaults to the current stream.

  • If we set the stream for subscribeOn, and for observOn / unsubscribeOn, then unsubscribeOn will use the same stream specified in subscribeOn.

  • If we call both subscribeOn and ObserveOn, but not unsubscribeOn, then unsubscribeOn will use the stream specified in watchOn.

  • If all three methods (subscribeOn, observOn and unsubscribeOn) are set, then unsubscribeOn will use the stream specified by unsubscribeOn. In fact, unsubscribeOn will happen in the stream specified in the unsubscribeOn method, regardless of the weather, the previous methods are installed or not.

Is it required to specify the stream that we want to happen even when it is the same stream as the one used in subscribeOn?

  • As explained above, if unsubscribeOn is not installed, then unsubscribeOn occurs during observation, if it is installed. If not, then this happens on the thread set by subscribeOn. Now we do not need to set different threads for unsubscribeOn if you do not perform some lengthy task when canceling your subscription. In most cases, or at least from my code, this is true, and therefore we do not need to install another thread.

Here is a sample I created that can be used to test above in Android. Just comment out or modify the streams as required to check the various results.

  public void testRxThreads() { createThreadObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .unsubscribeOn(Schedulers.newThread()) .subscribe(printResult()); } private Observable<String> createThreadObservable() { return Observable.create(subscriber -> { subscriber.add(new Subscription() { @Override public void unsubscribe() { System.out.println("UnSubscribe on Thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); // perform unsubscription } @Override public boolean isUnsubscribed() { return false; } }); subscriber.setProducer(n -> { System.out.println("Producer thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); }); subscriber.onNext("Item 1"); subscriber.onNext("Item 2"); subscriber.onCompleted(); }); } private Action1<String> printResult() { return result -> { System.out.println("Subscriber thread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); System.out.println("Result: " + result); }; } 

This led to the following result:

 Producer thread: 556 RxIoScheduler-2 Subscriber thread: 1 main Result: Item 1 Subscriber thread: 1 main Result: Item 2 UnSubscribe on Thread: 557 RxNewThreadScheduler-1 

Commenting or deleting unsubscribeOn causes the following.

 Subscriber thread: 1 main Result: Item 1 Subscriber thread: 1 main Result: Item 2 UnSubscribe on Thread: 1 main 

Retrieving both observOn and unsubscribeOn labels causes the following:

 Producer thread: 563 RxIoScheduler-2 Subscriber thread: 563 RxIoScheduler-2 Result: Item 1 Subscriber thread: 563 RxIoScheduler-2 Result: Item 2 UnSubscribe on Thread: 563 RxIoScheduler-2 

Thanks to drhr for the initial explanation of the events. This helped me to more research and verify the results with the sample above.

+1
source share

All Articles