Rxjava: Is it possible to use the retry () function, but with a delay?

I use rxjava in my Android application to asynchronously process network requests. Now I would like to retry the failed network request only after a while.

Is it possible to use the retry () function in Observable, but only repeat after some delay?

Is there a way to tell Observable that it is currently retrying (as opposed to trying the first time)?

I looked at debounce () / throttleWithTimeout (), but they seem to be doing something else.

Edit:

I think I found one way to do this, but I would be interested to confirm that this is the right way to do this, or for other, better ways.

What I am doing is: In the call () method of my Observable.OnSubscribe, before I call the Subscribers onError () method, I just allow Thread sleep for the required amount of time. So, to repeat every 1000 milliseconds, I am doing something like this:

@Override public void call(Subscriber<? super List<ProductNode>> subscriber) { try { Log.d(TAG, "trying to load all products with pid: " + pid); subscriber.onNext(productClient.getProductNodesForParentId(pid)); subscriber.onCompleted(); } catch (Exception e) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e.printStackTrace(); } subscriber.onError(e); } } 

Since this method works on an I / O stream, it does not block the user interface. The only problem I see is that even the first error is reported with a delay, so the delay exists even if there is no retry (). I would like it to be better if the delay was not applied after the error, but instead before retrying (but not before the first attempt, obviously).

+85
rx-java
Feb 27 '14 at 11:00
source share
14 answers

You can use the retryWhen() operator to add repeat logic to any observable.

The following class contains retry logic:

RxJava 2.x

 public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> apply(final Observable<? extends Throwable> attempts) { return attempts .flatMap(new Function<Throwable, Observable<?>>() { @Override public Observable<?> apply(final Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

RxJava 1.x

 public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> call(Observable<? extends Throwable> attempts) { return attempts .flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

Using:

 // Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable .retryWhen(new RetryWithDelay(3, 2000)); 
+162
Aug 13 '14 at
source share

Inspired by Paul 's answer , and if you don't care about retryWhen problems identified by Abhijit Sarkar , the easiest way to postpone re-subscription with rxJava2 is unconditionally:

 source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS)) 

You might want to see more examples and explanations when retrying and retrying when .

+15
Mar. 14 '17 at 12:46 on
source share

This solution is based on the Ben Christensen fragments I saw, RetryWhen Example and RetryWhenTestsConditional (I had to change n.getThrowable() to n to make it work). I used evant / gradle-retrolambda to do lambda notation on Android, but you don't need to use lambda (although this is highly recommended). For the delay, I implemented exponential rollback, but you can connect whatever return logic you want there. For completeness, I added the subscribeOn and observeOn . I am using ReactiveX / RxAndroid for AndroidSchedulers.mainThread() .

 int ATTEMPT_COUNT = 10; public class Tuple<X, Y> { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 
+9
Nov 23 '14 at 22:09
source share

instead of using MyRequestObservable.retry I use the wrapper function retryObservable (MyRequestObservable, retrycount, seconds) that return a new Observable that handle indirect communication for delay, so I can do

 retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1<BonusIndividualList>(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1<Throwable>(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static <T> Observable<T> retryObservable( final Observable<T> requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { requestObservable.subscribe(new Action1<T>() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1<Observable<T>>(){ @Override public void call(Observable<T> observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); } 
+8
Apr 18 '14 at 16:06
source share

This example works with jxjava 2.2.2:

Try again without delay:

 Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retry(5) .doOnSuccess(status -> log.info("Yay! {}", status); 

Try again with a delay:

 Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS)) .doOnSuccess(status -> log.info("Yay! {}", status) .doOnError((Throwable error) -> log.error("I tried five times with a 300ms break" + " delay in between. But it was in vain.")); 

Our original single fails if someConnection.send () does not work. When this happens, observed failures inside retryWhen throw an error. We delay this emission for 300 ms and send it back to report a retry. take (5) ensures that our observed alarms will stop after we get five errors. retryWhen sees completion and does not retry after the fifth failure.

+8
09 Oct '18 at 14:04
source share

retryWhen is a complex, perhaps even buggy operator. In the white paper and at least one answer, the range statement is used here, which will not be executed if there are no retries. See my discussion with ReactiveX member David Karnock.

I improved the response on kjones by changing flatMap to concatMap and adding the RetryDelayStrategy class. flatMap does not preserve the order of radiation, but concatMap , which is important for delays with deviation. RetryDelayStrategy , as the name indicates, allows the user to choose from various retry delay creation modes, including rollback. The code is available on my GitHub complete with the following test cases:

  • Exceeds 1st attempt (no attempts)
  • Failure after 1 retry attempt
  • An attempt to retry 3 times, but with success on the 2nd, therefore, does not repeat the third time
  • Succeeds in 3rd attempt

See the setRandomJokes method.

+4
Jul 18 '16 at 5:23
source share

Now, using RxJava version 1.0+, you can use zipWith to achieve a retry with a delay.

Adding changes to kjones .

Modified

 public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } } 
+3
Sep 13 '16 at 17:57
source share

Same answer as kjones, but updated to latest version. For RxJava 2.x version: ('io.reactivex.rxjava2: rxjava: 2.1.3')

 public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } } 

Using:

// Add repeat logic to an existing observable. // Repeat a maximum of 3 times with a delay of 2 seconds.

 observable .retryWhen(new RetryWithDelay(3, 2000)); 
+3
Sep 27 '17 at 12:55 on
source share

You can add the delay to the Observable returned by retryWhen Operator

  /** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); } 

You can see more examples here. https://github.com/politrons/reactive

+1
Jul 18 '16 at 8:08
source share

For version Kotlin & RxJava1

 class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long) : Function1<Observable<out Throwable>, Observable<*>> { private val START_RETRY: Int = 1 override fun invoke(observable: Observable<out Throwable>): Observable<*> { return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), object : Function2<Throwable, Int, Int> { override fun invoke(throwable: Throwable, attempt: Int): Int { return attempt } }) } } 
0
Apr 17 '18 at 8:01
source share

(Kotlin) I slightly improved the code with exponential rollback and applied the protective radiation Observable.range ():

  fun testOnRetryWithDelayExponentialBackoff() { val interval = 1 val maxCount = 3 val ai = AtomicInteger(1); val source = Observable.create<Unit> { emitter -> val attempt = ai.getAndIncrement() println("Subscribe ${attempt}") if (attempt >= maxCount) { emitter.onNext(Unit) emitter.onComplete() } emitter.onError(RuntimeException("Test $attempt")) } // Below implementation of "retryWhen" function, remove all "println()" for real code. val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx -> throwableRx.doOnNext({ println("Error: $it") }) .zipWith(Observable.range(1, maxCount) .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) }, BiFunction { t1: Throwable, t2: Int -> t1 to t2 } ) .flatMap { pair -> if (pair.second >= maxCount) { Observable.error(pair.first) } else { val delay = interval * 2F.pow(pair.second) println("retry delay: $delay") Observable.timer(delay.toLong(), TimeUnit.SECONDS) } } } //Code to print the result in terminal. sourceWithRetry .doOnComplete { println("Complete") } .doOnError({ println("Final Error: $it") }) .blockingForEach { println("$it") } } 
0
May 7 '18 at 19:01
source share

in case you need to print the repetition count, you can use the example provided on the Rxjava wiki page https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

 observable.retryWhen(errors -> // Count and increment the number of errors. errors.map(error -> 1).scan((i, j) -> i + j) .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount)) // Limit the maximum number of retries. .takeWhile(errorCount -> errorCount < retryCounts) // Signal resubscribe event after some delay. .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS)); 
0
Apr 27 '19 at 14:33
source share

Based on kjones answer, here is a version of RxJava 2.x Kotlin with delayed replay as an extension. Replace Observable to create the same extension for Flowable .

 fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> { var retryCount = 0 return retryWhen { thObservable -> thObservable.flatMap { throwable -> if (++retryCount < maxRetries) { Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS) } else { Observable.error(throwable) } } } } 

Then just use it on the observable observable.retryWithDelay(3, 1000)

0
Aug 30 '19 at 16:14
source share

Just do the following:

  Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1<String, Observable<File>>() { @Override public Observable<File> call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1<File>() { @Override public void call(File file) { postAvatar(file); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }); 
-one
Jul 04 '16 at 13:05
source share



All Articles