I have an RX manufacturer that creates a stream of such strings (a simplified version of a real stream):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
The flow is endless, but streamlined. So, after the lines ending with A have ended, B begins. When B ends, C begins ... when Z ends, we go to AA1 , etc. There is an unknown number A 's, B , etc., but usually 10-30 copies per letter.
I am looking for a way to divide this stream into blocks of all A: A1 A2 A3 , all B: B1 B2 , all C: C1 C2 C3 C4 C5 C6 , etc. Each block can be either observable (which I'll go to the list) or just a list.
I tried several different approaches using RxJava, all of which failed. Among the things that didn't work:
Group : since the flow is infinite, the observed letter does not end. Therefore, when A ends and B begins, A Observable does not end. Thus, an increasing number of observables.
Window / Buffer with distinctUntilChanged . I use "distinctUntilChanged" in the source stream to output the first element of each group (first A, first B, etc.). Then I use this stream as input for the window or the "buffer" operator, which will be used as the border between windows / buffers. It didn’t work, and all I had was an empty list.
What is the right solution using RX? I would prefer a Java solution, but solutions in other RX implementations that can be easily converted to Java are also welcome.
java c # reactive-programming rx-java system.reactive
Malt
source share