The following code:
    Observable
        .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
        .doOnNext(item -> System.out.println("source emitting " + item))
        .groupBy(item -> {
            System.out.println("groupBy called for " + item);
            return item % 3;
        })
        .subscribe(observable -> {
            System.out.println("got observable " + observable + " for key " + observable.getKey());
            observable.subscribe(item -> {
                System.out.println("key " + observable.getKey() + ", item " + item);
            });
        });
leaves me puzzled. The output I get is:
    source emitting 0
    groupBy called for 0
    got observable rx.observables.GroupedObservable@42110406 for key 0
    key 0, item 0
    source emitting 1
    groupBy called for 1
    got observable rx.observables.GroupedObservable@1698c449 for key 1
    key 1, item 1
    source emitting 2
    groupBy called for 2
    got observable rx.observables.GroupedObservable@5ef04b5 for key 2
    key 2, item 2
    source emitting 3
    groupBy called for 3
    key 0, item 3
    source emitting 4
    groupBy called for 4
    key 1, item 4
    source emitting 5
    groupBy called for 5
    key 2, item 5
    source emitting 6
    groupBy called for 6
    key 0, item 6
    source emitting 7
    groupBy called for 7
    key 1, item 7
    source emitting 8
    groupBy called for 8
    key 2, item 8
    source emitting 9
    groupBy called for 9
    key 0, item 9
So, in the top-level subscribe callback, I get 3 grouped observables from groupBy, as expected. Then, one by one, I subscribe to the grouped observables, and here's what I don't understand:
Why are the original elements still emitted in the original sequence (i.e. 0, 1, 2, 3, ...), and not 0, 3, 6, 9 ... for the key 0, and then 1, 4, 7 for the key 1, then 2, 5, 8 for key 2?
I think I understand how groups are created:
1. 0 is emitted, the key function is called and returns 0
2. it is checked whether an observable for key 0 exists; it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2, as they both create new groups: observables with keys 1 and 2 are emitted, and they emit 1 and 2 respectively
4. source item 3 is emitted, the key function is called and returns 0
5. it is checked whether an observable for key 0 exists; it does, so no new grouped observable is created or emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
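The steps above can be sketched in plain Java, without RxJava. This is only a model of the described behaviour, not how RxJava implements `groupBy` internally. The names `GroupBySketch`, `Group` and `dispatch` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class GroupBySketch {

    // Hypothetical stand-in for a grouped observable: it knows its key
    // and forwards each item to whoever subscribed to it.
    static final class Group {
        final int key;
        Consumer<Integer> subscriber = item -> { };  // no-op until subscribed

        Group(int key) {
            this.key = key;
        }
    }

    // Pushes every source item through the steps listed above and
    // returns a log of what happened, in order.
    static List<String> dispatch(List<Integer> source) {
        List<String> log = new ArrayList<>();
        Map<Integer, Group> groups = new LinkedHashMap<>();

        for (int item : source) {            // the source is walked exactly once
            int key = item % 3;              // steps 1 and 4: compute the key
            Group group = groups.get(key);
            if (group == null) {             // step 2: first item seen for this key
                group = new Group(key);
                groups.put(key, group);
                log.add("got group for key " + key);
                // Assumption: the outer subscriber subscribes to the new
                // group immediately, as the code in the question does.
                final Group created = group;
                created.subscriber =
                        value -> log.add("key " + created.key + ", item " + value);
            }
            group.subscriber.accept(item);   // steps 2 and 5: the group emits the item
        }
        return log;
    }

    public static void main(String[] args) {
        dispatch(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
                .forEach(System.out::println);
    }
}
```

In this model each item is handed to its group before the next source item is read, so the log lists the items in source order (0, 1, 2, 3, ...) rather than group by group.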
It seems that although I get the grouped observables one after another, their emissions somehow alternate. How does this happen?