Structured Streaming Monitoring

I have a Structured Streaming query that works fine, but I was hoping to monitor it while it runs.

I created an EventCollector:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    class EventCollector extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {
        println("Start")
      }

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        println(event.progress.prettyJson)
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
        println("Term")
      }
    }

I instantiated the EventCollector and added the listener to my Spark session:

    val listener = new EventCollector()
    spark.streams.addListener(listener)

Then I run the query:

    val query = inputDF.writeStream
      //.format("console")
      .queryName("Stream")
      .foreach(writer)
      .start()

    query.awaitTermination()

However, onQueryProgress is never invoked. onQueryStarted is, but I was hoping to get the query progress at a certain interval to monitor how the queries are performing. Can anyone help with this?

Tags: scala, apache-spark, structured-streaming
1 answer

After much research on this topic, this is what I found ...

onQueryProgress gets called in between queries. I'm not sure whether this is intentional functionality or not, but while we are streaming data from a file, onQueryProgress does not fire.

The solution I found was to rely on the foreach writer sink and perform my own performance analysis inside the process function. Unfortunately, we cannot access specific information about the running query, or at least I have not figured out how to yet. This is what I implemented in my sandbox for performance analysis:

    import org.apache.spark.sql.ForeachWriter

    // counter and startTime are declared outside the writer so they
    // persist across batches
    var counter = 0L
    val startTime = System.nanoTime()

    val writer = new ForeachWriter[rawDataRow] {
      def open(partitionId: Long, version: Long): Boolean = {
        // We end up here in between files
        true
      }

      def process(value: rawDataRow): Unit = {
        counter += 1
        if (counter % 1000 == 0) {
          val currentTime = System.nanoTime()
          val elapsedTime = (currentTime - startTime) / 1000000000.0
          println(s"Records Written: $counter")
          println(s"Time Elapsed: $elapsedTime seconds")
        }
      }

      def close(errorOrNull: Throwable): Unit = {}
    }

An alternative way to obtain metrics:

Another way to get information about the running queries is to hit the GET endpoints that Spark exposes for us:

http://localhost:4040/metrics

or

http://localhost:4040/api/v1/

Documentation here: http://spark.apache.org/docs/latest/monitoring.html
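For a quick sanity check, these endpoints can be queried from any HTTP client, including the driver itself. A minimal sketch of my own (not from the Spark docs), assuming the driver UI is running on the default port 4040:

    // Minimal sketch: fetch the REST API root from the driver's UI.
    // Assumes the Spark UI listens on the default port 4040
    // (configurable via spark.ui.port).
    import scala.io.Source

    def fetchJson(url: String): String = {
      val src = Source.fromURL(url)
      try src.mkString finally src.close()
    }

    // Lists the applications known to this UI, as JSON
    println(fetchJson("http://localhost:4040/api/v1/applications"))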

Edit, September 2, 2017: Tested with regular Spark Streaming, not Structured Streaming.

Disclaimer: this may not apply to Structured Streaming; I need to set up a test bed to confirm. However, it works with regular Spark Streaming (consuming from Kafka in this example).

I believe that since Spark 2.2 was released, there are new endpoints that expose more performance metrics. They may have existed in previous versions and I just missed them, but I wanted to make sure this was documented for anyone searching for the information.

http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming/statistics

This is the endpoint that looks like it was added in 2.2 (or it already existed and the documentation was just added; I'm not sure, I haven't checked).

In any case, it provides metrics in this format for the specified streaming application:

 { "startTime" : "2017-09-13T14:02:28.883GMT", "batchDuration" : 1000, "numReceivers" : 0, "numActiveReceivers" : 0, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 90379, "numRetainedCompletedBatches" : 1000, "numActiveBatches" : 0, "numProcessedRecords" : 39652167, "numReceivedRecords" : 39652167, "avgInputRate" : 771.722, "avgSchedulingDelay" : 2, "avgProcessingTime" : 85, "avgTotalDelay" : 87 } 

This gives us the ability to create our own custom monitoring applications using the REST endpoints that Spark exposes.
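As a rough sketch of what such a poller could look like (my own illustration, not from the Spark docs; it assumes the code runs on the driver, where the application id is available, and that the UI is on the default port 4040):

    // Hypothetical monitoring loop polling the statistics endpoint above.
    // Per the disclaimer, this endpoint applies to a regular Spark
    // Streaming (DStream) app; port 4040 is the default UI port.
    import scala.io.Source

    val appId = spark.sparkContext.applicationId
    val statsUrl =
      s"http://localhost:4040/api/v1/applications/$appId/streaming/statistics"

    while (true) {
      val src = Source.fromURL(statsUrl)
      try println(src.mkString) finally src.close()
      Thread.sleep(10000) // poll every 10 seconds
    }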

