How does Structured Streaming execute individual streaming queries (in parallel or sequentially)?

I am writing a test application that consumes messages from Kafka topics and then transfers the data to S3 and to RDBMS tables (the stream is similar to the one presented here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html ). So I read the data from Kafka, and then:

  • I want to save every message to S3
  • some messages are stored in table A in an external database (based on a filter condition)
  • some other messages are stored in table B in an external database (based on another filter condition)

So I have something like:

  Dataset<Row> df = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2,topic3")
      .option("startingOffsets", "earliest")
      .load()
      .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));

(note that I read more than one Kafka topic). Then I define the required data sets:

  Dataset<Row> allMessages = df.select(...);
  Dataset<Row> messagesOfType1 = df.select(...); // some unique conditions applied on JSON elements
  Dataset<Row> messagesOfType2 = df.select(...); // some other unique conditions
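
For illustration only, the conditions could be simple filters on the parsed JSON; the field name and values here are hypothetical:

  // hypothetical JSON field "parsed_value.type", used only to illustrate the filtering
  Dataset<Row> messagesOfType1 = df.filter(col("parsed_value.type").equalTo("type1"));
  Dataset<Row> messagesOfType2 = df.filter(col("parsed_value.type").equalTo("type2"));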

and now for each dataset I create a streaming query to start the processing:

  StreamingQuery s3Query = allMessages
      .writeStream()
      .format("parquet")
      .option("startingOffsets", "latest")
      .option("path", "s3_location")
      .start();

  StreamingQuery firstQuery = messagesOfType1
      .writeStream()
      .foreach(new CustomForEachWriterType1()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table
      .start();

  StreamingQuery secondQuery = messagesOfType2
      .writeStream()
      .foreach(new CustomForEachWriterType2()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table (maybe even a different database than before)
      .start();
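
For completeness: after starting the queries I keep the driver alive via the session's StreamingQueryManager, roughly like this (a minimal sketch):

  // block until any of the started queries terminates;
  // throws StreamingQueryException if a query fails
  spark.streams().awaitAnyTermination();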

Now I am wondering:

Will these queries be executed in parallel, or one after another in FIFO order (so that I would have to assign them to separate scheduler pools)?

1 answer

The queries will be executed in parallel

Yes. These queries will be executed in parallel: since you did not specify a trigger for any of them, each query runs its micro-batches as quickly as possible.
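
If you want to control the cadence per query, you can pass an explicit trigger to the DataStreamWriter; a minimal sketch based on the s3Query from the question (the interval and checkpoint path are arbitrary examples):

  import org.apache.spark.sql.streaming.Trigger;

  StreamingQuery s3Query = allMessages
      .writeStream()
      .format("parquet")
      .option("path", "s3_location")
      .option("checkpointLocation", "s3_checkpoint_location") // hypothetical path; a file sink needs a checkpoint location
      .trigger(Trigger.ProcessingTime("1 minute")) // run a micro-batch at most once per minute
      .start();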


Internally, when you execute start on a DataStreamWriter, you create a StreamExecution, which in turn creates the so-called daemon microBatchThread (quoted from the Spark source code below):

  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }

You can see each query running in its own thread with the name:

 stream execution thread for [prettyIdString] 

You can check the individual threads using jstack or jconsole.
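
You can also enumerate the running queries programmatically from the same session via the StreamingQueryManager (a quick sketch):

  import org.apache.spark.sql.streaming.StreamingQuery;

  // print the id and state of every active query in this SparkSession
  for (StreamingQuery query : spark.streams().active()) {
      System.out.println(query.id() + " isActive=" + query.isActive());
  }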

