RxJava: sharing an observable's output between multiple subscribers

I have the following problem:

I have an observable that does some work, and other observables need the output of that observable to do their own work. I tried subscribing to the same observable several times, but in the log I see that the original observable is triggered once per subscription.

This is my observable that creates an object:

    Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
        if (mApi == null) {
            //do some work
        }
        subscriber.onNext(mApi);
        subscriber.unsubscribe();
    })

This is my observable that needs the object:

 loadApi().flatMap(api -> api....())); 

I use

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .unsubscribeOn(Schedulers.io())

for all observables.

+7
java android rx-java
2 answers

I'm not sure I understood your question correctly, but I believe you are looking for a way to share the items emitted by one observable between several subscribers. There are several ways to do this. For example, you can use a ConnectableObservable as follows:

    ConnectableObservable<Integer> obs = Observable.range(1, 3).publish();
    obs.subscribe(item -> System.out.println("Sub A: " + item));
    obs.subscribe(item -> System.out.println("Sub B: " + item));
    obs.connect(); // Now the source observable starts emitting items

Output:

    Sub A: 1
    Sub B: 1
    Sub A: 2
    Sub B: 2
    Sub A: 3
    Sub B: 3
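To make it clearer why the source runs only once here, the following is a minimal plain-Java sketch of the multicast idea behind publish() and connect(). It is only a mental model, not RxJava's implementation: the class `MulticastSketch`, its `sourceRuns` counter, and the `run()` helper are hypothetical names introduced for illustration, and the sketch ignores threading, errors, completion, and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class for illustration only; this is NOT how RxJava implements publish().
final class MulticastSketch {
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private final int start;
    private final int count;
    int sourceRuns = 0; // how many times the source was actually executed

    MulticastSketch(int start, int count) {
        this.start = start;
        this.count = count;
    }

    // Like subscribe() on a connectable source: only registers, nothing is emitted yet.
    void subscribe(Consumer<Integer> subscriber) {
        subscribers.add(subscriber);
    }

    // Like connect(): runs the source once and forwards every item to all subscribers.
    void connect() {
        sourceRuns++;
        for (int item = start; item < start + count; item++) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Mirrors the two-subscriber example and collects the lines instead of printing them.
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        MulticastSketch obs = new MulticastSketch(1, 3);
        obs.subscribe(item -> lines.add("Sub A: " + item));
        obs.subscribe(item -> lines.add("Sub B: " + item));
        obs.connect();
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because each item is handed to every registered subscriber before the next item is produced, the lines interleave (A, B, A, B, ...) and `sourceRuns` stays at 1 no matter how many subscribers were registered before `connect()`.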

Alternatively, you can use a PublishSubject:

    PublishSubject<Integer> subject = PublishSubject.create(); // Create a publish subject
    // Subscribe both subscribers on the publish subject
    subject.subscribe(item -> System.out.println("Sub A: " + item));
    subject.subscribe(item -> System.out.println("Sub B: " + item));
    Observable.range(1, 3).subscribe(subject); // Subscribe the subject on the source observable

Output:

    Sub A: 1
    Sub B: 1
    Sub A: 2
    Sub B: 2
    Sub A: 3
    Sub B: 3

Both of these examples are single-threaded, but you can easily add observeOn or subscribeOn calls to make them asynchronous.

+12

First of all, Observable.create is tricky to use and easy to get wrong. You need something like:

    Observable.create(subscriber -> {
        if (mApi == null) {
            //do some work
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext(mApi);
            subscriber.onCompleted();
            // Not subscriber.unsubscribe();
        }
    })

Then you can use:

    // autoConnect() returns a plain Observable, not a ConnectableObservable
    Observable<Integer> obs = Observable.just(1).replay(1).autoConnect();

All subsequent subscribers should receive a single emitted item.

    obs.subscribe(item -> System.out.println("Sub 1 " + item));
    obs.subscribe(item -> System.out.println("Sub 2 " + item));
    obs.subscribe(item -> System.out.println("Sub 3 " + item));
    obs.subscribe(item -> System.out.println("Sub 4 " + item));
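
As a rough mental model of what replay(1) combined with autoConnect() gives you for a single-item source, here is a plain-Java sketch. It is not RxJava code and not how the library is implemented: `ReplayOneSketch`, `sourceRuns`, and `run()` are hypothetical names for illustration, and the sketch assumes a source that produces exactly one item and ignores errors, completion, and schedulers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical class for illustration only; not RxJava's implementation.
final class ReplayOneSketch<T> {
    private final Supplier<T> source;
    private boolean connected = false;
    private T cached;
    int sourceRuns = 0; // how many times the expensive source was executed

    ReplayOneSketch(Supplier<T> source) {
        this.source = source;
    }

    // The first subscriber triggers the source (the autoConnect() part);
    // every subscriber, early or late, receives the cached item (the replay(1) part).
    synchronized void subscribe(Consumer<T> subscriber) {
        if (!connected) {
            cached = source.get();
            sourceRuns++;
            connected = true;
        }
        subscriber.accept(cached);
    }

    // Four subscribers, one source execution; lines are collected instead of printed.
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        ReplayOneSketch<Integer> obs = new ReplayOneSketch<>(() -> {
            lines.add("source runs"); // stands in for the "do some work" step
            return 1;
        });
        for (int n = 1; n <= 4; n++) {
            final int id = n;
            obs.subscribe(item -> lines.add("Sub " + id + " " + item));
        }
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the "source runs" line appears once, followed by one line per subscriber, which is the behavior the question asks for: the work is done a single time and its result is handed to everyone who subscribes.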
+1