Observable, retry on error and cache only if completed

We can use the cache() operator to avoid executing a long-running task (an HTTP request) several times, and to reuse its result:

    Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

    ---------------------------------------------
    // the first time we need it
    apiCall.andSomeOtherStuff()
           .subscribe(subscriberA);

    ---------------------------------------------
    // in the future when we need it again
    apiCall.andSomeDifferentStuff()
           .subscribe(subscriberB);

The first time, the HTTP request is executed. The second time, because we used the cache() operator, the request is not executed again and the first result is reused.

This works great when the first request completes successfully. But if onError is called on the first attempt, then the next time a new subscriber subscribes to the same observable, onError is called again without the HTTP request being repeated.

What we want is this: if onError is called the first time, then the next time someone subscribes to the same observable, the HTTP request should be attempted from scratch. In other words, the observable should cache only successful API calls, i.e. those for which onCompleted has been called.
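The requested contract can be illustrated without RxJava. The sketch below is only a plain-JDK analogy, not an Rx solution: `SuccessOnlyMemo` and `SuccessOnlyMemoDemo` are hypothetical names, and a `Supplier` stands in for the HTTP request. It shows the intended behavior: a failure reaches the caller but is never stored, so the next caller repeats the request, while a success is stored and reused.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: remembers the first successful result, never a failure. */
final class SuccessOnlyMemo<T> {
    private final Supplier<T> source; // stands in for the HTTP request
    private T value;                  // assigned only after a successful call
    private boolean hasValue;

    SuccessOnlyMemo(Supplier<T> source) {
        this.source = source;
    }

    /** Returns the cached value, or runs the source again if no call has succeeded yet. */
    synchronized T get() {
        if (!hasValue) {
            value = source.get(); // an exception propagates and leaves the cache empty
            hasValue = true;
        }
        return value;
    }
}

final class SuccessOnlyMemoDemo {
    /** Makes three requests against a source that fails once; returns how often the source ran. */
    static int run() {
        AtomicInteger calls = new AtomicInteger();
        SuccessOnlyMemo<String> memo = new SuccessOnlyMemo<>(() -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated network failure");
            }
            return "response";
        });
        try {
            memo.get(); // first attempt fails, nothing is cached
        } catch (IllegalStateException expected) {
            // the failure reaches the caller, much as onError would
        }
        memo.get(); // second attempt repeats the request and succeeds
        memo.get(); // third attempt is served from the cache
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("source calls: " + run());
    }
}
```

The `synchronized` method also makes concurrent callers wait for a request that is already running instead of starting a second one, at the cost of blocking their threads.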

Any ideas on how to proceed? We tried using the retry() and cache() operators without much success.

+15
rx-java rx-android
5 answers

This is the solution we ended up with after extending akarnokd's solution:

    import java.util.concurrent.Semaphore;

    import rx.Observable;

    public class OnErrorRetryCache<T> {

        public static <T> Observable<T> from(Observable<T> source) {
            return new OnErrorRetryCache<>(source).deferred;
        }

        private final Observable<T> deferred;
        private final Semaphore singlePermit = new Semaphore(1);

        private Observable<T> cache = null;
        private Observable<T> inProgress = null;

        private OnErrorRetryCache(Observable<T> source) {
            deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
        }

        private Observable<T> createWhenObserverSubscribes(Observable<T> source) {
            // Only one subscriber at a time may create or look up the observable.
            singlePermit.acquireUninterruptibly();

            Observable<T> cached = cache;
            if (cached != null) {
                singlePermit.release();
                return cached;
            }

            // The permit stays held until the request terminates (see onTermination).
            inProgress = source
                    .doOnCompleted(this::onSuccess)
                    .doOnTerminate(this::onTermination)
                    .replay()
                    .autoConnect();

            return inProgress;
        }

        // Cache the observable only when the source completed without an error.
        private void onSuccess() {
            cache = inProgress;
        }

        // Runs on completion and on error; lets the next subscriber in.
        private void onTermination() {
            inProgress = null;
            singlePermit.release();
        }
    }

We needed to cache the result of an HTTP request made through Retrofit, so this was written with an observable that emits a single item in mind.

If an observer subscribed while the HTTP request was still executing, we wanted it to wait rather than execute the request a second time, unless the in-progress request failed. To do this, the semaphore allows only one subscriber at a time into the block that creates or returns the cached observable, and if a new observable is created, we wait until it terminates. Tests for the above can be found here.

+7

Well, for everyone who is still interested, I think I have a better way to achieve this with Rx.

The key point is to use onErrorResumeNext, which lets you replace the Observable in case of an error. It should look something like this:

    Observable<Object> apiCall = createApiCallObservable().cache(1);

    // future call
    apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            return createApiCallObservable();
        }
    });

That way, if the first call failed, the future call simply makes the request again (only once).

But every other caller that tries to use the first observable will also fail and make another request.

Since you hold a reference to the original observable, just update it.

So, a lazy getter:

    Observable<Object> apiCall;

    private Observable<Object> getCachedApiCall() {
        if (apiCall == null) {
            apiCall = createApiCallObservable().cache(1);
        }
        return apiCall;
    }

Now, the getter that retries if the previous call failed:

    private Observable<Object> getRetryableCachedApiCall() {
        return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
            public Observable<? extends Object> call(Throwable throwable) {
                apiCall = null;
                return getCachedApiCall();
            }
        });
    }

Please note that it will only retry once per call.

So, now your code will look something like this:

    ---------------------------------------------
    // the first time we need it - this will be without a retry if you want..
    getCachedApiCall().andSomeOtherStuff()
                      .subscribe(subscriberA);

    ---------------------------------------------
    // in the future when we need it again - for any other call so we will have a retry
    getRetryableCachedApiCall().andSomeDifferentStuff()
                               .subscribe(subscriberB);

+8

You need to do some state handling. Here is how I would do it:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    import rx.Observable;

    public class CachedRetry {

        public static final class OnErrorRetryCache<T> {
            final AtomicReference<Observable<T>> cached = new AtomicReference<>();

            final Observable<T> result;

            public OnErrorRetryCache(Observable<T> source) {
                result = Observable.defer(() -> {
                    for (;;) {
                        Observable<T> conn = cached.get();
                        if (conn != null) {
                            return conn;
                        }
                        // On error, clear the slot so the next subscriber starts over.
                        Observable<T> next = source
                                .doOnError(e -> cached.set(null))
                                .replay()
                                .autoConnect();

                        if (cached.compareAndSet(null, next)) {
                            return next;
                        }
                    }
                });
            }

            public Observable<T> get() {
                return result;
            }
        }

        public static void main(String[] args) {
            AtomicInteger calls = new AtomicInteger();

            // Fails on the first subscription, succeeds afterwards.
            Observable<Integer> source = Observable
                    .just(1)
                    .doOnSubscribe(() ->
                            System.out.println("Subscriptions: " + (1 + calls.get())))
                    .flatMap(v -> {
                        if (calls.getAndIncrement() == 0) {
                            return Observable.error(new RuntimeException());
                        }
                        return Observable.just(42);
                    });

            Observable<Integer> o = new OnErrorRetryCache<>(source).get();

            o.subscribe(System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("Done"));

            o.subscribe(System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("Done"));

            o.subscribe(System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("Done"));
        }
    }

It works by caching a fully successful source and returning it to everyone. Otherwise, a (partially) failed source clears the cache, and the next observer that subscribes triggers a resubscription to the source.
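For comparison, the same compare-and-set idea can be sketched with only the JDK, using `CompletableFuture` in place of `Observable`. This is an analogy under stated assumptions, not the RxJava code above: `RetryingFutureCache` and `RetryingFutureCacheDemo` are hypothetical names, and the supplier is assumed to return a future rather than throw synchronously. The demo mirrors the three subscriptions in `main`: the first request fails, the second repeats it, and the third reuses the cached success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical JDK-only analogue: shares in-flight and successful requests, forgets failures. */
final class RetryingFutureCache<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> source; // starts one request per call

    RetryingFutureCache(Supplier<CompletableFuture<T>> source) {
        this.source = source;
    }

    CompletableFuture<T> get() {
        for (;;) {
            CompletableFuture<T> current = cached.get();
            if (current != null) {
                return current; // in-flight or already successful request
            }
            CompletableFuture<T> slot = new CompletableFuture<>();
            if (cached.compareAndSet(null, slot)) {
                // This caller won the race, so it alone starts the request.
                // Assumption: source.get() returns a future and does not throw.
                source.get().whenComplete((value, error) -> {
                    if (error != null) {
                        cached.compareAndSet(slot, null); // forget the failure first
                        slot.completeExceptionally(error);
                    } else {
                        slot.complete(value);
                    }
                });
                return slot;
            }
            // Another caller installed a slot in the meantime; loop and reuse it.
        }
    }
}

final class RetryingFutureCacheDemo {
    /** Issues three requests against a source that fails once and summarizes what happened. */
    static String run() {
        AtomicInteger calls = new AtomicInteger();
        RetryingFutureCache<Integer> cache = new RetryingFutureCache<>(() -> {
            CompletableFuture<Integer> request = new CompletableFuture<>();
            if (calls.incrementAndGet() == 1) {
                request.completeExceptionally(new RuntimeException("simulated failure"));
            } else {
                request.complete(42);
            }
            return request;
        });
        boolean firstFailed = cache.get().isCompletedExceptionally();
        int second = cache.get().join(); // new request, because the failure was dropped
        int third = cache.get().join();  // served from the cached success
        return "firstFailed=" + firstFailed + " second=" + second
                + " third=" + third + " calls=" + calls.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch installs an empty slot before starting the request, so a caller that loses the race never starts a request of its own, and it clears the slot with `compareAndSet(slot, null)` so that a failure only ever removes its own entry.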

+4

Plato's solution is spot on! In case someone needs a Kotlin version with an extension function and a parameterized cache size, here it is.

    class OnErrorRetryCache<T> constructor(source: Flowable<T>, private val retries: Int? = null) {

        val deferred: Flowable<T>
        private val singlePermit = Semaphore(1)

        private var cache: Flowable<T>? = null
        private var inProgress: Flowable<T>? = null

        init {
            deferred = Flowable.defer { createWhenObserverSubscribes(source) }
        }

        private fun createWhenObserverSubscribes(source: Flowable<T>): Flowable<T> {
            singlePermit.acquireUninterruptibly()

            val cached = cache
            if (cached != null) {
                singlePermit.release()
                return cached
            }

            inProgress = source
                .doOnComplete(::onSuccess)
                .doOnTerminate(::onTermination)
                .let {
                    when (retries) {
                        null -> it.replay()
                        else -> it.replay(retries)
                    }
                }
                .autoConnect()

            return inProgress!!
        }

        private fun onSuccess() {
            cache = inProgress
        }

        private fun onTermination() {
            inProgress = null
            singlePermit.release()
        }
    }

    fun <T> Flowable<T>.onErrorRetryCache(retries: Int? = null) =
        OnErrorRetryCache(this, retries).deferred

And a quick test to show how this works:

    @Test
    fun `when source fails for the first time, new observables just resubscribe`() {
        val cacheSize = 2
        val error = Exception()
        var shouldFail = true // only fail on the first subscription

        val observable = Flowable.defer {
            when (shouldFail) {
                true -> Flowable.just(1, 2, 3, 4)
                    .doOnNext { shouldFail = false }
                    .concatWith(Flowable.error(error))
                false -> Flowable.just(5, 6, 7, 8)
            }
        }.onErrorRetryCache(cacheSize)

        val test1 = observable.test()
        val test2 = observable.test()
        val test3 = observable.test()

        test1.assertValues(1, 2, 3, 4).assertError(error) // fails the first time
        test2.assertValues(5, 6, 7, 8).assertNoErrors()   // then resubscribes and gets the whole stream from the source
        test3.assertValues(7, 8).assertNoErrors()         // another subscriber joins in and gets the 2 last cached values
    }

+1

Have you considered using AsyncSubject to implement a cache for a network request? I made a sample application, RxApp, to test how this can work. I use a singleton model to get the response from the network. This makes it possible to cache responses, access the data from multiple fragments, subscribe to a pending request, and provide mock data for automated UI tests.

0
