Can I create an RDD from a Kafka topic if I don't know the offsets in advance?

KafkaUtils.createRDD takes an offsetRanges value as a parameter. I do not know the offsets of the topic I want to read in advance. I want to read no more than 30 messages from the topic.

I see that there is KafkaCluster#getLatestLeaderOffsets, but it is annotated as a developer API.

Is there any public way to determine the earliest and latest offsets for a topic?

1 answer

This is not so simple, because only the individual brokers know the latest offset information for a given topic/partition.

You can issue an OffsetRequest. The following returns the earliest and latest offsets for a topic/partition (this is Scala, but you should get the idea if you are not using Scala).

Please note that you must use a SimpleConsumer connected to the broker that is the leader for the requested partition. What I usually do is create a SimpleConsumer for each of my brokers. Then I make a metadata request to get the partition-to-leader mapping, and then for each partition I do this:

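    import kafka.api.PartitionOffsetRequestInfo
    import kafka.common.TopicAndPartition
    import kafka.javaapi.consumer.SimpleConsumer
    import scala.collection.JavaConverters._

    // Returns (earliest, latest) offsets for one topic/partition.
    // `consumer` must be connected to the leader of that partition.
    def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
      // Ask for up to 1000 offsets at or before the latest one
      val time = kafka.api.OffsetRequest.LatestTime
      val reqInfo = Map[TopicAndPartition, PartitionOffsetRequestInfo](
        new TopicAndPartition(topic, partition) -> new PartitionOffsetRequestInfo(time, 1000))
      // The javaapi request expects a java.util.Map, hence asJava
      val req = new kafka.javaapi.OffsetRequest(
        reqInfo.asJava, kafka.api.OffsetRequest.CurrentVersion, "offReq")
      val resp = consumer.getOffsetsBefore(req)
      // Offsets come back newest first, so the last element is the earliest
      val offsets = resp.offsets(topic, partition)
      if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0)) else (0L, -1L)
    }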
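
Since the question asks for at most 30 messages, here is a minimal sketch of how the (earliest, latest) pair from getOffsets could be turned into a bounded range. The names `LastNOffsets` and `lastN` are hypothetical, and the sketch assumes the latest offset is the log-end offset (one past the last message), which matches the exclusive untilOffset of a Spark OffsetRange.

```scala
object LastNOffsets {
  // Returns (fromOffset, untilOffset) covering at most `n` of the newest
  // messages. `earliest` is the first offset still available; `latest` is
  // assumed to be exclusive (the offset the next message would receive).
  def lastN(earliest: Long, latest: Long, n: Int): (Long, Long) = {
    require(n >= 0, "n must be non-negative")
    if (latest <= earliest) (earliest, earliest) // empty partition: empty range
    else (math.max(earliest, latest - n), latest)
  }
}
```

The resulting pair could then be passed to OffsetRange.create(topic, partition, fromOffset, untilOffset), and the array of ranges (one per partition) to KafkaUtils.createRDD. Note that the limit applies per partition, so a topic with several partitions would need the 30 split among them.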

Hope this helps.
