To create an RDD from the entries in a Kafka topic, use a static set of offset ranges.
First, make the required imports available:
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Then create the dictionary of Kafka broker parameters:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Then create an offset range object:
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, start, until)
offsets = [offset]
Finally, you create the RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
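As a sanity check on the offset arithmetic: an OffsetRange covers offsets from `start` up to but not including `until`, so the range above selects exactly 10 messages. A minimal pure-Python sketch illustrates this (the `OffsetRangeSketch` class is a hypothetical stand-in for demonstration, not part of PySpark):

```python
# Hypothetical stand-in illustrating OffsetRange semantics: a range covers
# offsets [fromOffset, untilOffset) within a single partition of a topic.
class OffsetRangeSketch:
    def __init__(self, topic, partition, fromOffset, untilOffset):
        self.topic = topic
        self.partition = partition
        self.fromOffset = fromOffset
        self.untilOffset = untilOffset

    def count(self):
        # The until offset is exclusive, so this is the number of messages.
        return self.untilOffset - self.fromOffset

r = OffsetRangeSketch('topic', 0, 0, 10)
print(r.count())  # 10 messages: offsets 0 through 9
```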
To create a stream with offsets, you need to do the following:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Then create your StreamingContext from your SparkContext, here with a 1-second batch interval:
ssc = StreamingContext(sc, 1)
Then configure all the parameters:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Then create the fromOffset dictionary:
topicPartition = TopicAndPartition(topic, partition)
fromOffset = {topicPartition: int(start)}
Finally, create the stream:
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams, fromOffsets=fromOffset)
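The fromOffsets dictionary keys each starting offset by topic and partition, so to resume from a known position in a topic with several partitions you build one entry per partition. A small pure-Python sketch shows the shape of that dictionary (the `TP` namedtuple is a hypothetical stand-in that, like the real TopicAndPartition, is hashable and can serve as a dictionary key):

```python
from collections import namedtuple

# Hypothetical stand-in for TopicAndPartition: hashable, so it can act
# as a key in a fromOffsets-style dictionary.
TP = namedtuple('TP', ['topic', 'partition'])

# One entry per partition, each mapping to the offset to resume from.
fromOffsets = {TP('topic', p): 0 for p in range(3)}

print(sorted(tp.partition for tp in fromOffsets))  # partitions 0, 1, 2
```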