Does foreachRDD run on Driver?

I am trying to process some XML data received in a JMS queue (QPID) using Spark Streaming. Having received xml as a DStream, I convert them to Dataframes, so I can join them with some of my static data in the form of already loaded Dataframes. But according to the API documentation for the foreachRdd method in DStream: it runs on Driver, so this means that all processing logic will work only on Driver and not apply to workers / executors.

API Documentation

foreachRDD(func)

The most common output statement that applies functions, func, to every RDD generated from a stream. This function should push the data in each RDD to an external system, for example, save RDD to files or write it over the network to the database. Note that the func function is executed in the driver process that executes and will usually have RDD actions that will force stream RDDs to be computed.

+5
source share
2 answers

Does this mean that all the processing logic will work only on Driver, and not distributed among workers / executors.

No, the function itself runs on the driver, but do not forget that it works with RDD . The internal functions that you will use in RDD , for example, foreachPartition , map , filter , etc., will be executed on the work nodes. This one will not force all data to be sent over the network to the driver unless you call methods such as collect that do.

+6
source

To make this clear, if you run the following, you will see a “monkey” on the stdout driver:

 myDStream.foreachRDD { rdd => println("monkey") } 

If you run the following, you will see a “monkey” on the stdout driver, and a work filter will be executed for all artists that rdd distributed by:

 myDStream.foreachRDD { rdd => println("monkey") rdd.filter(element => element == "Save me!") } 

Add the simplification that myDStream only ever receives one RDD, and this RDD extends over the set of sections that we will call PartitionSetA that exist on MachineSetB where ExecutorSetC works. If you run the following, you will see a “monkey” in the stdout driver, you will see a “tortoise” on the output of all artists in the ExecutorSetC (a “tortoise” will appear once for each section - many sections may include the machine on which the executor is running), and both filter and add operations will be performed through ExecutorSetC :

 myDStream.foreachRDD { rdd => println("monkey") rdd.filter(element => element == "Save me!") rdd.foreachPartition { partition => println("turtle") val x = 1 + 1 } } 

Another thing to note is that in the following code, y will be transmitted over the network from the driver to all ExecutorSetC for each rdd :

 val y = 2 myDStream.foreachRDD { rdd => println("monkey") rdd.filter(element => element == "Save me!") rdd.foreachPartition { partition => println("turtle") val x = 1 + 1 val z = x + y } } 

To avoid this overhead, you can use broadcast variables that pass the value from the driver to the executors only once. For instance:

 val y = 2 val broadcastY = sc.broadcast(y) myDStream.foreachRDD { rdd => println("monkey") rdd.filter(element => element == "Save me!") rdd.foreachPartition { partition => println("turtle") val x = 1 + 1 val z = x + broadcastY.value } } 

To send more complex things in the form of broadcast variables, such as objects that cannot be easily serialized after instantiation, you can see the following blog post: https://allegro.tech/2015/08/spark-kafka-integration. html

+1
source

All Articles