I am trying to send a big CSV to Kafka. The basic structure is to read a line of the CSV and zip it with the header:
    a = dict(zip(header, line.split(",")))
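For illustration (with a made-up header and row), this produces:

    header = ["id", "name", "city"]          # hypothetical header row
    line = "1,Alice,Berlin"                  # hypothetical data row
    a = dict(zip(header, line.split(",")))   # {'id': '1', 'name': 'Alice', 'city': 'Berlin'}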
Then I convert it to JSON with:

    message = json.dumps(a)
Then I use the kafka-python library to send the message:

    from kafka import SimpleProducer, KafkaClient

    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    producer.send_messages("topic", message)
Using PySpark, I easily create an RDD of messages from the CSV file:

    sc = SparkContext()
    text = sc.textFile("file.csv")
    header = text.first().split(',')

    def remove_header(itr_index, itr):
        return iter(list(itr)[1:]) if itr_index == 0 else itr

    noHeader = text.mapPartitionsWithIndex(remove_header)
    messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")))))
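(As an aside, remove_header materializes the whole first partition into a list just to drop one line; a lighter variant, sketched here with itertools from the standard library, skips the first element lazily:)

    from itertools import islice

    def remove_header(itr_index, itr):
        # Lazily skip the header row in partition 0; pass other partitions through
        return islice(itr, 1, None) if itr_index == 0 else itr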
Now I want to send these messages. I define a function:

    def sendkafka(message):
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka)
        return producer.send_messages('topic', message)
Then I create a new RDD to send the messages:

    sentRDD = messageRDD.map(lambda x: sendkafka(x))
Then I call sentRDD.count(), which kicks off the evaluation and actually sends the messages.
Unfortunately, this is very slow: it sends about 1,000 messages per second on a 10-node cluster with 4 cores and 8 GB of memory per node. At that rate, sending all the messages would take close to three hours. In comparison, creating the messages from the ~10-million-line (~2 GB) CSV takes only about 7 seconds.
I think the problem is that I am instantiating a Kafka producer inside the function, so a new client connection is opened for every single message. However, if I don't, Spark complains that the producer does not exist, even when I try to define it globally; presumably the producer holds a network connection and cannot be pickled and shipped to the executors.
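For what it's worth, the direction I have been considering (untested, and the batch_send parameters are my assumptions about this version of kafka-python's SimpleProducer) is to create one producer per partition with mapPartitions instead of one per record:

    from kafka import SimpleProducer, KafkaClient

    def send_partition(messages):
        # One client/producer per partition instead of one per message
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka, batch_send=True,
                                  batch_send_every_n=500)
        for message in messages:
            producer.send_messages('topic', message)
        producer.stop()  # flush any batched messages before the task ends
        yield None

    # count() forces evaluation of every partition, triggering the sends
    messageRDD.mapPartitions(send_partition).count()

I am not sure whether this is the right pattern, though.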
Perhaps someone can shed light on how to approach this problem.
Thanks,