Spark has a configurable metrics system. By default, it publishes a JSON version of the registered metrics at `<driver>:<port>/metrics/json`. Other sinks can be configured, such as Ganglia, CSV files, or JMX.
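Additional sinks are enabled through `$SPARK_HOME/conf/metrics.properties`. A sketch of what that might look like for a CSV sink and a JMX sink (the period, unit, and directory values here are illustrative, not defaults):

```properties
# conf/metrics.properties -- enable extra metric sinks (illustrative values)
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics

*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
```

The leading `*` applies the sink to all instances (driver, executors, etc.); it can be replaced by a specific instance name to scope the sink.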
You will need an external monitoring system that collects these indicators on a regular basis and helps you make sense of them. (NB: we use Ganglia, but there are other open-source and commercial options.)
Spark Streaming publishes several metrics that you can use to monitor the performance of your application. For example, combining

(lastReceivedBatch_processingEndTime - lastReceivedBatch_processingStartTime) / lastReceivedBatch_records

gives you the average processing time per record in milliseconds; invert it to get the throughput in records per unit of time.
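A minimal sketch of that calculation, using hypothetical gauge readings rather than values fetched from a live endpoint:

```scala
// Hypothetical StreamingMetrics gauge readings (epoch milliseconds / record count).
val processingStartTime = 1430559950000L // lastReceivedBatch_processingStartTime
val processingEndTime   = 1430559950044L // lastReceivedBatch_processingEndTime
val records             = 1000L          // lastReceivedBatch_records

val batchMillis = processingEndTime - processingStartTime

// Guard against empty batches: lastReceivedBatch_records can be 0.
val msPerRecord      = if (records > 0) batchMillis.toDouble / records else Double.NaN
val recordsPerSecond = if (batchMillis > 0) records * 1000.0 / batchMillis else Double.NaN

println(f"processing time per record: $msPerRecord%.3f ms")
println(f"throughput: $recordsPerSecond%.1f records/s")
```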
For the full list of supported metrics, take a look at `StreamingSource`.
Example: start a local REPL using Spark 1.3.1 and run a trivial streaming application:
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(10))
val queue = scala.collection.mutable.Queue(1, 2, 3, 45, 6, 6, 7, 18, 9, 10, 11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print
ssc.start
Then a GET on `localhost:4040/metrics/json` returns:
{
  "version": "3.0.0",
  "gauges": {
    "local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB": { "value": 0 },
    "local-1430558777965.<driver>.BlockManager.memory.maxMem_MB": { "value": 2120 },
    "local-1430558777965.<driver>.BlockManager.memory.memUsed_MB": { "value": 0 },
    "local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB": { "value": 2120 },
    "local-1430558777965.<driver>.DAGScheduler.job.activeJobs": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.job.allJobs": { "value": 6 },
    "local-1430558777965.<driver>.DAGScheduler.stage.failedStages": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.stage.runningStages": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.stage.waitingStages": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay": { "value": 44 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime": { "value": 1430559950044 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay": { "value": 44 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime": { "value": 1430559950044 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches": { "value": 2 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches": { "value": 2 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches": { "value": 0 }
  },
  "counters": { },
  "histograms": { },
  "meters": { },
  "timers": { }
}
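A monitoring script only needs a couple of gauges out of that payload. A minimal sketch of extracting one gauge value without a JSON library; in a real poller you would fetch the body with something like `scala.io.Source.fromURL("http://localhost:4040/metrics/json").mkString`, while here a trimmed, hypothetical payload keeps the snippet self-contained:

```scala
// Trimmed, hypothetical /metrics/json payload (two gauges only).
val body = """{ "gauges": {
  "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay": { "value": 44 },
  "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches": { "value": 2 }
} }"""

// Find the numeric value of the first gauge whose name ends with the given suffix.
def gauge(suffix: String): Option[Long] = {
  val pattern = ("\"[^\"]*" + java.util.regex.Pattern.quote(suffix) +
    "\"\\s*:\\s*\\{\\s*\"value\"\\s*:\\s*(-?\\d+)").r
  pattern.findFirstMatchIn(body).map(_.group(1).toLong)
}

println(gauge("lastCompletedBatch_processingDelay"))
println(gauge("totalCompletedBatches"))
```

Matching on the metric-name suffix sidesteps the `local-<timestamp>.<driver>` prefix, which changes on every application run.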