Is it necessary to unsubscribe?

I have a hard time understanding RX. Should you unsubscribe in the following case? Is there a way to automatically unsubscribe after the call function?

Observable.create(new Observable.OnSubscribe<NumberInfo>() { @Override public void call(Subscriber<? super NumberInfo> subscriber) { try { // Store data to db } catch (Exception e) { Log.e(TAG, "Downloaded numberInfo was not added to cache.", e); } } }).subscribeOn(Schedulers.newThread()) .subscribe(); 

I don't want to watch any result why I omitted the classic .observeOn(AndroidSchedulers.mainThread())

thanks for the explanation.

+5
source share
2 answers

Under the Rx contract, when an Observable fires onCompleted , the Observer does not sign. In your case, the contract is not respected, because in your code there is no subscriber.onCompleted() .

If you just need something like β€œFire and Forget,” you can try simply:

 Schedulers.io().createWorker().schedule(new Action0() { @Override public void call() { try { // Store data to db } catch (Exception e) { Log.e(TAG, "Downloaded numberInfo was not added to cache.", e); } } }); 

It will run in the I / O Scheduler, and your UI thread is safe.

IMO, you should always have a return value. Your routing Store data to db certainly has some return value, such as long indicating the line number or boolean , which indicates success. With this approach, you can create the correct method:

 public Observable<Long> storeToDb(final SomethingToStore storeMe) { return Observable .create(new Observable.OnSubscribe<Long>() { @Override public void call(Subscriber<? super Long> subscriber) { long row = syncStore(storeMe); if (row == -1) { subscriber.onError(new Throwable("Cannot store " + storeMe.toString + " to DB.")); } subscriber.onNext(row); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } 

And you can use it as follows:

 storeToDb(storeThis) .subscribe(new Observer<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.e("STORING", "Something went south: " + e.getMessage()); } @Override public void onNext(Long row) { Log.d("STORING", "Everything has been stored as record number: " + row); } }); 
+5
source

When the Observable is complete, RxJava will automatically unsubscribe. To perform automatic unsubscription, you must call subscriber.onComplete() .

+4
source

All Articles