I have a stream of JSON objects that I'm aggregating into a hash of multiple values. I want to count by key over n-second (10? 60?) intervals and use these values to do some pattern analysis.
My topology: K->aggregateByKey(n seconds)->process()
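Counting by key over fixed n-second intervals amounts to tumbling-window bucketing: each record lands in the window [start, start + n) that contains its timestamp, and counts are kept per (window, key). The plain-Java sketch that follows illustrates only that bucketing, roughly what a tumbling windowed aggregation does conceptually. It does not use the Kafka Streams API. The class TumblingWindowCounter is hypothetical, timestamps are assumed to be non-negative milliseconds, and the 60000 ms window matches the size used in the code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of tumbling-window counting; not the Kafka Streams API.
public class TumblingWindowCounter {
    private final long windowSizeMs;
    // window start (ms) -> (key -> count); the TreeMap keeps windows in time order
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

    public TumblingWindowCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Puts the record into the window [start, start + windowSizeMs) that contains
    // its timestamp (assumes timestampMs >= 0).
    public void add(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        windows.computeIfAbsent(windowStart, start -> new HashMap<>())
               .merge(key, 1L, Long::sum);
    }

    // Count for one key in the window starting at windowStart (0 if none)
    public long count(String key, long windowStart) {
        Map<String, Long> counts = windows.get(windowStart);
        return counts == null ? 0L : counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        TumblingWindowCounter counter = new TumblingWindowCounter(60_000L);
        counter.add("opx", 1_000L);
        counter.add("opx", 59_999L);
        counter.add("opx", 60_000L);
        counter.add("other", 30_000L);
        System.out.println(counter.count("opx", 0L));      // prints 2
        System.out.println(counter.count("opx", 60_000L)); // prints 1
        System.out.println(counter.count("other", 0L));    // prints 1
    }
}
```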
In the processor's init() step, I call ProcessorContext.schedule(60 * 1000L) in the hope of receiving a .punctuate() call. From there I iterate over the values in the internal hash and act accordingly.
I can see the values at the aggregation stage, and they do arrive in the process() function, but .punctuate() is never called.
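To make the schedule()/punctuate() contract concrete, here is a simplified toy model. It assumes, as in Kafka Streams 0.10.x, that punctuation is driven by stream time (advanced by record timestamps) rather than by the wall clock, so punctuate() can only fire while records are being processed. The class StreamTimePunctuationModel and its methods are hypothetical stand-ins, not the Kafka API, and details such as when the first punctuation fires or how stream time is combined across partitions are deliberately simplified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, not the Kafka Streams API) of stream-time punctuation:
// punctuate() only fires from inside process(), once a record's timestamp reaches the
// next scheduled time. No records means no punctuation, however much wall-clock time passes.
public class StreamTimePunctuationModel {
    private final long intervalMs;
    private long nextPunctuateMs = Long.MIN_VALUE;                      // set by the first record
    private final Map<String, Long> hash = new HashMap<>();             // the processor's internal hash
    private final List<Long> punctuations = new ArrayList<>();          // stream times at which punctuate() ran
    private final List<Map<String, Long>> flushed = new ArrayList<>();  // hash contents seen by each punctuate()

    public StreamTimePunctuationModel(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Stand-in for process(): fire every punctuation that stream time has reached,
    // then add the record to the hash.
    public void process(String key, long timestampMs) {
        if (nextPunctuateMs == Long.MIN_VALUE) {
            nextPunctuateMs = timestampMs + intervalMs;
        }
        while (timestampMs >= nextPunctuateMs) {
            punctuate(nextPunctuateMs);
            nextPunctuateMs += intervalMs;
        }
        hash.merge(key, 1L, Long::sum);
    }

    // Stand-in for punctuate(): snapshot the hash, then reset it.
    private void punctuate(long streamTimeMs) {
        punctuations.add(streamTimeMs);
        flushed.add(new HashMap<>(hash));
        hash.clear();
    }

    public List<Long> punctuations() { return punctuations; }
    public List<Map<String, Long>> flushed() { return flushed; }

    public static void main(String[] args) {
        StreamTimePunctuationModel model = new StreamTimePunctuationModel(60_000L);
        model.process("opx", 0L);
        model.process("opx", 30_000L);
        System.out.println(model.punctuations()); // prints []
        model.process("opx", 125_000L);
        System.out.println(model.punctuations()); // prints [60000, 120000]
        System.out.println(model.flushed());      // prints [{opx=2}, {}]
    }
}
```

Under this model, a processor whose input stops advancing in time never sees punctuate(), even though schedule() was called.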
Code:

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
    KStream<String, String> mapped = opxLines.map(new ReMapper());

    // Aggregate per key into 60-second windows named "opx_aggregate"
    KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

    // Send every windowed update to AggProcessor, which calls schedule() in init()
    ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
        @Override
        public Processor<Windowed<String>, String> get() {
            return new AggProcessor();
        }
    });

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
AggregateInit() returns null.
I think I could replace .punctuate() with a simple timer, but I would like to understand why this code does not work as I expect.
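The simple-timer alternative mentioned above could look roughly like this: a wall-clock ScheduledExecutorService that periodically drains a concurrent hash. This is a sketch under assumptions, not a Kafka Streams feature. WallClockFlusher, record() and drain() are hypothetical names. The flush callback runs on the timer's own thread, so it should not touch the processor context, which is meant to be used from the stream thread only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of Kafka Streams: flushes a concurrent hash on a wall-clock timer.
public class WallClockFlusher implements AutoCloseable {
    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // onFlush runs on the timer thread every periodMs; if it throws, later runs are suppressed.
    public WallClockFlusher(long periodMs, Consumer<Map<String, Long>> onFlush) {
        timer.scheduleAtFixedRate(() -> onFlush.accept(drain()), periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Called from the processing thread, e.g. from process()
    public void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Removes entries one at a time so counts added concurrently land in the next snapshot
    Map<String, Long> drain() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String key : counts.keySet()) {
            Long value = counts.remove(key);
            if (value != null) {
                snapshot.put(key, value);
            }
        }
        return snapshot;
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch flushed = new CountDownLatch(1);
        try (WallClockFlusher flusher = new WallClockFlusher(100L, snapshot -> {
            if (!snapshot.isEmpty()) {
                System.out.println("flushed " + snapshot);
                flushed.countDown();
            }
        })) {
            flusher.record("opx");
            flusher.record("opx");
            flushed.await(5, TimeUnit.SECONDS);
        }
    }
}
```

The trade-off is that flushes happen on wall-clock time regardless of record timestamps, which avoids the stalled-punctuation symptom but decouples the intervals from event time.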