How to create an InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)?

How to use KafkaUtils.createDirectStream with offsets for a specific topic in PySpark?

apache-spark pyspark apache-kafka
2 answers

If you want to create an RDD from the records in a Kafka topic, use a static list of OffsetRange objects.

First, make all the necessary imports:

 from pyspark.streaming.kafka import KafkaUtils, OffsetRange 

Then you create your dictionary of Kafka brokers:

 kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 

Then you create an offset object

 start = 0
 until = 10
 partition = 0
 topic = 'topic'
 offset = OffsetRange(topic, partition, start, until)
 offsets = [offset]
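To make the argument order concrete, here is a plain-Python sketch (no Spark required) of how the OffsetRange arguments line up; `make_ranges` is a hypothetical helper, not part of the PySpark API:

```python
# OffsetRange(topic, partition, fromOffset, untilOffset) covers offsets in
# [fromOffset, untilOffset), i.e. untilOffset is exclusive.
# make_ranges is a hypothetical helper, not part of the PySpark API.
def make_ranges(topic, partition_counts):
    """Build one (topic, partition, start, until) tuple per partition.

    partition_counts maps partition id -> number of messages to read from 0.
    """
    return [(topic, p, 0, count) for p, count in sorted(partition_counts.items())]

ranges = make_ranges('topic', {0: 10, 1: 5})
```

With several partitions you would pass one OffsetRange per partition in the `offsets` list.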

Finally, you create the RDD:

 kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
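The returned RDD holds one (key, value) pair per Kafka message, so the payload is the second element of each pair. A plain-Python sketch with hypothetical records (no broker needed):

```python
# Hypothetical records as KafkaUtils.createRDD would return them:
# (key, value) pairs, one per Kafka message.
records = [(None, 'msg-0'), (None, 'msg-1')]

# Plain-Python equivalent of kafkaRDD.map(lambda kv: kv[1]).collect():
values = [kv[1] for kv in records]
```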

To create a Stream with offsets, you need to do the following:

 from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
 from pyspark.streaming import StreamingContext

Then you create your streaming context using your SparkContext:

 ssc = StreamingContext(sc, 1) 

Then we set up all of our parameters:

 kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
 start = 0
 partition = 0
 topic = 'topic'

Then we create our fromOffset dictionary:

 topicPartion = TopicAndPartition(topic, partition)
 fromOffset = {topicPartion: long(start)}  # notice that we must cast the int to long (Python 2)
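A portability note (an assumption, not from the answer): the long() cast only exists on Python 2. Python 3 has no separate long type, since int is already arbitrary-precision, so a small shim keeps the snippet working on both versions:

```python
# long() exists only on Python 2; on Python 3, plain int is unbounded.
try:
    long          # Python 2: builtin is defined
except NameError:
    long = int    # Python 3: alias so the snippet above still runs

start = 0
offset_value = long(start)   # what fromOffset stores for the partition
```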

Finally, we create the stream:

 directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams, fromOffsets=fromOffset)
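Since fromOffsets only takes effect on startup, restarting a job from where it left off means persisting the latest offsets yourself. Below is a hypothetical bookkeeping sketch (not from the answer, and the helper names are my own): TopicAndPartition keys are not JSON-serializable, so the dict is stored as plain fields and rebuilt on load:

```python
import json
import tempfile

# Hypothetical helpers: persist per-partition offsets between runs so the
# next start can resume via fromOffsets.
def save_offsets(offsets, path):
    """offsets: {(topic, partition): next_offset_to_read}"""
    records = [{"topic": t, "partition": p, "offset": o}
               for (t, p), o in offsets.items()]
    with open(path, "w") as f:
        json.dump(records, f)

def load_offsets(path):
    with open(path) as f:
        return {(d["topic"], d["partition"]): d["offset"] for d in json.load(f)}

# Round-trip with a sample offsets dict:
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    path = f.name
save_offsets({("topic", 0): 10}, path)
restored = load_offsets(path)
```

On startup you would map the restored keys back to TopicAndPartition objects before passing them as fromOffsets.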

You can do:

 from pyspark.streaming.kafka import TopicAndPartition
 topic = "test"
 brokers = "localhost:9092"
 partition = 0
 start = 0
 topicpartion = TopicAndPartition(topic, partition)
 fromoffset = {topicpartion: int(start)}
 kafkaDStream = KafkaUtils.createDirectStream(spark_streaming, [topic], \
     {"metadata.broker.list": brokers}, fromOffsets=fromoffset)

Note: Spark 2.2.0, Python 3.6

