Spark: multiple contexts

In short:

EC2 cluster: 1 master, 3 slaves

Spark Version: 1.3.1

I want to use the spark.driver.allowMultipleContexts option with one local context (master only) and one cluster context (master and slaves).

I get this stack trace (line 29 is where I call the object that initializes the second SparkContext):

 fr.entry.Main.main(Main.scala)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
   at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
   at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
   at fr.entry.cluster$.<init>(Main.scala:79)
   at fr.entry.cluster$.<clinit>(Main.scala)
   at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
   at scala.App$class.main(App.scala:71)
   at fr.entry.Main$.main(Main.scala:14)
   at fr.entry.Main.main(Main.scala)
 15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now LOADING
 15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/0 is now RUNNING
 15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
 15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
 15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
 15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
 15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
 15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
 15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
 15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
 15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
 15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
 15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
 15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
 15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
 15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
 15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
 15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
 15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
 15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

More details:

I would like to run one program that does two things. First, I create a local SparkContext (master only), build an RDD and run some operations. Second, I initialize a second SparkContext using the master and the 3 slaves, which also builds an RDD and runs some operations. So in the first case I want to use the master's 16 cores, and in the second case 8 cores x 3 slaves.

A simple example:

 val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
 println(local.sparkContext.makeRDD(arr).count())
 println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)

My two SparkContexts:

 object local {
   val project = "test"
   val version = "1.0"
   val sc = new SparkConf()
     .setMaster("local[16]")
     .setAppName("Local")
     .set("spark.local.dir", "/mnt")
     .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar",
       "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
     .setSparkHome("/root/spark")
     .set("spark.driver.allowMultipleContexts", "true")
     .set("spark.executor.memory", "45g")
   val sparkContext = new SparkContext(sc)
 }

 object cluster {
   val project = "test"
   val version = "1.0"
   val sc = new SparkConf()
     .setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
     .setAppName("Cluster")
     .set("spark.local.dir", "/mnt")
     .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar",
       "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
     .setSparkHome("/root/spark")
     .set("spark.driver.allowMultipleContexts", "true")
     .set("spark.executor.memory", "35g")
   val sparkContext = new SparkContext(sc)
 }

How can I fix this?

+8
scala apache-spark
4 answers

Although the spark.driver.allowMultipleContexts configuration option exists, it is misleading, because using multiple Spark contexts is discouraged. The option is used only for Spark's internal tests and is not supposed to be used in user programs. You can get unexpected results when running more than one SparkContext in a single JVM.
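If the goal is simply to run a small, master-only computation and a distributed computation from one program, a common alternative is a single SparkContext pointed at the cluster, with the parallelism tuned per RDD. A minimal sketch under that assumption (the object name, master URL and partition counts are placeholders, not the asker's code):

 import org.apache.spark.{SparkConf, SparkContext}

 object SingleContextApp {
   def main(args: Array[String]): Unit = {
     // One context for the whole application, so allowMultipleContexts is not needed.
     val conf = new SparkConf()
       .setMaster("spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077") // placeholder master URL
       .setAppName("SingleContext")
     val sc = new SparkContext(conf)

     val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

     // "Small" job: only 2 partitions, so it occupies a couple of cores.
     println(sc.makeRDD(arr, 2).count())

     // "Big" job: 24 partitions to spread across 3 slaves x 8 cores.
     println(sc.makeRDD(arr, 24).map(_.sum).sum())

     sc.stop()
   }
 }

The trade-off is that the small job then runs on cluster executors rather than strictly on the master's 16 cores.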

+10

If coordination between the two programs is required, it would be better to make them part of a single Spark application, in order to take advantage of Spark's internal optimizations and avoid unnecessary I/O.

Secondly, if the two applications do not need any coordination, you can simply run two separate applications. Since you are using Amazon EC2/EMR, you can use YARN as the resource manager without a significant time investment, as described here.
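A minimal sketch of that two-application layout, assuming the master is supplied on the spark-submit command line rather than hard-coded (LocalOnlyApp and ClusterApp are hypothetical names, not the asker's classes):

 import org.apache.spark.{SparkConf, SparkContext}

 // First application: submitted with --master local[16]
 object LocalOnlyApp {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setAppName("LocalOnly"))
     val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
     println(sc.makeRDD(arr).count())
     sc.stop()
   }
 }

 // Second application: submitted with --master pointing at the cluster (standalone or YARN)
 object ClusterApp {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setAppName("Cluster"))
     val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
     println(sc.makeRDD(arr).map(_.sum).sum())
     sc.stop()
   }
 }

Each application gets its own driver JVM and therefore its own SparkContext, so allowMultipleContexts never comes into play.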

+1

If you really need to work with many Spark contexts, you can enable the special MultipleContexts option (1), but it is used only for Spark's internal tests and is not supposed to be used in user programs. You will get unexpected behavior when running more than one SparkContext in a single JVM (see SPARK-2243 (2)). It is, however, possible to create different contexts in separate JVMs and to manage them at the SparkConf level so that each best fits the jobs it runs.

There is middleware for Spark that does exactly this: Mist. It manages Spark contexts and multiple JVMs, so you can have different workloads, such as an ETL pipeline, a fast prediction job, an ad-hoc query, and a Spark Streaming application, running in parallel on the same cluster. Mist creates every new SparkContext in its own JVM.

(1) github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67

(2) issues.apache.org/jira/browse/SPARK-2243
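Not Mist itself, but a rough sketch of the "one SparkContext per JVM" idea using plain spark-submit child processes; the spark-submit path, jar name, class names and master URL below are placeholders borrowed from the question, not verified values:

 import scala.sys.process._

 object PerJvmLauncher {
   def main(args: Array[String]): Unit = {
     val jar = "target/scala-2.10/test-assembly-1.0.jar"

     // Each spark-submit invocation starts its own driver JVM, so every job
     // gets a fresh SparkContext without spark.driver.allowMultipleContexts.
     val localCmd = Seq("/root/spark/bin/spark-submit",
       "--master", "local[16]",
       "--class", "fr.entry.LocalJob", jar)
     val clusterCmd = Seq("/root/spark/bin/spark-submit",
       "--master", "spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077",
       "--class", "fr.entry.ClusterJob", jar)

     // `.!` runs the command and blocks until it exits, returning the exit code.
     println(s"local job exited with ${localCmd.!}")
     println(s"cluster job exited with ${clusterCmd.!}")
   }
 }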

+1

Java:

  .set("spark.driver.allowMultipleContexts", "true") 

+

 sparkContext.cancelAllJobs();
 sparkContext.stop();

This works for me.
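For reference, the same idea written out in Scala: keep only one context live at a time by stopping the first one before creating the second. This is a sketch with values copied from the question, not the answerer's exact code:

 import org.apache.spark.{SparkConf, SparkContext}

 object SequentialContexts {
   def main(args: Array[String]): Unit = {
     val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

     // First context: master only.
     val localSc = new SparkContext(
       new SparkConf().setMaster("local[16]").setAppName("Local"))
     println(localSc.makeRDD(arr).count())
     localSc.cancelAllJobs() // make sure nothing is still running
     localSc.stop()          // release the context before creating the next one

     // Second context: the cluster (placeholder master URL).
     val clusterSc = new SparkContext(
       new SparkConf()
         .setMaster("spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077")
         .setAppName("Cluster"))
     println(clusterSc.makeRDD(arr).map(_.sum).sum())
     clusterSc.stop()
   }
 }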

0