This is the decision we have come across.
public class OnErrorRetryCache<T> { public static <T> Observable<T> from(Observable<T> source) { return new OnErrorRetryCache<>(source).deferred; } private final Observable<T> deferred; private final Semaphore singlePermit = new Semaphore(1); private Observable<T> cache = null; private Observable<T> inProgress = null; private OnErrorRetryCache(Observable<T> source) { deferred = Observable.defer(() -> createWhenObserverSubscribes(source)); } private Observable<T> createWhenObserverSubscribes(Observable<T> source) { singlePermit.acquireUninterruptibly(); Observable<T> cached = cache; if (cached != null) { singlePermit.release(); return cached; } inProgress = source .doOnCompleted(this::onSuccess) .doOnTerminate(this::onTermination) .replay() .autoConnect(); return inProgress; } private void onSuccess() { cache = inProgress; } private void onTermination() { inProgress = null; singlePermit.release(); } }
We needed to cache the result of the HTTP request from Retrofit. Thus, it was created, with an observable, which emits one element in mind.
If the observer signed during the execution of the HTTP request, we wanted him to wait and not execute the request twice, unless he passed. To do this, the semaphore allows single access to a block that creates or returns a cached observable, and if a new observable is created, we wait until it completes.
source share