How to wait for an asynchronous Observable to complete

I am trying to build a sample using RxJava. The sample must orchestrate ReactiveWareService and ReactiveReviewService to retrieve the WareAndReview composite.

ReactiveWareService:

    public Observable<Ware> findWares() {
        return Observable.from(wareService.findWares());
    }

ReactiveReviewService (reviewService.findReviewsByItem does a Thread.sleep to simulate latency):

    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
            try {
                List<Review> reviews = reviewService.findReviewsByItem(item);
                reviews.forEach(observer::onNext);
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }
        }));
    }

The method that combines them:

    public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
        final List<WareAndReview> wareAndReviews = new ArrayList<>();

        wareService.findWares()
            .map(WareAndReview::new)
            .subscribe(wr -> {
                wareAndReviews.add(wr);
                //Async!!!!
                reviewService.findReviewsByItem(wr.getWare().getItem())
                    .subscribe(wr::addReview,
                        throwable -> System.out.println("Error while trying to find reviews for " + wr)
                    );
            });

        //TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {}

        return wareAndReviews;
    }

Given the fact that I don't want to return an Observable, how can I wait for the async Observable (findReviewsByItem) to complete?

+8
java rx-java
3 answers

Most of your example can be rewritten with standard RxJava operators that compose well together:

    public class Example {

        Scheduler scheduler = Schedulers.from(executor);

        public Observable<Review> findReviewsByItem(final String item) {
            return Observable.just(item)
                .subscribeOn(scheduler)
                .flatMapIterable(reviewService::findReviewsByItem);
        }

        public List<WareAndReview> findWaresWithReviews() {
            return wareService
                .findWares()
                .map(WareAndReview::new)
                .flatMap(wr -> {
                    return reviewService
                        .findReviewsByItem(wr.getWare().getItem())
                        .doOnNext(wr::addReview)
                        .lastOrDefault(null)
                        .map(v -> wr);
                })
                .toList()
                .toBlocking()
                .first();
        }
    }

Whenever you want to compose services like these, think of flatMap first. You do not need to block on each inner Observable; block only once, at the very end, with toBlocking(), and only if it is really necessary.

+13

You can use the BlockingObservable methods; see https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators. For example:

 BlockingObservable.from(reviewService.findReviewsByItem(..)).toIterable() 
+10

Another way would be to declare a CountDownLatch before subscribing, then call countDown() on that latch in onCompleted() (and in onError(), so that a failure cannot leave the caller waiting forever). You can then replace your Thread.sleep() with await() on that latch.
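
As a rough illustration of the latch idea, here is a minimal sketch in plain Java, without RxJava, so that it runs on its own. The names LatchWaitSketch, slowFindReviews and findAllReviews are hypothetical stand-ins, not part of the question's code; slowFindReviews only imitates the slow reviewService.findReviewsByItem call. The latch starts at the number of items, every task counts down in a finally block (the equivalent of counting down in both onCompleted and onError), and the caller waits with a timeout instead of sleeping for a fixed period.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Hypothetical stand-in for the slow reviewService.findReviewsByItem call.
    static List<String> slowFindReviews(String item) throws InterruptedException {
        Thread.sleep(100); // simulated latency
        return Arrays.asList(item + ": review 1", item + ": review 2");
    }

    // Starts one asynchronous lookup per item and blocks until all of them have finished.
    static List<String> findAllReviews(List<String> items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Worker threads append concurrently, so the list must be thread-safe.
        List<String> reviews = Collections.synchronizedList(new ArrayList<>());
        // One count per asynchronous lookup.
        CountDownLatch latch = new CountDownLatch(items.size());
        try {
            for (String item : items) {
                executor.execute(() -> {
                    try {
                        reviews.addAll(slowFindReviews(item));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        // Count down on success and on failure alike.
                        latch.countDown();
                    }
                });
            }
            // Replaces Thread.sleep(3000): returns as soon as every task has counted down.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for reviews");
            }
            return new ArrayList<>(reviews);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reviews", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> reviews = findAllReviews(Arrays.asList("book", "lamp", "desk"));
        System.out.println(reviews.size() + " reviews collected"); // prints "6 reviews collected"
    }
}
```

The timeout on await() is a design choice in this sketch: without it, a task that never counts down would block the caller forever.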

-4
