RxJava checks only the first response element with a timeout

I see that ReactiveX (RxJava) has a timeout statement that will be applied to every element in the subscription stream. But I just want to check the very first answer with a timeout and don't care about the timeouts for the following answers. How can I implement this requirement elegantly using RxJava statements?

+8
source share
4 answers

Here is a more functional way to do this. It is in Scala, but Java must be transcribed:

 val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!")) myStream amb myTimeout 

The amb statement returns the value of the observable that the first emits.

+3
source

One way to do this:

 Observable<Response> respStream = respStream(); ConnectableObservable<Response> sharedRespStream = respStream.publish(); Observable<String> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS); Observable<String> rest = sharedRespStream.skip(1); Observable<String> result = first.mergeWith(rest); sharedRespStream.connect(); result.subscribe(response -> handleResponse(response), error -> handleError(error)); 

The code itself explains: exchange answers to avoid duplicate requests, apply a timeout to the first element emitted, and combine it with the elements following the first.

+2
source

The best option is to use a timeout overload, which returns the timeout observed for each item and has it also for subscription (exactly the one that interests you).

 observable.timeout( () -> Observable.empty().delay(10, TimeUnit.SECONDS), o -> Observable.never() ) 

I will explain that the first func0 will be launched upon subscription and will produce an empty observable (which completes) the delay for the desired time. if time passes before any product arrives, there will be a timeout, as you would like. the second parameter func1 will determine the timeouts between elements that you are not using, so we just skip never (which does not complete or does nothing)

Another option is to follow Luciano's suggestion, you can do it like this:

  public static class TimeoutFirst<T> implements Transformer<T,T> { private final long timeout; private final TimeUnit unit; private TimeoutFirst(long timeout, TimeUnit unit) { this.timeout = timeout; this.unit = unit; } @Override public Observable<T> call(Observable<T> observable) { return Observable.amb(observable, Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name())))); } } public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) { return new TimeoutFirst<>(timeout, seconds); } 

this is a pretty neat solution using amb.

+2
source

RxJava2 version for @ndori answer

 Observable.timeout( Observable.empty().delay(10, TimeUnit.SECONDS), o -> Observable.never() ) 

Source documents

0
source

All Articles