GroupBy operator, elements from different groups alternate

The following code:

Observable .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) .doOnNext(item -> System.out.println("source emitting " + item)) .groupBy(item -> { System.out.println("groupBy called for " + item); return item % 3; }) .subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.subscribe(item -> { System.out.println("key " + observable.getKey() + ", item " + item); }); }); 

leaves me puzzled. The output I get is:

  source emitting 0 groupBy called for 0 got observable rx.observables.GroupedObservable@42110406 for key 0 key 0, item 0 source emitting 1 groupBy called for 1 got observable rx.observables.GroupedObserva ble@1698c449 for key 1 key 1, item 1 source emitting 2 groupBy called for 2 got observable rx.observables.GroupedObservable@5ef04b5 for key 2 key 2, item 2 source emitting 3 groupBy called for 3 key 0, item 3 source emitting 4 groupBy called for 4 key 1, item 4 source emitting 5 groupBy called for 5 key 2, item 5 source emitting 6 groupBy called for 6 key 0, item 6 source emitting 7 groupBy called for 7 key 1, item 7 source emitting 8 groupBy called for 8 key 2, item 8 source emitting 9 groupBy called for 9 key 0, item 9 

So, in the top-level subscription method, I get 3 observables from GroupedObservable, as expected. Then, one by one, I subscribe to grouped observables - and here's what I don't understand:

Why are the original elements still emitted in the original sequence (i.e. 0, 1, 2, 3, ...), and not 0, 3, 6, 9 ... for the key 0, and then 1, 4, 7 for the key 1, then 2, 5, 8 for key 2?

I think I understand how groups are created:

 1. 0 is emitted, the key function is called and it gets 0 2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0 3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly 4. source item 3 is emitted, the key function is called and it gets 0 5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable 6. etc. until the source sequence is drained 

It seems that although I get the grouped observables one after another, their emissions somehow alternate. How does this happen?

+7
rx-java
source share
1 answer

Why are the original elements still emitted in the original sequence (i.e. 0, 1, 2, 3, ...), and not 0, 3, 6, 9 ... for the key 0, and then 1, 4, 7 for the key 1, then 2, 5, 8 for key 2?

You answered your question. You work with the flow of objects in the order in which they are emitted. Since each of them is emitted, it is transmitted along a chain of operators, and you see the result that you showed here.

The alternative output you expect from it requires the chain to wait until the source stops issuing items for all groups. Let's say you had Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0) . Then you expect (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) as output groups. What if you had an endless stream of 4? Your subscriber will never receive this 0, 3 .. from the first group.

You can create the behavior you are looking for. The toList will cache the output until the source completes, and then pass List<R> subscriber:

 .subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.toList().subscribe(items -> { // items is a List<Integer> System.out.println("key " + observable.getKey() + ", items " + items); }); }); 
+4
source share

All Articles