Flow control

Is there a way to control the input and output bandwidth of a Spark cluster, to make sure the cluster is not flooded or overwhelmed by incoming data?

In my case, I set up the Spark cluster on AWS EC2, so I am thinking of using AWS CloudWatch to monitor NetworkIn and NetworkOut for each node in the cluster.

But this approach seems imprecise: the network traffic is not made up only of Spark's input data, since other traffic on the nodes is counted as well.

Is there a tool or a way to specifically monitor the status of Spark's streaming data? Or is there a built-in tool in Spark that I missed?


Update: as of Spark 1.4, the monitoring UI on port 4040 has been significantly improved with graphical displays.

1 answer

Spark has a configurable metrics subsystem. By default it publishes a JSON version of the registered metrics at <driver>:<port>/metrics/json. Other sinks can be configured as well, such as Ganglia, CSV files or JMX.
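For example, sinks are set up in conf/metrics.properties (there is a metrics.properties.template to start from). A minimal sketch enabling the CSV and Ganglia sinks could look like the following; the directory and host values are placeholders for your own environment, and the Ganglia sink additionally needs the spark-ganglia-lgpl artifact on the classpath:

    # conf/metrics.properties -- directory and host values are illustrative
    # Dump all metrics to CSV files every 10 seconds
    *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
    *.sink.csv.directory=/tmp/spark-metrics
    *.sink.csv.period=10
    *.sink.csv.unit=seconds

    # Push metrics to a Ganglia endpoint (requires spark-ganglia-lgpl)
    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=ganglia.example.com
    *.sink.ganglia.port=8649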

You will need an external monitoring system that collects the metrics on a regular basis and helps you make sense of them. (Note: we use Ganglia, but there are other open-source and commercial options out there.)

Spark Streaming publishes several metrics that you can use to monitor the performance of your job. To calculate the throughput, you would combine:

(lastReceivedBatch_processingEndTime-lastReceivedBatch_processingStartTime)/lastReceivedBatch_records
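Since the batch timestamps are epoch milliseconds, this ratio is milliseconds of processing time per record; invert it to get records per second. A quick sketch with made-up numbers (none of the values below come from a real run):

    // Illustrative values only -- read the real ones from the metrics endpoint
    val processingStartTime = 1430559950000L // ms since epoch
    val processingEndTime   = 1430559950044L // ms since epoch
    val records             = 11L            // lastReceivedBatch_records

    val msPerRecord      = (processingEndTime - processingStartTime).toDouble / records  // 4.0 ms/record
    val recordsPerSecond = records * 1000.0 / (processingEndTime - processingStartTime)  // 250.0 records/s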

For the full list of supported metrics, take a look at StreamingSource.

Example: starting a local REPL with Spark 1.3.1 and running a trivial streaming application:

    import org.apache.spark.streaming._

    val ssc = new StreamingContext(sc, Seconds(10))
    val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
    val q = queue.map(elem => sc.parallelize(Seq(elem)))
    val dstream = ssc.queueStream(q)
    dstream.print
    ssc.start

Fetching localhost:4040/metrics/json then returns:

    {
      version: "3.0.0",
      gauges: {
        local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: { value: 0 },
        local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: { value: 2120 },
        local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: { value: 0 },
        local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: { value: 2120 },
        local-1430558777965.<driver>.DAGScheduler.job.activeJobs: { value: 0 },
        local-1430558777965.<driver>.DAGScheduler.job.allJobs: { value: 6 },
        local-1430558777965.<driver>.DAGScheduler.stage.failedStages: { value: 0 },
        local-1430558777965.<driver>.DAGScheduler.stage.runningStages: { value: 0 },
        local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: { value: 44 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: { value: 1430559950044 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: { value: 1430559950000 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: { value: 1430559950000 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: { value: 44 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: { value: 1430559950044 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: { value: 1430559950000 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: { value: 1430559950000 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: { value: 2 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: { value: 2 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: { value: 0 },
        local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: { value: 0 }
      },
      counters: { },
      histograms: { },
      meters: { },
      timers: { }
    }
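To turn this output into the throughput figure described above, poll the endpoint periodically and extract the relevant gauges. The following is only a rough sketch: the URL assumes a local driver on port 4040, and the gauge helper with its crude regex stands in for a proper JSON parser.

    import scala.io.Source

    // Pull one gauge value by the suffix of its metric name; the key prefix
    // (application id, "Spark shell") varies per run, so only the suffix is matched.
    def gauge(json: String, suffix: String): Option[Long] = {
      val pattern = (java.util.regex.Pattern.quote(suffix) + """[^0-9]+(\d+)""").r
      pattern.findFirstMatchIn(json).map(_.group(1).toLong)
    }

    val json = Source.fromURL("http://localhost:4040/metrics/json").mkString

    // Milliseconds of processing time per record in the last received batch,
    // following the (end - start) / records formula above.
    val msPerRecord: Option[Double] = for {
      start   <- gauge(json, "lastReceivedBatch_processingStartTime")
      end     <- gauge(json, "lastReceivedBatch_processingEndTime")
      records <- gauge(json, "lastReceivedBatch_records") if records > 0
    } yield (end - start).toDouble / records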