As you mentioned in the question, this can be accomplished using multiple Observables. In essence, you have two observables: “you can observe the fresh answer” and “observe the cached answer”. If something can be "observed", you can express it as an Observed. Name the first original and the second replayed .
See JSBin (JavaScript, but concepts can be directly translated into Java. There is no JavaBin for this purpose).
var original = Rx.Observable.interval(1000) .map(function (x) { return {value: x, from: 'original'}; }) .take(4) .publish().refCount(); var replayed = original .map(function (x) { return {value: x.value, from: 'replayed'}; }) .replay(null, 1).refCount(); var merged = Rx.Observable.merge(original, replayed) .replay(null, 1).refCount() .distinctUntilChanged(function (obj) { return obj.value; }); console.log('subscribe 1st'); merged.subscribe(function (x) { console.log('subscriber1: value ' + x.value + ', from: ' + x.from); }); setTimeout(function () { console.log(' subscribe 2nd'); merged.subscribe(function (x) { console.log(' subscriber2: value ' + x.value + ', from: ' + x.from); }); }, 2500);
The general idea is here: annotate an event with a from field indicating its origin. If it is original , this is a new answer. If it is replayed , this is a cached answer. Observable original will only emit from: 'original' , and Observable replayed will only generate from: 'replayed' . In Java, we need a little more templates because you need to create a class to represent these annotated events. Otherwise, the same operators in RxJS can be found in RxJava.
The original Observable publish().refCount() , because we want only one instance of this stream to be available to all observers. In fact, in RxJS and Rx.NET, share() is an alias for publish().refCount() .
Reproduced by Observable replay(1).refCount() because it also shares the same as the original, but replay(1) gives us caching behavior.
merged Observable contains both original and reproducible ones, and this is what you should provide to all subscribers. Since replayed will immediately stand out whenever original does, we use distinctUntilChanged for the value of the event to ignore immediate followers. The reason we replay(1).refCount() also merging is because we want the merging of the original and playback to also be one common stream instance common to all observers. We would use publish().refCount() for this purpose, but we cannot lose the replay effect that contains replayed , so it is replay(1).refCount() , not publish().refCount() .
André staltz
source share