I'm hoping to collect windowed batches of output from a KStream and write them to a secondary store.
I expected .punctuate() to be called roughly every 30 seconds. What I got instead is here.
(The linked file is several thousand lines long.)
Summary: .punctuate() is called seemingly at random, and then repeatedly. It does not appear to honor the interval set via ProcessorContext.schedule().
Edit:
Another run of the same code produced .punctuate() calls roughly every four minutes. This time I did not see the crazy repeated values. No change in source, just a different result.
Using the following code:
Main
    StreamsConfig streamsConfig = new StreamsConfig(config);
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

    lines.process(new BPS2());

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
Processor
    import java.util.ArrayList;

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BP2 extends AbstractProcessor<String, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

        private ProcessorContext context;
        private final long delay;
        private final ArrayList<String> values;

        public BP2(long delay) {
            LOGGER.debug("BatchProcessor() constructor");
            this.delay = delay;
            values = new ArrayList<>();
        }

        @Override
        public void process(String s, String s2) {
            LOGGER.debug("batched processor s:{} s2:{}", s, s2);
            // Accumulate every value until the next punctuate().
            values.add(s2);
        }

        @Override
        public void init(ProcessorContext context) {
            LOGGER.info("init");
            super.init(context);
            values.clear();
            this.context = context;
            // Ask for punctuate() every `delay` milliseconds.
            context.schedule(delay);
        }

        @Override
        public void punctuate(long timestamp) {
            super.punctuate(timestamp);
            // Only logs the batch size for now; the values are not flushed yet.
            LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
            context().commit();
        }
    }
ProcessorSupplier
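    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BPS2 implements ProcessorSupplier<String, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

        @Override
        public Processor<String, String> get() {
            try {
                // 30000 ms: the interval at which punctuate() is expected to fire.
                return new BP2(30000);
            } catch (Exception exception) {
                LOGGER.error("Unable to instantiate BatchProcessor()", exception);
                // Keep the original exception as the cause.
                throw new RuntimeException(exception);
            }
        }
    }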
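For context, what I ultimately want .punctuate() to do is hand the accumulated batch to the secondary store and start a fresh one. Here is a minimal sketch of that intent with no Kafka dependency. BatchBuffer and its sink are hypothetical names used only for illustration; they are not part of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the code above: collects values and hands
// the whole batch to a sink when flush() is called (for example from punctuate()).
class BatchBuffer {
    private final List<String> values = new ArrayList<>();
    private final Consumer<List<String>> sink;

    BatchBuffer(Consumer<List<String>> sink) {
        this.sink = sink;
    }

    // Would be called from process() for every record.
    void add(String value) {
        values.add(value);
    }

    // Would be called from punctuate(): pass a copy of the batch to the sink,
    // then reset the buffer. Returns the number of values flushed.
    int flush() {
        int count = values.size();
        if (count > 0) {
            sink.accept(new ArrayList<>(values));
            values.clear();
        }
        return count;
    }
}
```

With something like this, process() would call add(s2) and punctuate() would call flush(), so the size of each batch depends entirely on when .punctuate() actually fires.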
Edit:
To rule out my debugger slowing things down, I built it and ran it on the same machine as my Kafka process. This time it did not even wait four minutes or so: within a few seconds it was issuing spurious calls to .punctuate(), many (most) of them with no intervening calls to .process().
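One possible reading of these symptoms, and this is an assumption I have not confirmed against the Kafka source, is that the interval passed to schedule() is measured against record timestamps rather than the wall clock. The toy model below has no Kafka dependency and is not Kafka's scheduler; StreamTimePunctuator and its methods are hypothetical names. It only shows that a timestamp-driven schedule would fire nothing while timestamps stand still and then fire several times in a row when they jump.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only, not Kafka's scheduler: punctuation is driven by the
// timestamps of incoming records, never by the wall clock.
class StreamTimePunctuator {
    private final long interval;
    private long nextDue = -1;                      // unset until the first record
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuator(long interval) {
        this.interval = interval;
    }

    // Called once per record with that record's timestamp.
    void onRecord(long recordTimestamp) {
        if (nextDue < 0) {
            // The first record only establishes when the first punctuation is due.
            nextDue = recordTimestamp + interval;
            return;
        }
        // If the timestamps jumped past several intervals, fire once per missed one.
        while (nextDue <= recordTimestamp) {
            fired.add(nextDue);
            nextDue += interval;
        }
    }

    // Timestamps at which the model "called punctuate", in order.
    List<Long> fired() {
        return fired;
    }
}
```

Feeding this model records stamped 0, 1000 and then 95000 with a 30000 ms interval yields no punctuation for the first two records and three back-to-back punctuations (due at 30000, 60000 and 90000) on the third, with no records processed in between.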