The best option is to use a timeout overload, which returns the timeout observed for each item and has it also for subscription (exactly the one that interests you).
observable.timeout( () -> Observable.empty().delay(10, TimeUnit.SECONDS), o -> Observable.never() )
I will explain that the first func0 will be launched upon subscription and will produce an empty observable (which completes) the delay for the desired time. if time passes before any product arrives, there will be a timeout, as you would like. the second parameter func1 will determine the timeouts between elements that you are not using, so we just skip never (which does not complete or does nothing)
Another option is to follow Luciano's suggestion, you can do it like this:
public static class TimeoutFirst<T> implements Transformer<T,T> { private final long timeout; private final TimeUnit unit; private TimeoutFirst(long timeout, TimeUnit unit) { this.timeout = timeout; this.unit = unit; } @Override public Observable<T> call(Observable<T> observable) { return Observable.amb(observable, Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name())))); } } public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) { return new TimeoutFirst<>(timeout, seconds); }
this is a pretty neat solution using amb.
ndori source share