In short:
EC2 cluster: 1 master, 3 slaves
Spark Version: 1.3.1
I want to use the spark.driver.allowMultipleContexts option, with one local context (master only) and one cluster context (master and slaves).
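I set the flag on both SparkConf objects; a minimal sketch of just that setting (the full configs I use are further down):

import org.apache.spark.SparkConf

// Minimal sketch: only the multiple-contexts flag, everything else omitted
val conf = new SparkConf()
  .set("spark.driver.allowMultipleContexts", "true")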
I get the stack trace below (line 29 is where I call the object that initializes the second SparkContext):
fr.entry.Main.main(Main.scala)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
  at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
  at fr.entry.cluster$.<init>(Main.scala:79)
  at fr.entry.cluster$.<clinit>(Main.scala)
  at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
  at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
  at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
  at scala.App$$anonfun$main$1.apply(App.scala:71)
  at scala.App$$anonfun$main$1.apply(App.scala:71)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
  at scala.App$class.main(App.scala:71)
  at fr.entry.Main$.main(Main.scala:14)
  at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
More details:
I would like to run a single program that does two things. First, with a local SparkContext (on the master only), I build an RDD and run some operations. Second, with a second SparkContext pointing at the master and the 3 slaves, I build another RDD and run some operations. So in the first case I want to use the master's 16 cores, and in the second case 8 cores x 3 slaves.
A simple example:
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count())
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)
My two SparkContexts:
import org.apache.spark.{SparkConf, SparkContext}

object local {
  val project = "test"
  val version = "1.0"
  val sc = new SparkConf()
    .setMaster("local[16]")
    .setAppName("Local")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar",
      "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "45g")
  val sparkContext = new SparkContext(sc)
}

object cluster {
  val project = "test"
  val version = "1.0"
  val sc = new SparkConf()
    .setMaster(masterURL)  // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
    .setAppName("Cluster")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar",
      "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "35g")
  val sparkContext = new SparkContext(sc)
}
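masterURL is defined elsewhere in my code and points at the EC2 master (hostname redacted here), presumably in the usual standalone-master spark://host:7077 form, roughly:

// Assumed form: spark://<master-host>:7077 (standalone master's default port)
val masterURL = "spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077"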
How can I fix this?