After much research on this topic, this is what I found ...
onQueryProgress gets called in between queries. I'm not sure whether this is intentional behavior or not, but while we are streaming data from a file, onQueryProgress does not fire.
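For reference, this is the hook I mean: a StreamingQueryListener registered on the session. A minimal sketch (the println bodies are just placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder.appName("monitoring-sandbox").getOrCreate()

    spark.streams.addListener(new StreamingQueryListener {
      def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"Query started: ${event.id}")
      // In my tests this did not fire while data was streaming from a file
      def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"Rows/sec: ${event.progress.processedRowsPerSecond}")
      def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"Query terminated: ${event.id}")
    })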
The workaround I found was to rely on a ForeachWriter sink instead and do my own performance analysis inside the process function. Unfortunately, we cannot access specific information about the running query that way (or at least I haven't figured out how to yet). This is what I implemented in my sandbox for performance analysis:
    import org.apache.spark.sql.ForeachWriter

    // counter and startTime live on each executor; fine for a single-node sandbox
    var counter = 0L
    val startTime = System.nanoTime()

    val writer = new ForeachWriter[rawDataRow] {
      def open(partitionId: Long, version: Long): Boolean = {
        // We end up here in between files
        true
      }
      def process(value: rawDataRow): Unit = {
        counter += 1
        if (counter % 1000 == 0) {
          val currentTime = System.nanoTime()
          val elapsedTime = (currentTime - startTime) / 1000000000.0
          println(s"Records Written: $counter")
          println(s"Time Elapsed: $elapsedTime seconds")
        }
      }
      def close(errorOrNull: Throwable): Unit = {
        // Nothing to clean up here
      }
    }
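To wire the writer into a query, pass it to writeStream.foreach. A minimal sketch; rawDataSchema, the input path, and the rawDataRow case class are placeholders for whatever your job actually defines:

    import spark.implicits._

    // Hypothetical wiring: rawDataSchema, the path, and rawDataRow are
    // placeholders for the actual job's schema, input, and row type.
    val query = spark.readStream
      .schema(rawDataSchema)
      .json("/path/to/input/dir")
      .as[rawDataRow]
      .writeStream
      .foreach(writer)
      .start()

    query.awaitTermination()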
An alternative way to get metrics:
Another way to get information about the running queries is to hit the GET endpoints that Spark provides us:
http://localhost:4040/metrics
or
http://localhost:4040/api/v1/
Documentation here: http://spark.apache.org/docs/latest/monitoring.html
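If you want to consume these from code rather than a browser, a plain HTTP GET is enough. A minimal sketch using only the Scala standard library; the port assumes the default Spark UI on 4040:

    import scala.io.Source

    // Default Spark UI port; adjust if your driver serves the UI elsewhere
    val apiRoot = "http://localhost:4040/api/v1"

    // Listing applications yields the application id needed by the
    // more specific endpoints
    println(Source.fromURL(s"$apiRoot/applications").mkString)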
Update 2, September 2017: Tested with regular Spark Streaming, not Structured Streaming.
Disclaimer: this may not apply to Structured Streaming; I still need to set up a test bed to confirm. However, it does work with regular Spark Streaming (consuming from Kafka in this example).
I believe that as of the Spark 2.2 release there are new endpoints that can retrieve more performance metrics. They may have existed in previous versions and I just missed them, but I wanted to make sure this was documented for anyone else looking for this information.
http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming/statistics
This endpoint looks like it was added in 2.2 (or it already existed and only the documentation was added; I'm not sure, I haven't checked).
Either way, it returns metrics in this format for the specified streaming application:
{ "startTime" : "2017-09-13T14:02:28.883GMT", "batchDuration" : 1000, "numReceivers" : 0, "numActiveReceivers" : 0, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 90379, "numRetainedCompletedBatches" : 1000, "numActiveBatches" : 0, "numProcessedRecords" : 39652167, "numReceivedRecords" : 39652167, "avgInputRate" : 771.722, "avgSchedulingDelay" : 2, "avgProcessingTime" : 85, "avgTotalDelay" : 87 }
This gives us the ability to build our own custom metrics/monitoring applications on top of the REST endpoints that Spark exposes.
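As a sketch of what such a monitoring app could look like, here is a loop that polls the statistics endpoint and pulls one field out with a regex. The application id and poll interval are made up, and a real implementation should use a proper JSON library:

    import scala.io.Source

    val appId = "app-20170913140228-0000" // hypothetical application id
    val statsUrl =
      s"http://localhost:4040/api/v1/applications/$appId/streaming/statistics"

    while (true) {
      val json = Source.fromURL(statsUrl).mkString
      // Crude extraction for the sketch; prefer a real JSON parser
      val avgProcessingTime = "\"avgProcessingTime\"\\s*:\\s*(\\d+)".r
        .findFirstMatchIn(json)
        .map(_.group(1))
        .getOrElse("n/a")
      println(s"avgProcessingTime: $avgProcessingTime")
      Thread.sleep(5000)
    }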