In Kafka, how do I get the exact offset for a given production time?

I need to consume messages from Kafka hour by hour, day after day. Every hour I start a job that consumes the messages produced during the previous hour. For example, if the current time is 20:12, I consume the messages produced between 19:00:00 and 19:59:59. This means I need the start offset for 19:00:00 and the end offset for 19:59:59. I used SimpleConsumer.getOffsetsBefore, as shown in the "0.8.0 SimpleConsumer Example". The problem is that the returned offset does not match the timestamp passed as the parameter. For example, when I pass a timestamp of 19:00:00, I get a message that was produced at 16:38:00.
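To make the window in the example concrete, here is a minimal sketch of computing it in epoch milliseconds. The class name PreviousHourWindow and its fields are hypothetical, and the window is treated as half-open, [19:00:00, 20:00:00), which covers the same messages as 19:00:00 to 19:59:59.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper: the previous full clock hour as epoch milliseconds. */
final class PreviousHourWindow {
    final long startMs; // inclusive, e.g. 19:00:00.000
    final long endMs;   // exclusive, e.g. 20:00:00.000

    private PreviousHourWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    static PreviousHourWindow before(Instant now, ZoneId zone) {
        // Truncate "now" to the top of the current hour in the given zone.
        ZonedDateTime currentHour = now.atZone(zone).truncatedTo(ChronoUnit.HOURS);
        // The window is the hour that ended at the top of the current hour.
        ZonedDateTime previousHour = currentHour.minusHours(1);
        return new PreviousHourWindow(
                previousHour.toInstant().toEpochMilli(),
                currentHour.toInstant().toEpochMilli());
    }
}
```

Calling PreviousHourWindow.before(Instant.now(), ZoneId.systemDefault()) at 20:12 gives startMs for 19:00:00 and endMs for 20:00:00 in the local zone. The zone is passed in explicitly because clock hours do not line up with UTC hours in zones that have a half-hour offset.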

4 answers

In Kafka there is currently no way to get the offset corresponding to a specific timestamp; this is by design. As described near the top of Jay Kreps's log article, the offset number acts as a kind of timestamp for the log that is decoupled from wall-clock time. With the offset as your notion of time, you can tell whether any two systems are in a consistent state just by knowing which offset each has read up to. There is never any confusion over different clocks on different servers, leap years, daylight saving time, time zones, etc. It's rather nice...

NOW... all that said, if you know that your server went down at some time X, then in practice you really would like to know the corresponding offset. You can get close. The log segment files on the Kafka machines are named after the offset of their first message and carry a last-modified time, and there is a Kafka tool (which I can't find right now) that tells you which offsets are associated with these files. That lookup is only as fine-grained as the segment files, so the result is approximate. If you want to know the exact timestamp, then you must encode the timestamp in the messages you send to Kafka.
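One way to encode the timestamp in the message is sketched below. The envelope layout (an 8-byte big-endian epoch-millisecond prefix followed by the payload) and the class name TimestampedPayload are assumptions made for illustration, not a Kafka convention. The resulting byte array would be the value sent to Kafka.

```java
import java.nio.ByteBuffer;

/** Hypothetical envelope: 8-byte big-endian epoch millis, then the payload bytes. */
final class TimestampedPayload {

    /** Prefixes the payload with the production time. */
    static byte[] encode(long timestampMs, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(timestampMs) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Reads the production time back from an encoded message. */
    static long timestampOf(byte[] message) {
        return ByteBuffer.wrap(message).getLong();
    }

    /** Returns the original payload without the timestamp prefix. */
    static byte[] payloadOf(byte[] message) {
        byte[] payload = new byte[message.length - Long.BYTES];
        System.arraycopy(message, Long.BYTES, payload, 0, payload.length);
        return payload;
    }
}
```

The consumer can then call timestampOf on each message value and decide for itself whether the message falls inside the hour it is processing.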


To make this easier, the Kafka consumer API provides the method offsetsForTimes(), available from version 0.10.1. See the JavaDoc. Its source is below.

    /**
     * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
     *
     * This is a blocking call. The consumer does not have to be assigned the partitions.
     * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
     * will be returned for that partition.
     *
     * Notice that this method may block indefinitely if the partition does not exist.
     *
     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
     *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
     *         such message.
     * @throws IllegalArgumentException if the target timestamp is negative.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
            // OffsetAndTimestamp is always positive.
            if (entry.getValue() < 0)
                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                        entry.getValue() + ". The target time cannot be negative.");
        }
        return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
    }
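
The lookup rule in that JavaDoc (earliest offset whose timestamp is greater than or equal to the target, null if there is none) can be illustrated with a small in-memory model. This is not Kafka code: the class TimestampLookupModel is hypothetical, it stands in for a single partition, and it assumes timestamps never decrease as offsets grow, which real partitions do not guarantee.

```java
/**
 * In-memory model of one partition, NOT Kafka code: index i holds the timestamp
 * of the record at offset i. Assumes timestamps are non-decreasing.
 */
final class TimestampLookupModel {
    private final long[] timestamps;

    TimestampLookupModel(long... timestamps) {
        this.timestamps = timestamps.clone();
    }

    /** Earliest offset whose timestamp is >= targetMs, or null if there is none. */
    Long offsetForTime(long targetMs) {
        if (targetMs < 0) {
            throw new IllegalArgumentException("The target time cannot be negative.");
        }
        int lo = 0;
        int hi = timestamps.length; // binary search over [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] >= targetMs) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo < timestamps.length ? Long.valueOf(lo) : null;
    }

    /** Offsets [from, to) of the records with startMs <= timestamp < endMs. */
    long[] offsetRange(long startMs, long endMs) {
        Long start = offsetForTime(startMs);
        Long end = offsetForTime(endMs);
        long logEnd = timestamps.length; // null means "nothing at or after that time"
        return new long[] {start != null ? start : logEnd, end != null ? end : logEnd};
    }
}
```

For the hourly job in the question, two lookups are enough: one for 19:00:00 and one for 20:00:00. The first result is where to start reading and the second is the offset to stop before. With the real consumer, a null result for the end time would presumably mean reading up to the partition's current end offset.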

As other answers note, older versions of Kafka only had an approximate way of mapping time to offsets. However, Kafka 0.10.0 (released in May 2016) added a timestamp to each message, and since 0.10.1 Kafka maintains a time index for each partition's log. This lets you efficiently look up the exact offset for a given timestamp. You can use the KafkaConsumer#offsetsForTimes method to access this information.

More information on how the time index works is on the KIP-33 design discussion page.


Kafka 0.10 supports timestamps, although it is still a bit awkward to use them for what you want to do. But if you know the timestamp you want to start reading from and the timestamp you want to read up to, you can simply poll messages until that time and then stop consuming.
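
The stop condition can be sketched without a broker. The code below is an in-memory stand-in: BoundedRead and Rec are hypothetical names, and in a real consumer the records would come from poll() rather than a list. It also assumes timestamps arrive in non-decreasing order; if producers can write out-of-order timestamps, stopping at the first late record may miss messages that belong to the window.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a consume loop; the names here are hypothetical. */
final class BoundedRead {

    /** Minimal record: just the offset and the message timestamp. */
    static final class Rec {
        final long offset;
        final long timestampMs;

        Rec(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /** Collects records with startMs <= timestamp < endMs, stopping at the first one at or past endMs. */
    static List<Rec> readWindow(Iterable<Rec> records, long startMs, long endMs) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : records) {
            if (r.timestampMs >= endMs) {
                break; // reached the end of the window: stop consuming
            }
            if (r.timestampMs >= startMs) {
                out.add(r); // inside the window; earlier records are skipped
            }
        }
        return out;
    }
}
```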

