Reactor / RxJava: buffering multi-line log entries with a timeout

I am looking at replacing a home-grown stream-processing library that looks awfully close to Reactive Streams with io.projectreactor. The goal is to reduce the amount of code we maintain ourselves and to pick up new features added by the community (e.g. operator fusion).

At the head of the pipeline I have to read from stdin and combine multi-line log entries into single text blobs that flow down the pipeline. The use case is described in detail in the section on multiline log entries in the Filebeat documentation (except that we want to do the processing ourselves).

So far I have the code:

    BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

    // Flux.generate allows at most one sink.next(...) per invocation,
    // so emit a single line each time and complete on EOF.
    Flux<String> lines = Flux.generate(sink -> {
        try {
            String line = input.readLine();
            if (line == null) {
                sink.complete();
            } else {
                sink.next(line);
            }
        } catch (IOException e) {
            sink.error(e);
        }
    });

    Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
    Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());

    // doOnNext (rather than doOnEach) receives the value itself
    logRecords
        .doOnNext(r -> System.out.printf("%s payload: %d chars%n", r.timestamp, r.payload.length()))
        .subscribe();

This takes care of merging several lines when a new log header is detected, but the existing library also flushes the accumulated lines after a timeout (i.e. if no further line arrives within 5 seconds, the partial entry is emitted as-is).
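To make the timeout-flush semantics concrete, here is a plain-Java sketch of the behavior being replaced (the class and method names are hypothetical, not part of the real library; time is passed in explicitly to keep the logic deterministic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: lines accumulate into one record; if the gap since
// the previous line exceeds the timeout, the pending record is flushed
// before the new line starts a fresh one.
class LogRecordAccumulator {
    private final long timeoutMillis;
    private final Consumer<String> downstream;
    private final List<String> pending = new ArrayList<>();
    private long lastLineAt = -1;

    LogRecordAccumulator(long timeoutMillis, Consumer<String> downstream) {
        this.timeoutMillis = timeoutMillis;
        this.downstream = downstream;
    }

    void onLine(String line, long nowMillis) {
        if (lastLineAt >= 0 && nowMillis - lastLineAt > timeoutMillis) {
            flush(); // timeout expired: emit whatever was accumulated
        }
        pending.add(line);
        lastLineAt = nowMillis;
    }

    void flush() {
        if (!pending.isEmpty()) {
            downstream.accept(String.join("\n", pending));
            pending.clear();
        }
    }
}
```

The reactive question is then how to express the "flush on inactivity" branch with existing operators instead of hand-rolled state.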

What would be the idiomatic way to model this in Reactor? Do I need to write my own operator, or can I compose existing ones?

Any pointers to relevant examples and documents to achieve this use case in Project Reactor or RxJava will be greatly appreciated.

reactive-programming rx-java project-reactor reactive-streams
2 answers

It depends on how you identify the beginning and end of each buffer, so the following RxJava 2 code is intended as a hint about using the main source's own values to open and close the buffer boundaries:

    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<String> pp = PublishProcessor.create();

    Function<Flowable<String>, Flowable<List<String>>> f = o ->
        o.buffer(
            o.filter(v -> v.contains("Start")),
            v -> Flowable.merge(
                o.filter(w -> w.contains("End")),
                Flowable.timer(5, TimeUnit.MINUTES, scheduler)));

    pp.publish(f)
      .subscribe(System.out::println);

    pp.onNext("Start");
    pp.onNext("A");
    pp.onNext("B");
    pp.onNext("End");

    pp.onNext("Start");
    pp.onNext("C");
    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    pp.onNext("Start");
    pp.onNext("D");
    pp.onNext("End");
    pp.onComplete();

This prints:

    [Start, A, B, End]
    [Start, C]
    [Start, D, End]

It works by sharing the source through publish, which lets you reuse the same upstream values without running multiple copies of the source at the same time. Opening a buffer is triggered by detecting a "Start" line. Closing is triggered either by detecting an "End" line or by a timer after the grace period.
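The open/close semantics can also be sketched without Rx to make the boundary logic explicit. This is a simplified, non-overlapping model (names are hypothetical, and the timer path is represented by an explicit `closeOpenBuffer()` call rather than a real scheduler):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of buffer-with-boundaries: a "Start" line opens a
// buffer, an "End" line closes it, and a timeout (modeled here as an
// explicit call) closes a buffer that never saw its "End".
class BoundaryBuffer {
    private final List<List<String>> emitted = new ArrayList<>();
    private List<String> open; // null when no buffer is open

    void onNext(String v) {
        if (v.contains("Start")) {
            open = new ArrayList<>();      // opening boundary
        }
        if (open != null) {
            open.add(v);
            if (v.contains("End")) {       // closing boundary
                closeOpenBuffer();
            }
        }
    }

    void closeOpenBuffer() {               // also invoked on timeout
        if (open != null) {
            emitted.add(open);
            open = null;
        }
    }

    List<List<String>> buffers() {
        return emitted;
    }
}
```

Feeding it the same sequence as the RxJava example, with `closeOpenBuffer()` standing in for the 5-minute timer, reproduces the three printed buffers.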

Edit:

If "Start" is also the indicator of the next batch, you can replace "End" with "Start" and then trim the buffer contents, since each buffer would otherwise include the new header that belongs to the next batch:

    pp.publish(f)
      .doOnNext(v -> {
          int s = v.size();
          if (s > 1 && v.get(s - 1).contains("Start")) {
              v.remove(s - 1);
          }
      })
      .subscribe(System.out::println);
The buffer operator seems to be the most suitable and simplest solution to me.

It has size-based and time-based strategies. Since you are processing a log, you can treat the number of lines as the buffer size.

Here's an example that emits elements grouped into batches of 4 lines or 5-second intervals, whichever comes first:

    Observable<String> lineReader = Observable.<String>create(subscriber -> {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                subscriber.onNext(line);
            }
            subscriber.onComplete(); // flush the final buffer on EOF
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }).subscribeOn(Schedulers.newThread());

    lineReader
        .buffer(5, TimeUnit.SECONDS, 4)
        .filter(lines -> !lines.isEmpty())
        .subscribe(System.out::println);
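To make the count-or-time behavior of `buffer(5, TimeUnit.SECONDS, 4)` concrete, here is a deterministic plain-Java model (the class name is hypothetical, and unlike the real operator it only checks the time bound when a line arrives, so it is a simplification, not the RxJava implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of buffer(timespan, unit, count): emit a batch when it
// reaches maxCount lines, or when maxAgeMillis has elapsed since the batch
// was opened. Time is injected explicitly so the sketch stays testable.
class CountOrTimeBuffer {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<List<String>> batches = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private long openedAt;

    CountOrTimeBuffer(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    void onLine(String line, long nowMillis) {
        if (current.isEmpty()) {
            openedAt = nowMillis;
        } else if (nowMillis - openedAt >= maxAgeMillis) {
            emit(); // time boundary reached before this line arrived
            openedAt = nowMillis;
        }
        current.add(line);
        if (current.size() >= maxCount) {
            emit(); // count boundary reached
        }
    }

    private void emit() {
        batches.add(current);
        current = new ArrayList<>();
    }

    List<List<String>> batches() {
        return batches;
    }
}
```

Note that this count-or-time strategy alone does not align batches with log-record headers; it only bounds how long lines can sit unflushed, which is why the boundary-based `buffer` variant in the first answer fits the original question more closely.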
