RxJava - opposite switchMap () statement?

I am wondering if there is a way to create existing statements to execute the opposite of switchMap() .

switchMap() will chase after receiving the last emission and discard any Observable that it previously ran. Say I flipped it over and I want to ignore all outliers coming to the xxxMap() operator while it is busy with the first radiation received. He will continue to ignore emissions until he has finished emitting the current Observable inside him. Then he will process the next emission received.

 Observable.interval(1, TimeUnit.SECONDS) .doOnNext(i -> System.out.println("Source Emitted Value: " + i)) .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) .subscribe(i -> System.out.println("Subscriber received Value: " + i)); 

Is there any way to do this? In the above example, if intensiveProcess() lasted three seconds, ignoreWhileBusyMap() process 0 , but would probably ignore outliers 1 and 2 coming from interval() . Then it processed 3 , but probably ignored 4 and 5 , and so on ...

+6
source share
2 answers

Of course, start processing the value with the boolean set after the processing is completed:

 AtomicBoolean gate = new AtomicBoolean(true); Observable.interval(200, TimeUnit.MILLISECONDS) .flatMap(v -> { if (gate.get()) { gate.set(false); return Observable.just(v).delay(500, TimeUnit.MILLISECONDS) .doAfterTerminate(() -> gate.set(true)); } else { return Observable.empty(); } }) .take(10) .toBlocking() .subscribe(System.out::println, Throwable::printStackTrace); 

Edit

Alternative:

 Observable.interval(200, TimeUnit.MILLISECONDS) .onBackpressureDrop() .flatMap(v -> { return Observable.just(v).delay(500, TimeUnit.MILLISECONDS); }, 1) .take(10) .toBlocking() .subscribe(System.out::println, Throwable::printStackTrace); 

You can change onBackpressureDrop to onBackpressureLatest to continue with the most recent value.

+5
source

To answer Jeopardy's style: what is concatMap ?

concatMap will subscribe to the first Observable and will not subscribe to the next Observable until the previous Observable calls onComplete() .

In this regard, this is the "opposite" of switchMap , which is eagerly unsubscribing from the previous Observable when a new one arrives.

concatMap wants to hear everything that each Observer has to say, whereas switchMap is a social butterfly and moves as soon as another switchMap is available.

0
source

All Articles