How to use RxJava Interval Operator

I am studying RxJava operators, and I found that the code below does not print anything:

    public static void main(String[] args) {
        Observable
            .interval(1, TimeUnit.SECONDS)
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError -> " + e.getMessage());
                }

                @Override
                public void onNext(Long l) {
                    System.out.println("onNext -> " + l);
                }
            });
    }

According to the ReactiveX documentation, interval should:

create an Observable that emits a sequence of integers spaced by a given time interval

Did I make a mistake or forget about something?

+12
rx-java
4 answers

You must block until the observable is consumed:

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        Observable
            .interval(1, TimeUnit.SECONDS)
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                    // make sure to complete only when observable is done
                    latch.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError -> " + e.getMessage());
                    // release the latch on error too, otherwise main would wait forever
                    latch.countDown();
                }

                @Override
                public void onNext(Long l) {
                    System.out.println("onNext -> " + l);
                }
            });

        // wait for observable to complete (never in this case...)
        latch.await();
    }

You can add .take(10), for example, to see the observable complete.

+16

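Put Thread.sleep(1000000) after the subscribe call and you will see that it works. Observable.interval runs by default on Schedulers.computation(), so the items are emitted on a thread other than the main thread. Without the sleep, main returns before the first item is emitted and the JVM exits, because the computation threads are daemon threads.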
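To see the same effect without RxJava on the classpath, here is a minimal JDK-only sketch. It is an imitation, not RxJava's implementation: a ScheduledExecutorService backed by a daemon thread stands in for Schedulers.computation(), and a CountDownLatch keeps the caller alive until a few ticks have arrived. The names IntervalSketch, collectTicks and fake-computation, and the 10 ms period, are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class IntervalSketch {

    // Emits 0, 1, 2, ... on a daemon thread and blocks the caller
    // until `count` ticks have been received.
    public static List<Long> collectTicks(int count, long periodMillis)
            throws InterruptedException {
        // Assumption for illustration: a daemon thread plays the role of the
        // computation scheduler, so it does not keep the JVM alive by itself.
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "fake-computation");
                    t.setDaemon(true);
                    return t;
                });

        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(count);
        AtomicLong counter = new AtomicLong();

        scheduler.scheduleAtFixedRate(() -> {
            // Ignore any tick that fires after the requested count was reached.
            if (latch.getCount() > 0) {
                received.add(counter.getAndIncrement());
                latch.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        // Without this wait the caller would return before the first tick.
        latch.await();
        scheduler.shutdownNow();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Long tick : collectTicks(3, 10)) {
            System.out.println("onNext -> " + tick);
        }
    }
}
```

If you remove latch.await() and print directly from the scheduled task, main returns at once and the program normally ends before anything is printed, which is what happens in the question.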

+5

As the other answers say, interval already runs asynchronously, so you need to wait for all events to complete.

You can take the Subscription returned by subscribe and wrap it in a TestSubscriber, which is part of the ReactiveX platform and lets you wait for all events to complete.

    @Test
    public void testObservableInterval() throws InterruptedException {
        Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
            .map(time -> "item emitted")
            .subscribe(System.out::print,
                       item -> System.out.print("final:" + item));

        new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
    }

My GitHub has more examples if you need them: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

0

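Just add System.in.read() at the end of main. It blocks the main thread until input arrives on standard input, which keeps the program running in the meantime.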

0