I downloaded Spark 1.1.0 and built it with sbt. I am trying to run an example Spark-Cassandra project:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import com.datastax.spark.connector._

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.count)
    println(rdd.first)
    println(rdd.map(_.getInt("value")).sum)
  }
}
I start the Spark master by running ./start-master.sh in the sbin directory. Then I start a slave on the same machine with:
./start-slave.sh 0 spark://127.0.0.1:7077
where the spark://... address is the one shown at localhost:8080 (the master web UI).
Everything looks fine so far: the dashboard shows the worker. Then I run the Scala program and get a ClassNotFoundException. That exception seems somewhat misleading, because the worker logs report this error:
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@localhost:33137] -> [akka.tcp://sparkExecutor@localhost:37279]: Error [Association failed with [akka.tcp://#
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@localhost:37279]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:37279
]
When I run the sample program, I also get this warning:
14/10/29 12:22:31 WARN util.Utils: Your hostname, bas-HP-EliteBook-8530w resolves to a loopback address: 127.0.0.1; using 192.168.122.1 instead (on interface virbr0)
14/10/29 12:22:31 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
I do not know why this warning appears. My hosts file looks like this:
127.0.0.1       localhost
127.0.0.1       bas-HP-EliteBook-8530w

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
conf/spark-env.sh has the following options set:
SPARK_LOCAL_IP=127.0.0.1
SPARK_MASTER_IP=127.0.0.1
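One thing worth noting (and a possible explanation for the warning despite this file): spark-env.sh is only sourced by Spark's own launcher scripts, not by a driver JVM started directly from an IDE. A minimal sketch of setting the variable in the driver's own environment instead, e.g. in the shell that launches IntelliJ or in the IntelliJ run configuration:

```shell
# spark-env.sh is sourced by sbin/bin scripts, not by an IDE-launched JVM.
# Export the variable in the driver's environment before launching:
export SPARK_LOCAL_IP=127.0.0.1
echo "$SPARK_LOCAL_IP"
```

This is only a sketch of where the variable needs to live; the value itself comes from the configuration above.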
Full worker log:
Spark Command: java -cp ::/home/bas/Downloads/spark-1.1.0.backup/conf:/home/bas/Downloads/spark-1.1.0.backup/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop1.0.4.jar -XX:MaxPermSize
Full master log:
Spark Command: java -cp ::/home/bas/Downloads/spark-1.1.0.backup/conf:/home/bas/Downloads/spark-1.1.0.backup/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop1.0.4.jar -XX:MaxPermSize#
========================================
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties
14/10/29 12:20:52 INFO Master: Registered signal handlers for [TERM, HUP, INT]
14/10/29 12:20:52 INFO SecurityManager: Changing view acls to: bas,
14/10/29 12:20:52 INFO SecurityManager: Changing modify acls to: bas,
14/10/29 12:20:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(bas, ); users with modify permissions: Set(bas, )
14/10/29 12:20:53 INFO Slf4jLogger: Slf4jLogger started
14/10/29 12:20:53 INFO Remoting: Starting remoting
14/10/29 12:20:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@127.0.0.1:7077]
14/10/29 12:20:53 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
14/10/29 12:20:54 INFO Master: Starting Spark master at spark://127.0.0.1:7077
14/10/29 12:20:54 INFO Utils: Successfully started service 'MasterUI' on port 8080.
14/10/29 12:20:54 INFO MasterWebUI: Started MasterWebUI at http://localhost:8080
14/10/29 12:20:54 INFO Master: I have been elected leader! New state: ALIVE
14/10/29 12:21:18 INFO Master: Registering worker localhost:33137 with 2 cores, 2.8 GB RAM
14/10/29 12:22:34 INFO Master: Registering app test
14/10/29 12:22:34 INFO Master: Registered app test with ID app-20141029122234-0000
14/10/29 12:22:34 INFO Master: Launching executor app-20141029122234-0000/0 on worker worker-20141029122117-localhost-33137
14/10/29 12:23:05 INFO Master: akka.tcp://sparkDriver@192.168.122.1:40211 got disassociated, removing it.
14/10/29 12:23:05 INFO Master: Removing app app-20141029122234-0000
14/10/29 12:23:05 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/sy#
14/10/29 12:23:05 INFO Master: akka.tcp://sparkDriver@192.168.122.1:40211 got disassociated, removing it.
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@127.0.0.1:7077] -> [akka.tcp://sparkDriver@192.168.122.1:40211]: Error [Association failed with [akka.tcp:/#
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.122.1:40211]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211
]
14/10/29 12:23:05 INFO Master: akka.tcp://sparkDriver@192.168.122.1:40211 got disassociated, removing it.
14/10/29 12:23:05 INFO Master: akka.tcp://sparkDriver@192.168.122.1:40211 got disassociated, removing it.
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@127.0.0.1:7077] -> [akka.tcp://sparkDriver@192.168.122.1:40211]: Error [Association failed with [akka.tcp:/#
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.122.1:40211]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211
]
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@127.0.0.1:7077] -> [akka.tcp://sparkDriver@192.168.122.1:40211]: Error [Association failed with [akka.tcp:/#
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.122.1:40211]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211
]
14/10/29 12:23:05 INFO Master: akka.tcp://sparkDriver@192.168.122.1:40211 got disassociated, removing it.
14/10/29 12:23:05 WARN Master: Got status update for unknown executor app-20141029122234-0000/0
IntelliJ stacktrace:
/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain Test
14/10/29 12:22:31 WARN util.Utils: Your hostname, bas-HP-EliteBook-8530w resolves to a loopback address: 127.0.0.1; using 192.168.122.1 instead (on interface virbr0)
14/10/29 12:22:31 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/10/29 12:22:31 INFO spark.SecurityManager: Changing view acls to: bas,
14/10/29 12:22:31 INFO spark.SecurityManager: Changing modify acls to: bas,
14/10/29 12:22:31 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(bas, ); users with modify permissions: Set(bas, )
14/10/29 12:22:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/29 12:22:31 INFO Remoting: Starting remoting
14/10/29 12:22:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.122.1:40211]
14/10/29 12:22:32 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.122.1:40211]
14/10/29 12:22:32 INFO util.Utils: Successfully started service 'sparkDriver' on port 40211.
14/10/29 12:22:32 INFO spark.SparkEnv: Registering MapOutputTracker
14/10/29 12:22:32 INFO spark.SparkEnv: Registering BlockManagerMaster
14/10/29 12:22:32 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141029122232-f3bf
14/10/29 12:22:32 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 34325.
14/10/29 12:22:32 INFO network.ConnectionManager: Bound socket to port 34325 with id = ConnectionManagerId(192.168.122.1,34325)
14/10/29 12:22:32 INFO storage.MemoryStore: MemoryStore started with capacity 470.3 MB
14/10/29 12:22:32 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/10/29 12:22:32 INFO storage.BlockManagerMasterActor: Registering block manager 192.168.122.1:34325 with 470.3 MB RAM
14/10/29 12:22:32 INFO storage.BlockManagerMaster: Registered BlockManager
14/10/29 12:22:32 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-eea6f1f3-8f69-4900-87c7-6da17f1f3d76
14/10/29 12:22:32 INFO spark.HttpServer: Starting HTTP Server
14/10/29 12:22:32 INFO server.Server: jetty-8.1.14.v20131031
14/10/29 12:22:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33688
14/10/29 12:22:32 INFO util.Utils: Successfully started service 'HTTP file server' on port 33688.
14/10/29 12:22:33 INFO server.Server: jetty-8.1.14.v20131031
14/10/29 12:22:33 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/10/29 12:22:33 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
14/10/29 12:22:33 INFO ui.SparkUI: Started SparkUI at http://192.168.122.1:4040
14/10/29 12:22:33 INFO client.AppClient$ClientActor: Connecting to master spark://127.0.0.1:7077...
14/10/29 12:22:33 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/10/29 12:22:34 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141029122234-0000
14/10/29 12:22:34 INFO client.AppClient$ClientActor: Executor added: app-20141029122234-0000/0 on worker-20141029122117-localhost-33137 (localhost:33137) with 2 cores
14/10/29 12:22:34 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141029122234-0000/0 on hostPort localhost:33137 with 2 cores, 512.0 MB RAM
14/10/29 12:22:34 INFO client.AppClient$ClientActor: Executor updated: app-20141029122234-0000/0 is now RUNNING
14/10/29 12:22:34 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/10/29 12:22:34 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/10/29 12:22:35 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
14/10/29 12:22:35 INFO spark.SparkContext: Starting job: count at Test.scala:23
14/10/29 12:22:35 INFO scheduler.DAGScheduler: Got job 0 (count at Test.scala:23) with 1 output partitions (allowLocal=false)
14/10/29 12:22:35 INFO scheduler.DAGScheduler: Final stage: Stage 0(count at Test.scala:23)
14/10/29 12:22:35 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/10/29 12:22:35 INFO scheduler.DAGScheduler: Missing parents: List()
14/10/29 12:22:35 INFO scheduler.DAGScheduler: Submitting Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:47), which has no missing parents
14/10/29 12:22:36 INFO storage.MemoryStore: ensureFreeSpace(4224) called with curMem=0, maxMem=493187235
14/10/29 12:22:36 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 470.3 MB)
14/10/29 12:22:36 INFO storage.MemoryStore: ensureFreeSpace(2338) called with curMem=4224, maxMem=493187235
14/10/29 12:22:36 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.3 KB, free 470.3 MB)
14/10/29 12:22:36 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.122.1:34325 (size: 2.3 KB, free: 470.3 MB)
14/10/29 12:22:36 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
14/10/29 12:22:36 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:47)
14/10/29 12:22:36 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/10/29 12:22:38 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@localhost:37279/user/Executor
build.sbt:
name := "sparktest"

version := "1.0"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc()

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
Scala compiler version = 2.10.4
Java version = 1.7.0_67
I tried to set SPARK_LOCAL_IP programmatically, like this:
System.setProperty("SPARK_LOCAL_IP", "127.0.0.1")
println(System.getenv("SPARK_LOCAL_IP"))
and like this:
scala.util.Properties.envOrElse("SPARK_LOCAL_IP", "127.0.0.1")
println(System.getenv("SPARK_LOCAL_IP"))
But it just prints null.
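That null is expected: JVM system properties and the process environment are separate namespaces, so System.setProperty never changes what System.getenv returns, and envOrElse only supplies a fallback value without modifying the environment either. A minimal sketch illustrating the distinction:

```scala
// System properties and environment variables live in separate namespaces:
// setProperty writes to the former, getenv reads the latter.
object EnvVsProp {
  def main(args: Array[String]): Unit = {
    System.setProperty("SPARK_LOCAL_IP", "127.0.0.1")

    // Reads the OS process environment, untouched by setProperty:
    println(System.getenv("SPARK_LOCAL_IP"))      // null unless exported before the JVM started

    // Reads the JVM system property that was just set:
    println(System.getProperty("SPARK_LOCAL_IP")) // 127.0.0.1

    // envOrElse returns the fallback without modifying anything:
    println(scala.util.Properties.envOrElse("SPARK_LOCAL_IP", "127.0.0.1"))
  }
}
```

So to make the Utils warning go away, the variable has to be exported in the environment of the process that launches the driver, not set from inside the program.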
Thanks in advance for any suggestions.
Edit: changing the Spark configuration as follows solved the problem:
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.executor.extraClassPath", "/home/bas/Downloads/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar")
Note that this did not work with the connector jar that sbt downloads; I had to build the assembly jar from source. In fact, I removed all the sbt connector dependencies and used the jar built from source instead.
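For anyone hitting the same ClassNotFoundException: spark.executor.extraClassPath only works when the jar already exists at that path on every worker. An alternative worth trying is to have Spark ship the jar to the executors itself via SparkConf.setJars. A sketch under that assumption; the jar path here is a placeholder, not the actual file from this setup:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TestWithJars {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // Ship the connector assembly to every executor; replace the
      // placeholder path with your own locally built jar.
      .setJars(Seq("/path/to/spark-cassandra-connector-assembly.jar"))
    val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
  }
}
```

This keeps the classpath concern in the application instead of hard-coding an absolute path that must exist on each worker machine.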