Kafka KStream - Using AbstractProcessor with a window

I'm hoping to combine the windowed batches of output from a KStream and write them to a secondary store.

I expected to see .punctuate() called approximately every 30 seconds. What I got instead is here.

(The original file is several thousand lines long.)

Summary - .punctuate() is called seemingly at random, and then repeatedly. It does not appear to honor the interval set via ProcessorContext.schedule().


Edit:

Another run of the same code produced .punctuate() calls roughly every four minutes. This time I did not see the crazy repeated values. No change in source, just a different result.

Using the following code:

Main

    StreamsConfig streamsConfig = new StreamsConfig(config);
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

    lines.process(new BPS2());

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();

Processor

    public class BP2 extends AbstractProcessor<String, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

        private ProcessorContext context;
        private final long delay;
        private final ArrayList<String> values;

        public BP2(long delay) {
            LOGGER.debug("BatchProcessor() constructor");
            this.delay = delay;
            values = new ArrayList<>();
        }

        @Override
        public void process(String s, String s2) {
            LOGGER.debug("batched processor s:{} s2:{}", s, s2);
            values.add(s2);
        }

        @Override
        public void init(ProcessorContext context) {
            LOGGER.info("init");
            super.init(context);
            values.clear();
            this.context = context;
            context.schedule(delay);
        }

        @Override
        public void punctuate(long timestamp) {
            super.punctuate(timestamp);
            LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
            context().commit();
        }
    }

ProcessorSupplier

    public class BPS2 implements ProcessorSupplier<String, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

        @Override
        public Processor<String, String> get() {
            try {
                return new BP2(30000);
            } catch (Exception exception) {
                LOGGER.error("Unable to instantiate BatchProcessor()", exception);
                throw new RuntimeException();
            }
        }
    }

Edit:

To make sure my debugger was not slowing things down, I built the application and ran it on the same box as my Kafka process. This time it did not even wait 4 minutes or more - within a few seconds it was issuing spurious calls to .punctuate(). Many (most) of them came with no intervening calls to .process().

+1
3 answers

Update: this part of the answer is for Kafka version 0.11 or earlier (for Kafka 1.0 and later see below)

In Kafka Streams, punctuation is based on stream-time and not on system time (aka processing-time).

By default, stream-time is event-time, i.e. the timestamp embedded in the Kafka records themselves. Since you do not set a non-default TimestampExtractor (see timestamp.extractor at http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters ), the calls to punctuate depend only on how event time progresses across the records you process. Thus, if you need several minutes to process "30 seconds" (event time) worth of records, punctuate will be called less often than every 30 seconds (wall-clock time).

This can also explain your irregular calling pattern (i.e. bursts and long delays). If the event time of your data "jumps", and the data to be processed is already fully available in your topic, Kafka Streams also "jumps" with respect to its internally maintained stream-time.

I would assume that you can resolve your issue by using WallclockTimestampExtractor (see http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor ).
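To make the "jumping" behavior concrete, here is a toy model of stream-time driven punctuation. It is a sketch under stated assumptions, not Kafka's implementation: the class `StreamTimePunctuationModel`, its method `onRecord`, and the rule that the next punctuation is scheduled exactly one interval after the previous scheduled point are all hypothetical simplifications made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only (hypothetical names, NOT Kafka code): stream-time advances
// only when a record arrives, and punctuation fires while the next scheduled
// point is at or behind stream-time.
class StreamTimePunctuationModel {
    private final long intervalMs;
    private long nextPunctuateAtMs;
    private long streamTimeMs;

    StreamTimePunctuationModel(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.streamTimeMs = startMs;
        this.nextPunctuateAtMs = startMs + intervalMs; // assumption: first call one interval in
    }

    // Feeds one record timestamp (event time) into the model and returns
    // how many punctuate calls that single record would trigger.
    int onRecord(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        int fired = 0;
        while (nextPunctuateAtMs <= streamTimeMs) {
            fired++;                          // stands in for punctuate(...)
            nextPunctuateAtMs += intervalMs;  // catch up one interval at a time
        }
        return fired;
    }

    public static void main(String[] args) {
        StreamTimePunctuationModel model = new StreamTimePunctuationModel(30_000L, 0L);
        List<Long> timestamps = Arrays.asList(0L, 10_000L, 95_000L, 100_000L, 125_000L);
        for (long ts : timestamps) {
            System.out.println("record ts=" + ts + " -> punctuate calls: " + model.onRecord(ts));
        }
    }
}
```

In this model the record with event time 95 000 triggers three back-to-back calls, because stream-time jumped past three scheduled points at once, regardless of how much wall-clock time elapsed. If no record arrives, `onRecord` is never invoked and nothing fires at all.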
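As a minimal sketch of that suggestion, the extractor can be selected through the streams configuration. The key `timestamp.extractor` is the one named above; the class `WallclockExtractorConfig`, the application id, and the broker address are hypothetical placeholders. Newer releases are understood to use `default.timestamp.extractor` instead, so check the key against your version.

```java
import java.util.Properties;

// Sketch only: builds the Properties that would be passed to StreamsConfig.
// The class name, application id and broker address are placeholders.
class WallclockExtractorConfig {
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "punctuate-demo");     // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        // Use wall-clock time instead of the timestamp embedded in each record.
        props.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("timestamp.extractor"));
    }
}
```

The resulting `Properties` object would take the place of `config` in `new StreamsConfig(config)` from the question.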

One more note: stream-time only advances while data is being processed - if your application reaches the end of its input topics and is waiting for data, punctuate will not be called. This applies even if you use WallclockTimestampExtractor.

Btw: the punctuation behavior of Kafka Streams is currently under discussion: https://github.com/apache/kafka/pull/1689

Answer for Kafka 1.0 and later

Since Kafka 1.0 you can register punctuation based on wall clock or event time: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

+4

I just finished reading the answer to this question, which I think answers yours as well. The gist of it is:

  • The streams consumer polls for records.
  • All returned records are processed completely.
  • The punctuate callback is then scheduled with the configured delay.

The point is that punctuate is not a fixed-interval event, and variations in how long the second step takes will result in equivalent variations in the period between punctuate calls.

.... but read that link, he says it better than I do.

0

OK - I think this is a bug in Kafka.

Here is why:

In my initial testing, I used a single machine to run both the Producer and the Consumer. I would run the Producer for a few minutes to generate some test data and then run my tests. This gave the strange output that I posted originally.

Then I decided to push the Producer into the background and leave it running. Now I see 100% perfect 30-second intervals between .punctuate() calls. No more problems with that.

In other words, if the Kafka server is not processing any inbound data, it does not seem to be consistent about running the KStreams processes.

0
