I have an observable that emits strings, and I want to group them by the first character. It is easy to do this with groupBy as follows:
    Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

    Observable<List<String>> groupedRows = rows
        .groupBy(new Func1<String, Character>() {
            public Character call(String row) {
                return row.charAt(0);
            }
        })
        .flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
            public Observable<List<String>> call(GroupedObservable<Character, String> group) {
                return group.toList();
            }
        });

    groupedRows.toBlocking().forEach(new Action1<List<String>>() {
        public void call(List<String> group) {
            System.out.println(group);
        }
    });
But this is not good enough for my purposes, because groupBy completes each group only when the source observable emits onCompleted. Therefore, if I have many rows, all of them are collected in memory, and only after the last row arrives are the groups flushed and written to the output.
I need something like the buffer operator, but with my own function that marks the boundary of each group. I implemented it like this (relying on the strings always being in alphabetical order):
    Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
    ConnectableObservable<String> connectableRows = rows.publish();

    Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
        private char lastChar = 0;

        public Boolean call(String row) {
            char currentChar = row.charAt(0);
            boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
            lastChar = currentChar;
            return isNewGroup;
        }
    });

    Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);
    connectableRows.connect();

    groupedRows.toBlocking().forEach(new Action1<List<String>>() {
        public void call(List<String> group) {
            System.out.println(group);
        }
    });
This does not work because boundarySelector appears to "eat" the rows. I find that strange, because I used ConnectableObservable specifically to indicate that I need two subscribers (boundarySelector and groupedRows) in place before the rows start to be emitted.
Curiously, if I delay rows by 1 second, this code works.
So the question is: how can I group an arbitrary number of rows using my own boundary function?
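To make the intended behaviour concrete, here is a minimal plain-Java sketch of the grouping I am after, written without RxJava. The class `BoundaryGrouping` and the helper `groupByBoundary` are hypothetical names used only for illustration. The sketch assumes rows of the same group arrive contiguously (as they do when the input is sorted), and it hands each group to the consumer as soon as the boundary function fires, so at most one group is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

public class BoundaryGrouping {

    // Hypothetical helper (not part of RxJava): walks the rows once and passes
    // each finished group to 'sink' as soon as 'isBoundary' reports that the
    // current row starts a new group.
    static <T> void groupByBoundary(Iterable<T> rows,
                                    BiPredicate<T, T> isBoundary,
                                    Consumer<List<T>> sink) {
        List<T> current = new ArrayList<>();
        T previous = null;
        for (T row : rows) {
            // A boundary can only occur once the current group has a row in it.
            if (!current.isEmpty() && isBoundary.test(previous, row)) {
                sink.accept(current);
                current = new ArrayList<>();
            }
            current.add(row);
            previous = row;
        }
        // Flush the last group when the input ends.
        if (!current.isEmpty()) {
            sink.accept(current);
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("aa", "ab", "ac", "bb", "bc", "cc");
        // Boundary: the first character differs from that of the previous row.
        BoundaryGrouping.<String>groupByBoundary(
            rows,
            (prev, cur) -> prev.charAt(0) != cur.charAt(0),
            group -> System.out.println(group));
        // prints:
        // [aa, ab, ac]
        // [bb, bc]
        // [cc]
    }
}
```

What I am looking for is the equivalent of this with an Observable as the source, where each list is emitted downstream as soon as its group ends rather than when the source completes.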