This is not so simple, because only individual brokers know what is the latest offset information for this section / section.
You can do OffsetRequest . The following will return the earliest and latest offsets for the section / section (this is Scala, but you should get the idea if you are not using Scala).
Please note that you must use SimpleConsumer connected to the broker, which is the leader for the requested section. Usually I do, I create SimpleConsumer for each of my brokers. Then I make a request for metadata and get a section to map the leader, and then to split the section I do this:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { val time = kafka.api.OffsetRequest.LatestTime val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))) val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq") val resp = consumer.getOffsetsBefore(req) val offsets = resp.offsets(topic, partition) if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0)) else (0, -1) }
Hope this helps.
source share