It depends on how you identify the start and end of each buffer, so the following RxJava 2 code is meant as a hint on using the main source's own values to open and close the buffer's gate:
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<String> pp = PublishProcessor.create();

    Function<Flowable<String>, Flowable<List<String>>> f = o ->
            o.buffer(
                    o.filter(v -> v.contains("Start")),
                    v -> Flowable.merge(
                            o.filter(w -> w.contains("End")),
                            Flowable.timer(5, TimeUnit.MINUTES, scheduler)
                    )
            );

    pp.publish(f)
      .subscribe(System.out::println);

    pp.onNext("Start");
    pp.onNext("A");
    pp.onNext("B");
    pp.onNext("End");

    pp.onNext("Start");
    pp.onNext("C");
    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    pp.onNext("Start");
    pp.onNext("D");
    pp.onNext("End");

    pp.onComplete();
This prints:
    [Start, A, B, End]
    [Start, C]
    [Start, D, End]
It works by sharing the source via publish, which lets you reuse the same values from upstream without running multiple copies of the source at the same time. Opening is governed by detecting a line that contains "Start". Closing is governed either by detecting a line that contains "End" or by a timer that fires after a grace period.
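To make the open/close rule concrete, here is a rough plain-Java simulation of the same batching behaviour. It does not use RxJava and is not how the buffer operator is implemented. The names ShutterSketch, Event, batch and graceMillis are hypothetical, the timestamps stand in for the TestScheduler clock, and the sketch assumes batches never overlap. Unlike the real timer, it only notices an expired grace period when the next line arrives or the input ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: a plain-Java model of the open/close rule,
// not the RxJava buffer operator.
final class ShutterSketch {

    /** A timestamped input line; the timestamp stands in for the scheduler clock. */
    static final class Event {
        final long timeMillis;
        final String line;

        Event(long timeMillis, String line) {
            this.timeMillis = timeMillis;
            this.line = line;
        }
    }

    /** Groups lines into batches opened by "Start" and closed by "End" or a timeout. */
    static List<List<String>> batch(List<Event> events, long graceMillis) {
        List<List<String>> out = new ArrayList<>();
        List<String> current = null; // null means the shutter is closed
        long deadline = 0L;

        for (Event e : events) {
            // Timer branch: the grace period ran out before this line arrived.
            if (current != null && e.timeMillis >= deadline) {
                out.add(current);
                current = null;
            }
            // Opening branch: a "Start" line opens a new batch.
            if (current == null && e.line.contains("Start")) {
                current = new ArrayList<>();
                deadline = e.timeMillis + graceMillis;
            }
            // Lines that arrive while the shutter is closed are dropped.
            if (current != null) {
                current.add(e.line);
                // Closing branch: an "End" line closes the batch.
                if (e.line.contains("End")) {
                    out.add(current);
                    current = null;
                }
            }
        }
        // End of input: emit whatever is still open.
        if (current != null) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        List<Event> events = new ArrayList<>();
        for (String s : new String[] {"Start", "A", "B", "End", "Start", "C"}) {
            events.add(new Event(0L, s));
        }
        for (String s : new String[] {"Start", "D", "End"}) {
            events.add(new Event(fiveMinutes, s));
        }
        System.out.println(batch(events, fiveMinutes));
    }
}
```

Fed the same sequence of lines as the RxJava example, with the last three lines stamped five minutes later, the sketch yields the same three batches.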
Edit:
If "Start" is also the indicator for the next batch, you can replace the "End" check with "Start". You then have to alter the contents of each buffer, because otherwise the previous buffer will include the new batch's header as its last element:
    pp.publish(f)
      .doOnNext(v -> {
          int s = v.size();
          if (s > 1 && v.get(s - 1).contains("Start")) {
              v.remove(s - 1);
          }
      })
      .subscribe(System.out::println);
akarnokd