RxJava: retryWhen with retry restriction

I am new to ReactiveX and reactive programming in general. I need to implement a retry mechanism for Couchbase CAS operations, but the example on the Couchbase website shows retryWhen, which seems to repeat endlessly. I need to have a repeat limit and repeat the score somewhere there.

A simple retry () function will work as it accepts retryLimit, but I don't want it to be repeated on every exception, only on a CASMismatchException.

Any ideas? I am using the RxJava library.

+7
rx-java couchbase
source share
3 answers

In addition to what Simon Bazle said, here is a short version with a linear digression:

.retryWhen(notification -> notification .zipWith(Observable.range(1, 5), Tuple::create) .flatMap(att -> att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS) ) ) 

Note that the “att” here is a tuple that consists of both throwable and the number of repetitions, so you can very accurately implement the return logic based on these two parameters.

If you want to know even more, you can look into the stable document that I am writing now: https://gist.github.com/daschl/db9fcc9d2b932115b679#retry-with-delay

+8
source share

retryWhen is clearly a bit more complicated than simple repetition, but here its essence:

  • you pass the notificationHandler function to retryWhen, which takes an Observable<Throwable> and outputs an Observable<?>
  • emission of the returned Observable determines when a repeat or stop occurs
  • therefore, for each exception that occurs in the original thread, if the handler emits 1 element, there will be 1 repetition. If it emits 2 elements, there will be 2 ...
  • as soon as the handler thread throws an error, the retry is aborted.

Using this, you can both:

  • only works on CasMismatchExceptions : just return the Observable.error(t) function in other cases
  • repeat only a certain number of times: for each flatMap exception from Observable.range representing the maximum number of attempts, it should return Observable.timer with a # replay if you need to increase the delays.

Your usage example is pretty close to what is in the RxJava document here

+2
source share

revitalize this thread, as there is a new, easier way to do this in the Couchbase Java SDK 2.1.2: use RetryBuilder :

 Observable<Something> retryingObservable = sourceObservable.retryWhen( RetryBuilder //will limit to the relevant exception .anyOf(CASMismatchException.class) //will retry only 5 times .max(5) //delay doubling each time, from 100ms to 2s .delay(Delay.linear(TimeUnit.MILLISECONDS, 2000, 100, 2.0)) .build() ); 
+2
source share

All Articles