The base object in Rx is Observable. It typically wraps an OnSubscribe object, which is simply an Action1 extension that takes a Subscriber as its parameter.
In practice, this means you only need to define a class that wraps your call and passes the result to the Subscriber:
public class RxPaperGet implements Observable.OnSubscribe<String> {
    @Override
    public void call(Subscriber<? super String> t1) {
        try {
            t1.onNext(Paper.get("city"));
        } catch (Throwable t) {
            t1.onError(t);
            return;
        }
        t1.onCompleted();
    }
}
This is a basic example. Next, you want to generalize it so that you can call any function, not just Paper.get("city"). Something like https://github.com/ReactiveX/RxJavaAsyncUtil/blob/0.x/src/main/java/rx/util/async/operators/OperatorFromFunctionals.java#L44 does this, allowing you to pass an arbitrary Callable.
In your case, that would be:
Observable<String> res = OperatorFromFunctionals.fromCallable(() -> Paper.get("city"));
(In case you are interested, these are Java 8 lambdas, which are available on Android through Retrolambda. They are a nice way to cut down on Rx verbosity.)
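To show what a generic wrapper of this kind does internally, here is a minimal sketch in plain Java. SimpleSubscriber and CallableSource are hypothetical stand-ins written for this example, not RxJava types; they only mirror the onNext/onError/onCompleted contract of the RxPaperGet class above, so the snippet runs without the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class CallableSource {

    // Hypothetical stand-in for rx.Subscriber, reduced to its three callbacks.
    public interface SimpleSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Runs any Callable and forwards its outcome, like RxPaperGet but generic.
    public static <T> void emit(Callable<T> callable, SimpleSubscriber<? super T> subscriber) {
        try {
            subscriber.onNext(callable.call());
        } catch (Throwable t) {
            subscriber.onError(t);
            return;
        }
        subscriber.onCompleted();
    }

    // Demo helper (an assumption of this sketch): records each callback as a string.
    public static <T> List<String> record(Callable<T> callable) {
        List<String> events = new ArrayList<>();
        emit(callable, new SimpleSubscriber<T>() {
            @Override public void onNext(T value) { events.add("next:" + value); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> "London"));   // [next:London, completed]
        System.out.println(record(() -> { throw new IllegalStateException("no city"); })); // [error:no city]
    }
}
```

The point of the design is that the error path and the completion path are written once, and only the Callable changes from call to call.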
Once you have the Observable, you can subscribe to it and get the results. To run the work in the background and receive the results on the UI thread, chain these two calls:
res.subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers is provided by rx-android.
Then subscribe with a callback that receives the result:
.subscribe(city -> Log.d(TAG, city));
This returns a Subscription, which is useful if you need to cancel the work: call unsubscribe() on it.
Putting it all together:
OperatorFromFunctionals.fromCallable(() -> Paper.get("city"))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(city -> Log.d(TAG, city));
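If you want to see the thread hand-off in isolation, the sketch below reproduces the same shape using only java.util.concurrent: the work runs on one executor and the callback runs on another. It is an analogy, not RxJava. The "io" and "main-like" executors and the loadCity method are hypothetical stand-ins for Schedulers.io(), the Android main thread, and Paper.get("city").

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHandOff {

    // Hypothetical stand-in for the blocking call Paper.get("city");
    // it tags the value with the name of the thread that produced it.
    static String loadCity() {
        return "London@" + Thread.currentThread().getName();
    }

    // Produces the value on the "io" thread and post-processes it on the "main-like" thread.
    public static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // roughly the role of subscribeOn: where the work happens
                    .supplyAsync(ThreadHandOff::loadCity, io)
                    // roughly the role of observeOn: where the result is delivered
                    .thenApplyAsync(city -> city + " -> " + Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // London@io -> main-like
    }
}
```

The printed string shows both thread names, which is a quick way to check that the producer and the callback really ran where you expected.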