Kafka KStream - topology for n-second counting

I have a stream of JSON objects that I'm keying on a hash of several values. I'm hoping to count by key at n-second (10? 60?) intervals and use these counts to do some pattern analysis.

My topology: K->aggregateByKey(n seconds)->process()

In the processor's init() step, I've called ProcessorContext.schedule(60 * 1000L) in the hope of receiving a .punctuate() call. From there I would iterate over the values in an internal hash and act accordingly.

I'm seeing values come through the aggregation stage and reach the process() function, but .punctuate() is never called.


code:

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
    KStream<String, String> mapped = opxLines.map(new ReMapper());

    KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

    ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
        @Override
        public Processor<Windowed<String>, String> get() {
            return new AggProcessor();
        }
    });

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();

AggregateInit() returns null.

I think I could implement the equivalent of .punctuate() with a simple timer, but I'd like to know why this code doesn't work the way I expect.
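For reference, the simple-timer fallback could look roughly like the sketch below. It uses only the JDK and is not part of the Kafka Streams API; the class name IntervalCounter and its methods record(), flush(), start() and stop() are hypothetical names chosen for illustration. The assumption is that process() would call record() for each key, and a callback would receive the per-key counts every n seconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of Kafka Streams: counts keys and hands a
// snapshot to a callback every n seconds, independent of punctuate().
class IntervalCounter {
    private final Map<String, Long> counts = new HashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Intended to be called from process() for every record.
    synchronized void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Returns the counts gathered so far and resets the internal map.
    synchronized Map<String, Long> flush() {
        Map<String, Long> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    // Starts the periodic flush; the callback runs on the timer thread.
    // If the callback throws, later runs are suppressed by the executor.
    void start(long intervalSeconds, Consumer<Map<String, Long>> onFlush) {
        timer.scheduleAtFixedRate(
                () -> onFlush.accept(flush()),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    // Stops the timer thread so the JVM can exit.
    void stop() {
        timer.shutdownNow();
    }
}
```

Unlike punctuate(), this fires on wall-clock time on a separate thread, which is why record() and flush() are synchronized.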

1 answer

I think this was due to a misconfigured Kafka cluster. After raising the file descriptor limit to a much higher value than the default (1024 → 65535), it seems to work as specified.
