Spark and Java: Exception thrown in awaitResult

I am trying to connect from a Java application to a Spark cluster running in a virtual machine at IP 10.20.30.50, port 7077, and run a word-count example:

    SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md");
    String result = Long.toString(textFile.count());
    JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
    JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
    counts.saveAsTextFile("hdfs://localhost:8020/tmp/output");
    sc.stop();
    return result;

The Java application prints the following log and stack trace:

    Running Spark version 2.0.1
    Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Changing view acls to: lii5ka
    Changing modify acls to: lii5ka
    Changing view acls groups to:
    Changing modify acls groups to:
    SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set()
    Successfully started service 'sparkDriver' on port 61267.
    Registering MapOutputTracker
    Registering BlockManagerMaster
    Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63
    MemoryStore started with capacity 2004.6 MB
    Registering OutputCommitCoordinator
    Logging initialized @48403ms
    jetty-9.2.z-SNAPSHOT
    Started o.s.j.s.ServletContextHandler@1316e7ec {/jobs,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@782de006 {/jobs/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2d0353 {/jobs/job,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@381e24a0 {/jobs/job/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@1c138dc8 {/stages,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@b29739c {/stages/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@63f6de31 {/stages/stage,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2a04ddcb {/stages/stage/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2af9688e {/stages/pool,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@6a0c5bde {/stages/pool/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@3f5e17f8 {/storage,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@33b86f5d {/storage/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5264dcbc {/storage/rdd,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5a3ebf85 {/storage/rdd/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@159082ed {/environment,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@6522c585 {/environment/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@115774a1 {/executors,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@3e3a3399 {/executors/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@2f2c5959 {/executors/threadDump,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5c51afd4 {/executors/threadDump/json,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@76893a83 {/static,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@19c07930 {/,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@54eb0dc0 {/api,null,AVAILABLE}
    Started o.s.j.s.ServletContextHandler@5953786 {/stages/stage/kill,null,AVAILABLE}
    Started ServerConnector@2eeb8bd6 {HTTP/1.1}{0.0.0.0:4040}
    Started @48698ms
    Successfully started service 'SparkUI' on port 4040.
    Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040
    Connecting to master spark://10.20.30.50:7077...
    Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps)
    Connecting to master spark://10.20.30.50:7077...
    Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed
    Failed to connect to master 10.20.30.50:7077
    org.apache.spark.SparkException: Exception thrown in awaitResult
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
    Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        ... 1 common frames omitted

The following error message appears in the Spark Master log on 10.20.30.50:

    16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298
    akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298
    Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
        at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375)
        at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at akka.actor.FSM$class.processEvent(FSM.scala:604)
        at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
        at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643)
        at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703)
        at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
        ... 19 more

Additional Information

  • The example works fine when I use new SparkConf().setMaster("local") instead.
  • I can connect to the Spark Master using spark-shell --master spark://10.20.30.50:7077 on the same machine.
java scala hdfs protocol-buffers apache-spark
2 answers

At first glance this looks like a network error, but it is actually a Spark version mismatch in disguise. Point your application at the correct version of the Spark jars (mostly the assembly jars).

This issue may occur because of a version mismatch in a Hadoop RPC call that uses protobuf.

InvalidProtocolBufferException is thrown when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length.

  • In my experience with protobuf, InvalidProtocolBufferException occurs only when the message cannot be parsed (for example, when the message length is zero or the message is corrupted).

  • Spark 1.x (the line your master is running) uses Akka actors for message passing between the master/driver and the workers, and internally Akka uses Google's protobuf to communicate. See the method below from AkkaPduCodec.scala:

    override def decodePdu(raw: ByteString): AkkaPdu = {
      try {
        val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
        if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
        else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
        else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
      } catch {
        case e: InvalidProtocolBufferException => throw new PduCodecException("Decoding PDU failed.", e)
      }
    }

In your case, though, the cause is the version mismatch: the message sent by the newer client cannot be parsed by the older parser on the master, or something along those lines.
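To make the "invalid tag (zero)" message from the master log more concrete, here is a minimal sketch of the check a protobuf parser performs on the first bytes of a message. It does not use the protobuf library; the class name, method names and byte arrays are hypothetical and chosen only for illustration, and the bytes are not the ones actually exchanged between the two Spark versions.

```java
import java.io.IOException;

public class TagCheckSketch {

    /** Reads one base-128 varint (at most 5 bytes for a 32-bit value) from the start of buf. */
    public static int readVarint32(byte[] buf) throws IOException {
        int result = 0;
        for (int i = 0; i < 5; i++) {
            if (i >= buf.length) {
                throw new IOException("truncated varint");
            }
            int b = buf[i] & 0xFF;
            result |= (b & 0x7F) << (7 * i);   // low 7 bits carry data
            if ((b & 0x80) == 0) {             // high bit clear: last byte of the varint
                return result;
            }
        }
        throw new IOException("malformed varint");
    }

    /** Returns the field number of the first tag and rejects zero, as a protobuf parser does. */
    public static int readFirstFieldNumber(byte[] message) throws IOException {
        int tag = readVarint32(message);
        int fieldNumber = tag >>> 3;           // the low 3 bits hold the wire type
        if (fieldNumber == 0) {
            throw new IOException("invalid tag (zero)");
        }
        return fieldNumber;
    }

    public static void main(String[] args) throws IOException {
        // 0x0A encodes field 1 with wire type 2, a plausible start of a protobuf message.
        System.out.println(readFirstFieldNumber(new byte[] {0x0A, 0x00}));
        // Made-up bytes from some other framing that happens to begin with zeros.
        try {
            readFirstFieldNumber(new byte[] {0x00, 0x00, 0x05, 0x12});
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that a parser expecting protobuf will fail on the very first byte if the sender speaks a different wire format, which is consistent with a client and master from different Spark releases.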

If you pull in other Maven dependencies, please review them as well.


It turned out that the virtual machine was running Spark 1.5.2 while my Java application used version 2.0.1 of the Spark library. I fixed the problem by declaring the matching version of the Spark library in my pom.xml:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>

Another problem (which surfaced later) was that I also had to match the Scala version the library was built with. That is the _2.10 suffix in the artifactId.
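The two conditions above (same Spark version, same Scala suffix) can be expressed as a small pre-flight check. This is only a sketch: the class and method names are hypothetical, the cluster values have to be supplied by hand, and it assumes the cluster's Spark 1.5.2 was built for Scala 2.10, as the artifactId suffix in the fix suggests.

```java
public class SparkVersionCheckSketch {

    /** Extracts the Scala suffix from an artifactId such as "spark-core_2.10" (gives "2.10"). */
    public static String scalaSuffix(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        if (idx < 0 || idx == artifactId.length() - 1) {
            throw new IllegalArgumentException("no Scala suffix in " + artifactId);
        }
        return artifactId.substring(idx + 1);
    }

    /** True only when both the Spark version and the Scala binary version agree exactly. */
    public static boolean matchesCluster(String artifactId, String libraryVersion,
                                         String clusterSparkVersion, String clusterScalaVersion) {
        return libraryVersion.equals(clusterSparkVersion)
                && scalaSuffix(artifactId).equals(clusterScalaVersion);
    }

    public static void main(String[] args) {
        // Client library from the question's log (spark-core_2.11, 2.0.1) against the 1.5.2 cluster.
        System.out.println(matchesCluster("spark-core_2.11", "2.0.1", "1.5.2", "2.10"));
        // Dependency from the fix: both sides on 1.5.2 built for Scala 2.10.
        System.out.println(matchesCluster("spark-core_2.10", "1.5.2", "1.5.2", "2.10"));
    }
}
```

Exact string equality is deliberately strict here. Whether nearby patch releases interoperate is not something this sketch tries to decide.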

Basically, @RamPrassad's answer pointed me in the right direction, but did not give clear advice on what I need to do to fix my problem.

By the way, I could not upgrade Spark on the virtual machine, since it came as part of the Hortonworks distribution.