Sending a large CSV to Kafka using Python Spark

I am trying to send a big CSV to Kafka. The basic structure is to read a line of the CSV and zip it with the header:

 a = dict(zip(header, line.split(",")))

Then it is converted to JSON with:

 message = json.dumps(a) 

Then I use the kafka-python library to send the message:

 from kafka import SimpleProducer, KafkaClient

 kafka = KafkaClient("localhost:9092")
 producer = SimpleProducer(kafka)
 producer.send_messages("topic", message)

Using PySpark I can easily create an RDD of messages from the CSV file:

 sc = SparkContext()
 text = sc.textFile("file.csv")
 header = text.first().split(',')

 def remove_header(itr_index, itr):
     return iter(list(itr)[1:]) if itr_index == 0 else itr

 noHeader = text.mapPartitionsWithIndex(remove_header)
 messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")))))

Now I want to send these messages. I define a function:

 def sendkafka(message):
     kafka = KafkaClient("localhost:9092")
     producer = SimpleProducer(kafka)
     return producer.send_messages('topic', message)

Then I create a new RDD to send messages

 sentRDD = messageRDD.map(lambda x: sendkafka(x))

Then I call sentRDD.count(), which kicks things off and sends the messages.

Unfortunately, this is very slow: it sends 1000 messages per second. This is a 10-node cluster with 4 processors and 8 GB of memory each.

In comparison, just creating the messages from the 10 million line CSV (~2 GB) takes about 7 seconds.

I think the problem is that I am instantiating a Kafka producer inside the function. However, if I don't, Spark complains that the producer does not exist, even though I tried to define it globally.
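
For reference, this is roughly what the global-producer attempt looks like (names are only illustrative). Spark has to ship the closure to the executors, and the producer's live connection cannot travel with it:

 from kafka import SimpleProducer, KafkaClient

 # Producer created once on the driver...
 kafka = KafkaClient("localhost:9092")
 producer = SimpleProducer(kafka)

 # ...but the lambda below is serialized and shipped to the executors, and
 # the producer (which wraps an open socket) either fails to pickle or is
 # simply not available on the workers.
 sentRDD = messageRDD.map(lambda x: producer.send_messages('topic', x))
 sentRDD.count()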

Perhaps someone can shed light on how to approach this problem.

Thanks,

1 answer

You can create a single producer per partition and use either mapPartitions or foreachPartition:

 def sendkafka(messages):
     kafka = KafkaClient("localhost:9092")
     producer = SimpleProducer(kafka)
     for message in messages:
         yield producer.send_messages('topic', message)

 sentRDD = messageRDD.mapPartitions(sendkafka)
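
If you do not need the returned results, a foreachPartition variant along the same lines is an action in itself, so no separate count() call is needed to trigger the sends:

 def sendkafka(messages):
     # One client/producer per partition, reused for every record in it.
     kafka = KafkaClient("localhost:9092")
     producer = SimpleProducer(kafka)
     for message in messages:
         producer.send_messages('topic', message)

 # foreachPartition is an action, so this line alone triggers the sends.
 messageRDD.foreachPartition(sendkafka)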

If the above does not help, you can try to extend it using an asynchronous producer.
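
One possibility, sketched below under the assumption that you are on the legacy kafka-python SimpleProducer API (the exact flag names vary between releases, so check your version), is to let the producer buffer messages and send them from a background thread:

 def sendkafka_async(messages):
     kafka = KafkaClient("localhost:9092")
     # async=True hands messages to a background thread instead of blocking
     # on every send; batch_send_every_* tune how much is buffered
     # (flag names as in older kafka-python releases).
     producer = SimpleProducer(kafka, async=True,
                               batch_send_every_n=1000,
                               batch_send_every_t=10)
     for message in messages:
         producer.send_messages('topic', message)
     producer.stop()  # flush anything still buffered before the task ends

 messageRDD.foreachPartition(sendkafka_async)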

In Spark 2.x, it is also possible to use the Kafka data source. You must include the spark-sql-kafka jar corresponding to the Spark and Scala versions (here 2.2.0 and 2.11 respectively):

 spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 

convert the data to a DataFrame (if it is not already a DataFrame):

 messageDF = spark.createDataFrame(messageRDD, "string") 

and write it using the DataFrameWriter:

 (messageDF.write
     .format("kafka")
     .option("topic", topic_name)
     .option("kafka.bootstrap.servers", bootstrap_servers)
     .save())
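
With the package in place you can also skip the RDD step entirely and go straight from CSV to Kafka with the DataFrame API. A minimal sketch, assuming the CSV has a header row and reusing the topic_name and bootstrap_servers placeholders from above:

 from pyspark.sql.functions import to_json, struct

 # Read the CSV with its header, pack each row into one JSON string in a
 # column named "value" (the column the Kafka sink expects), and write it.
 df = spark.read.csv("file.csv", header=True)

 (df.select(to_json(struct(*df.columns)).alias("value"))
    .write
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .save())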