Rx - Divide the stream into segments (lists) by condition

I have an RX manufacturer that creates a stream of such strings (a simplified version of a real stream):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

The flow is endless, but streamlined. So, after the lines ending with A have ended, B begins. When B ends, C begins ... when Z ends, we go to AA1 , etc. There is an unknown number A 's, B , etc., but usually 10-30 copies per letter.

I am looking for a way to divide this stream into blocks of all A: A1 A2 A3 , all B: B1 B2 , all C: C1 C2 C3 C4 C5 C6 , etc. Each block can be either observable (which I'll go to the list) or just a list.

I tried several different approaches using RxJava, all of which failed. Among the things that didn't work:

  • Group : since the flow is infinite, the observed letter does not end. Therefore, when A ends and B begins, A Observable does not end. Thus, an increasing number of observables.

  • Window / Buffer with distinctUntilChanged . I use "distinctUntilChanged" in the source stream to output the first element of each group (first A, first B, etc.). Then I use this stream as input for the window or the "buffer" operator, which will be used as the border between windows / buffers. It didn’t work, and all I had was an empty list.

What is the right solution using RX? I would prefer a Java solution, but solutions in other RX implementations that can be easily converted to Java are also welcome.

+8
java c # reactive-programming rx-java system.reactive
source share
4 answers

Here is how I can solve this:

 Observable<String> source = Observable.from( "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1"); Observable<List<String>> output = Observable.defer(() -> { List<String> buffer = new ArrayList<>(); return Observable.concat( source.concatMap(new Function<String, Observable<List<String>>>() { String lastKey; @Override public Observable<List<String>> apply(String t) { String key = t.substring(0, 1); if (lastKey != null && !key.equals(lastKey)) { List<String> b = new ArrayList<>(buffer); buffer.clear(); buffer.add(t); lastKey = key; return Observable.just(b); } lastKey = key; buffer.add(t); return Observable.empty(); } }), Observable.just(1) .flatMap(v -> { if (buffer.isEmpty()) { return Observable.empty(); } return Observable.just(buffer); }) ); } ); output.subscribe(System.out::println); 

Here's how it works:

  • I use defer because we need a buffer for each subscriber, not global
  • I combine buffering with emitting the last buffer if the source turns out to be final
  • I use concatMap and add to the buffer until the key changes, until I emit empty Observables. Once the key has changed, I emit the contents of the buffer and start a new one.
+3
source share

You can use rxjava-extras .toListWhile :

 Observable<String> source = Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1"); source.compose(Transformers.<String> toListWhile( (list, t) -> list.isEmpty() || list.get(0).charAt(0) == t.charAt(0))) .forEach(System.out::println); 

It does what @akarnokd has done under the covers and is being tested on the device.

+5
source share

Suppose we have a source string stream and a key function that fetches a key for each string , for example:

 IObservable<string> source = ...; Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray()); 

Then we can use Buffer with a custom close selector.

 var query = source.Publish(o => o.Buffer(() => { var keys = o.Select(key); return Observable .CombineLatest( keys.Take(1), keys.Skip(1), (a, b) => a != b) .Where(x => x); })); 

Each buffer ends when the first element in the buffer and the current element, which we consider when adding to the buffer, do not have the same key.

+1
source share

Looking at akarnokd's and Dave , I came up with my own solution by running a custom Rx statement called BufferWhile . It seems to work just as well as other solutions (someone, please correct me if I am wrong), but this seems more direct:

 public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{ private final Func1<? super T, ? extends U> keyGenerator; public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) { this.keyGenerator = keyGenerator; } @Override public Subscriber<? super T> call(final Subscriber<? super List<T>> s) { return new Subscriber<T>(s) { private ArrayList<T> buffer = null; private U currentKey = null; @Override public void onCompleted() { submitAndClearBuffer(); s.onCompleted(); } @Override public void onError(Throwable e) { submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case s.onError(e); } @Override public void onNext(T t) { if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) { currentKey = keyGenerator.call(t); submitAndClearBuffer(); buffer.add(t); } else { buffer.add(t); request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers } } private void submitAndClearBuffer() { if (buffer != null && buffer.size() > 0) { s.onNext(buffer); } buffer = new ArrayList<>(); } }; } } 

I can apply this operator to the original observable using lift and get an observable that emits lists of strings.

0
source share

All Articles