Kafka - problems with TimestampExtractor

I am trying to work out why a stream based on a time series does not seem to trigger KStream.process() ("punctuate"). (see here for reference)

In the KafkaStreams configuration, I pass this parameter (among other things):

 config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class.getName()); 

I would expect this to call my object (derived from TimestampExtractor) whenever a new record is pulled.

In fact, it is not called at all.

  • Is this the wrong way to set timestamps in a KStream record?
  • Is this the wrong way to declare this configuration?
3 answers

I think this was another case of broker-level problems. I rebuilt the cluster using instances with more CPU and RAM, and now I get the expected results.

Note for distant observers (lurkers): if your KStream application behaves strangely, take a look at your brokers and make sure that they are not stuck in GC and have plenty of "headroom" for file handles, RAM, etc.


Nov 2017 update: Kafka Streams in Kafka 1.0 now supports punctuate() based on both stream-time and processing-time (wall-clock time), so you can choose whichever behavior you prefer.

Your setup seems right to me.

What you need to know: as of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. with the default timestamp extractor, stream-time means event-time). Stream-time only advances when new data records arrive, and how far it advances is determined by the timestamps of those new records.

For instance:

  • Suppose you schedule punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). If no data arrives during the next 5 minutes, punctuate() will not be called at all, even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time only advances based on newly received data records.
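To make the example above concrete, here is a minimal sketch in plain Java (no Kafka dependency). It is a simplified model, not Kafka's actual implementation: the class StreamTimePunctuationSketch and its methods are hypothetical, stream-time is assumed to be the largest record timestamp seen so far, and the schedule is assumed to start at the first record.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of stream-time punctuation; not Kafka's real code. */
class StreamTimePunctuationSketch {
    private final long intervalMs;
    private boolean started = false;
    private long streamTime;          // assumed: max record timestamp seen so far
    private long nextPunctuation;     // stream-time at which punctuate() fires next
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Called once per record with the timestamp the extractor returned. */
    void onRecord(long recordTimestampMs) {
        if (!started) {
            // Assumption: the first record starts the schedule.
            started = true;
            streamTime = recordTimestampMs;
            nextPunctuation = recordTimestampMs + intervalMs;
            return;
        }
        streamTime = Math.max(streamTime, recordTimestampMs);
        // Fire once for every interval boundary that stream-time has crossed.
        while (streamTime >= nextPunctuation) {
            fired.add(nextPunctuation);
            nextPunctuation += intervalMs;
        }
    }

    /** Wall-clock time passing without records does not move stream-time. */
    void onWallClockAdvance(long elapsedMs) {
        // Intentionally empty: no records, no stream-time progress, no punctuation.
    }

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch sim = new StreamTimePunctuationSketch(60_000L);
        sim.onRecord(0L);
        sim.onWallClockAdvance(5 * 60_000L);     // five idle minutes
        System.out.println(sim.fired().size());  // prints 0
        sim.onRecord(150_000L);                  // a record 2.5 minutes later in event-time
        System.out.println(sim.fired());         // prints [60000, 120000]
    }
}
```

In this model the five idle minutes produce no calls at all, and the punctuations only fire once a later record moves stream-time past the scheduled boundaries.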

Perhaps this causes the behavior you see?

Looking ahead: the Kafka project has already started discussing how to make punctuate() more flexible, for example to trigger it not only based on stream-time (which is event-time by default), but also based on processing-time.


Your approach seems right. Compare the paragraph "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

Not sure why your custom timestamp extractor is not used. Take a look at org.apache.kafka.streams.processor.internals.StreamTask. The constructor should contain something like

 TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class); 

Check if your custom extractor is selected there or not ...
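As a rough illustration of what that lookup involves, the sketch below mimics loading a class by the name stored under "timestamp.extractor" and instantiating it reflectively. It uses only the JDK; ExtractorStandIn, EventTimeExtractorStandIn and ExtractorLookupSketch are hypothetical stand-ins, and the assumption is that the real lookup similarly needs a resolvable class name, a public no-argument constructor, and an instance of the expected type.

```java
import java.util.Properties;

/** Hypothetical stand-in for Kafka's TimestampExtractor, so the sketch compiles without Kafka. */
interface ExtractorStandIn {
    long extract(long recordTimestampMs);
}

/** Hypothetical custom extractor with the public no-arg constructor that reflection needs. */
class EventTimeExtractorStandIn implements ExtractorStandIn {
    public EventTimeExtractorStandIn() {
    }

    @Override
    public long extract(long recordTimestampMs) {
        return recordTimestampMs;
    }
}

class ExtractorLookupSketch {
    /** Resolves the configured class name and instantiates it, failing loudly on any problem. */
    static ExtractorStandIn load(Properties config) {
        String className = config.getProperty("timestamp.extractor");
        if (className == null) {
            throw new IllegalStateException("timestamp.extractor is not set");
        }
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            if (!(instance instanceof ExtractorStandIn)) {
                throw new IllegalStateException(className + " does not implement the extractor interface");
            }
            return (ExtractorStandIn) instance;
        } catch (ReflectiveOperationException e) {
            // Covers a misspelled class name, a missing no-arg constructor, and similar failures.
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("timestamp.extractor", EventTimeExtractorStandIn.class.getName());
        ExtractorStandIn extractor = load(config);
        System.out.println(extractor.getClass().getName());
    }
}
```

If a check like this fails for your real extractor class, the configured name or the constructor is a more likely culprit than the Streams runtime itself.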
