RxJava: how to group an arbitrarily large number of elements using my own boundary function

I have an observable that emits strings, and I want to group them by the first character. It is easy to do this with groupBy as follows:

    Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

    Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
        public Character call(String row) {
            return row.charAt(0);
        }
    }).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
        public Observable<List<String>> call(GroupedObservable<Character, String> group) {
            return group.toList();
        }
    });

    groupedRows.toBlocking().forEach(new Action1<List<String>>() {
        public void call(List<String> group) {
            System.out.println(group);
        }
    });

    // Output:
    // [aa, ab, ac]
    // [bb, bc]
    // [cc]

But this does not suit my purposes, because groupBy completes each group only when the source observable emits onCompleted. Therefore, if I have many rows, they will all be collected in memory, and only after the last row will they be flushed and written to the output.

I need something like the buffer operator, but with my own function that marks the boundary of each group. I implemented it like this (knowing that the rows are always sorted alphabetically):

    Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
    ConnectableObservable<String> connectableRows = rows.publish();

    Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
        private char lastChar = 0;

        public Boolean call(String row) {
            char currentChar = row.charAt(0);
            boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
            lastChar = currentChar;
            return isNewGroup;
        }
    });

    Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);

    connectableRows.connect();

    groupedRows.toBlocking().forEach(new Action1<List<String>>() {
        public void call(List<String> group) {
            System.out.println(group);
        }
    });

    // Output:
    // []
    // []
    // []

This does not work because boundarySelector "eats up" the rows, which I find strange, because I specifically used ConnectableObservable to indicate that I need two subscribers (boundarySelector and groupedRows) before rows starts emitting.

Curiously, if I delay rows for 1 second, then this code will work.

So the question is: how can I group an arbitrary number of rows using my own boundary function?
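
To pin down the behavior being asked for, here is a minimal sketch in plain Java, deliberately without RxJava. `BoundaryBuffer` and its method names are hypothetical and exist only for this illustration. It assumes, as stated above, that rows with the same key arrive consecutively, and it hands each group to a sink as soon as the key changes, so at most one group is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper (not part of RxJava): buffers consecutive items that share a key. */
public class BoundaryBuffer<T, K> {
    private final Function<T, K> keyOf;     // the user-supplied "boundary" function
    private final Consumer<List<T>> sink;   // receives each finished group
    private List<T> current = new ArrayList<>();
    private K currentKey;

    public BoundaryBuffer(Function<T, K> keyOf, Consumer<List<T>> sink) {
        this.keyOf = keyOf;
        this.sink = sink;
    }

    /** Feeds one item; the open group is emitted as soon as the key changes. */
    public void onNext(T item) {
        K key = keyOf.apply(item);
        if (!current.isEmpty() && !Objects.equals(key, currentKey)) {
            flush();
        }
        currentKey = key;
        current.add(item);
    }

    /** Call once at end of input to emit the last, still-open group. */
    public void onCompleted() {
        if (!current.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(current);
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        BoundaryBuffer<String, Character> buffer =
                new BoundaryBuffer<String, Character>(row -> row.charAt(0), System.out::println);
        for (String row : new String[] {"aa", "ab", "ac", "bb", "bc", "cc"}) {
            buffer.onNext(row);
        }
        buffer.onCompleted();
        // Prints:
        // [aa, ab, ac]
        // [bb, bc]
        // [cc]
    }
}
```

The group `[aa, ab, ac]` is printed when "bb" arrives, not at the end of the input; that early flush is the property the groupBy/toList version lacks.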

2 answers
    Observable<Integer> source = Observable.range(0, 100);

    source
        .groupBy(k -> k / 10)
        .publish(groups -> groups
            .map(g -> Pair.of(g.getKey(), g.takeUntil(groups)))
            .flatMap(kv -> kv.second
                .doOnNext(v -> System.out.println(kv.first + " value " + v))
                .doOnCompleted(() -> System.out.println(kv.first + " done"))
            ))
        .subscribe();

I found a way to do this with buffer:

    Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
    ConnectableObservable<String> connectableRows = rows.publish();

    Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
        private char lastChar = 0;

        public Boolean call(String row) {
            char currentChar = row.charAt(0);
            boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
            lastChar = currentChar;
            return isNewGroup;
        }
    });

    Observable<List<String>> groupedRows = connectableRows
        .refCount()
        .buffer(boundarySelector);

    groupedRows.toBlocking().forEach(new Action1<List<String>>() {
        public void call(List<String> group) {
            System.out.println(group);
        }
    });

    // Output:
    // [aa, ab, ac]
    // [bb, bc]
    // [cc]