RxJava: find out if BehaviorSubject was a duplicate value or not

I am creating an Android interface that shows some data received from the network. I want it to show the last available data and it should never be empty (until any data has been received), so I use BehaviorSubject to provide subscribers (my UI) with the latest information available, updating it in the background to update it .

This works, but due to another requirement in my user interface, now I have to know if the published result was received from the network or not. (In other words, I need to know if the published result was a saved BehaviorSubject or not.)

How can i achieve this? If I need to split it into several Observables, it is fine if I can get the BehaviorSubject caching behavior (getting the last available result), as well as the ability to find out if the result was returned from the cache or not. The hacky way I can think of to do this would be to check if there was a response timestamp relatively soon, but it would be very messy, and I would prefer a way to do this with RxJava.

+7
java android reactive-programming rx-java
source share
5 answers

As you mentioned in the question, this can be accomplished using multiple Observables. In essence, you have two observables: “you can observe the fresh answer” and “observe the cached answer”. If something can be "observed", you can express it as an Observed. Name the first original and the second replayed .

See JSBin (JavaScript, but concepts can be directly translated into Java. There is no JavaBin for this purpose).

 var original = Rx.Observable.interval(1000) .map(function (x) { return {value: x, from: 'original'}; }) .take(4) .publish().refCount(); var replayed = original .map(function (x) { return {value: x.value, from: 'replayed'}; }) .replay(null, 1).refCount(); var merged = Rx.Observable.merge(original, replayed) .replay(null, 1).refCount() .distinctUntilChanged(function (obj) { return obj.value; }); console.log('subscribe 1st'); merged.subscribe(function (x) { console.log('subscriber1: value ' + x.value + ', from: ' + x.from); }); setTimeout(function () { console.log(' subscribe 2nd'); merged.subscribe(function (x) { console.log(' subscriber2: value ' + x.value + ', from: ' + x.from); }); }, 2500); 

The general idea is here: annotate an event with a from field indicating its origin. If it is original , this is a new answer. If it is replayed , this is a cached answer. Observable original will only emit from: 'original' , and Observable replayed will only generate from: 'replayed' . In Java, we need a little more templates because you need to create a class to represent these annotated events. Otherwise, the same operators in RxJS can be found in RxJava.

The original Observable publish().refCount() , because we want only one instance of this stream to be available to all observers. In fact, in RxJS and Rx.NET, share() is an alias for publish().refCount() .

Reproduced by Observable replay(1).refCount() because it also shares the same as the original, but replay(1) gives us caching behavior.

merged Observable contains both original and reproducible ones, and this is what you should provide to all subscribers. Since replayed will immediately stand out whenever original does, we use distinctUntilChanged for the value of the event to ignore immediate followers. The reason we replay(1).refCount() also merging is because we want the merging of the original and playback to also be one common stream instance common to all observers. We would use publish().refCount() for this purpose, but we cannot lose the replay effect that contains replayed , so it is replay(1).refCount() , not publish().refCount() .

+6
source share

Does Distinct Cover Your Case? BehaviorSubject only repeats the last item after the subscription.

+1
source share

I believe you want something like this:

 private final BehaviorSubject<T> fetched = BehaviorSubject.create(); private final Observable<FirstTime<T>> _fetched = fetched.lift(new Observable.Operator<FirstTime<T>, T>() { private AtomicReference<T> last = new AtomicReference<>(); @Override public Subscriber<? super T> call(Subscriber<? super FirstTime<T>> child) { return new Subscriber<T>(child) { @Override public void onCompleted() { child.onCompleted(); } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onNext(T t) { if (!Objects.equals(t, last.getAndSet(t))) { child.onNext(FirstTime.yes(t)); } else { child.onNext(FirstTime.no(t)); } } }; } }); public Observable<FirstTime<T>> getObservable() { return _fetched; } public static class FirstTime<T> { final boolean isItTheFirstTime; final T value; public FirstTime(boolean isItTheFirstTime, T value) { this.isItTheFirstTime = isItTheFirstTime; this.value = value; } public boolean isItTheFirstTime() { return isItTheFirstTime; } public T getValue() { return value; } public static <T> FirstTime<T> yes(T value) { return new FirstTime<>(true, value); } public static <T> FirstTime<T> no(T value) { return new FirstTime<>(false, value); } } 

The FirstTime wrapper FirstTime has a boolean value that can be used to see if any Observable subscriber has seen it before.

Hope this helps.

+1
source share

Store information about BehaviorSubjects in a data structure with a good search, such as Dictionnary. Each value will be key, and the value will be the number of iterations.

So, when you look at a special key, if your dictionnary already contains it, and its value is already in one, then you know that the value is a repeating value.

0
source share

I'm not quite sure what you want to achieve. Probably you would just like to have an intelligent source for the “latest” data and a second source that tells you when the data was updated?

  BehaviorSubject<Integer> dataSubject = BehaviorSubject.create(42); // initial value, "never empty" Observable<String> refreshedIndicator = dataSubject.map(data -> "Refreshed!"); refreshedIndicator.subscribe(System.out::println); Observable<Integer> latestActualData = dataSubject.distinctUntilChanged(); latestActualData.subscribe( data -> System.out.println( "Got new data: " + data)); // simulation of background activity: Observable.interval(1, TimeUnit.SECONDS) .limit(100) .toBlocking() .subscribe(aLong -> dataSubject.onNext(ThreadLocalRandom.current().nextInt(2))); 

Output:

 Refreshed! Got new data: 42 Refreshed! Got new data: 0 Refreshed! Refreshed! Refreshed! Got new data: 1 Refreshed! Got new data: 0 Refreshed! Got new data: 1 
0
source share

All Articles