I am trying to connect from a Java application to a Spark standalone cluster running in a virtual machine at 10.20.30.50:7077 and run a word count example:
    // Point the driver at the standalone master in the VM
    SparkConf conf = new SparkConf()
            .setMaster("spark://10.20.30.50:7077")
            .setAppName("wordCount");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Read the input file from HDFS and count its lines
    JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md");
    String result = Long.toString(textFile.count());

    // Classic word count: split into words, map to (word, 1), reduce by key
    JavaRDD<String> words = textFile.flatMap(
            (FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
    JavaPairRDD<String, Integer> pairs = words.mapToPair(
            (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            (Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    counts.saveAsTextFile("hdfs://localhost:8020/tmp/output");
    sc.stop();
    return result;
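To isolate the problem from HDFS, a stripped-down job against the same master would look roughly like the sketch below. The class name and the dummy in-memory list are just illustration, not part of my real application:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ConnectivityCheck {
        public static void main(String[] args) {
            // Same master URL as in the word count example above
            SparkConf conf = new SparkConf()
                    .setMaster("spark://10.20.30.50:7077")
                    .setAppName("connectivityCheck");
            JavaSparkContext sc = new JavaSparkContext(conf);
            try {
                // Trivial job on in-memory data, so HDFS is not involved at all
                long n = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).count();
                System.out.println("count = " + n);
            } finally {
                sc.stop();
            }
        }
    }

If such a minimal job failed with the same "Failed to connect to master" error, HDFS could be ruled out and the problem would be purely between the driver and the standalone master.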
Running the Java application against the cluster produces the following log output and stack trace on the driver:
    Running Spark version 2.0.1
    Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Changing view acls to: lii5ka
    Changing modify acls to: lii5ka
    Changing view acls groups to:
    Changing modify acls groups to:
    SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set()
    Successfully started service 'sparkDriver' on port 61267.
    Registering MapOutputTracker
    Registering BlockManagerMaster
    Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63
    MemoryStore started with capacity 2004.6 MB
    Registering OutputCommitCoordinator
    Logging initialized @48403ms
    jetty-9.2.z-SNAPSHOT
    Started o.s.j.s.ServletContextHandler@1316e7ec {/jobs,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@782de006 {/jobs/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2d0353 {/jobs/job,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@381e24a0 {/jobs/job/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@1c138dc8 {/stages,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@b29739c {/stages/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@63f6de31 {/stages/stage,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2a04ddcb {/stages/stage/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2af9688e {/stages/pool,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@6a0c5bde {/stages/pool/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@3f5e17f8 {/storage,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@33b86f5d {/storage/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5264dcbc {/storage/rdd,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5a3ebf85 {/storage/rdd/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@159082ed {/environment,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@6522c585 {/environment/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@115774a1 {/executors,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@3e3a3399 {/executors/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2f2c5959 {/executors/threadDump,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5c51afd4 {/executors/threadDump/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@76893a83 {/static,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@19c07930 {/,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@54eb0dc0 {/api,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5953786 {/stages/stage/kill,null,AVAILABLE}
    Started ServerConnector@2eeb8bd6 {HTTP/1.1}{0.0.0.0:4040}
    Started @48698ms
    Successfully started service 'SparkUI' on port 4040.
    Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040
    Connecting to master spark://10.20.30.50:7077...
    Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps)
    Connecting to master spark://10.20.30.50:7077...
    Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed
    Failed to connect to master 10.20.30.50:7077
    org.apache.spark.SparkException: Exception thrown in awaitResult
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
    Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        ... 1 common frames omitted
At the same time, the Spark master log on 10.20.30.50 shows the following error:
    16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298
    akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298
    Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
        at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at akka.actor.FSM$class.processEvent(FSM.scala:604)
        at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
        at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
        ... 19 more
Additional Information
- The example works fine when I use `new SparkConf().setMaster("local")` instead (see the version-check sketch below).
- I can connect to the Spark master with `spark-shell --master spark://10.20.30.50:7077` on the same machine.
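Since the driver log above shows Spark 2.0.1 on my side, one thing worth comparing is the version the master in the VM reports (the standalone master web UI, by default on port 8080, shows its version). A minimal sketch for printing the driver-side version, using the local mode that already works (the class name is made up):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class VersionCheck {
        public static void main(String[] args) {
            // Local mode only, so this does not depend on the failing master connection
            SparkConf conf = new SparkConf().setMaster("local").setAppName("versionCheck");
            JavaSparkContext sc = new JavaSparkContext(conf);
            try {
                // Spark version on the driver classpath (2.0.1 according to the log above)
                System.out.println("Driver-side Spark version: " + sc.version());
            } finally {
                sc.stop();
            }
        }
    }

If the version printed here differs from the one shown by the master, the "Error while decoding incoming Akka PDU" on the master side would be consistent with a driver/master version mismatch, since Spark 2.x no longer uses the Akka transport.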