Guava version when using a spark shell

I am trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I cannot connect to my cluster. There appears to be a Guava version mismatch: the classpath ends up with a much older version of Guava pulled in from somewhere else, even though I specify the correct version at startup. I suspect this is caused by all of the Hadoop dependencies that Dataproc puts on the classpath by default.

Is there any way to make spark-shell use only the correct version of Guava, without having to throw out all of the Hadoop/Dataproc jars that are included by default?

Relevant data:

Starting spark-shell; the dependency resolution shows that the correct version of Guava is pulled in:

 $ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

 :: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
 com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
         confs: [default]
         found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
         found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
         found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
         found io.netty#netty-handler;4.0.27.Final in central
         found io.netty#netty-buffer;4.0.27.Final in central
         found io.netty#netty-common;4.0.27.Final in central
         found io.netty#netty-transport;4.0.27.Final in central
         found io.netty#netty-codec;4.0.27.Final in central
         found com.codahale.metrics#metrics-core;3.0.2 in central
         found org.slf4j#slf4j-api;1.7.5 in central
         found org.apache.commons#commons-lang3;3.3.2 in central
         found com.google.guava#guava;16.0.1 in central
         found org.joda#joda-convert;1.2 in central
         found joda-time#joda-time;2.3 in central
         found com.twitter#jsr166e;1.1.0 in central
         found org.scala-lang#scala-reflect;2.10.5 in central
 :: resolution report :: resolve 502ms :: artifacts dl 10ms
         :: modules in use:
         com.codahale.metrics#metrics-core;3.0.2 from central in [default]
         com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default]
         com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default]
         com.google.guava#guava;16.0.1 from central in [default]
         com.twitter#jsr166e;1.1.0 from central in [default]
         io.netty#netty-buffer;4.0.27.Final from central in [default]
         io.netty#netty-codec;4.0.27.Final from central in [default]
         io.netty#netty-common;4.0.27.Final from central in [default]
         io.netty#netty-handler;4.0.27.Final from central in [default]
         io.netty#netty-transport;4.0.27.Final from central in [default]
         joda-time#joda-time;2.3 from central in [default]
         org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
         org.apache.commons#commons-lang3;3.3.2 from central in [default]
         org.joda#joda-convert;1.2 from central in [default]
         org.scala-lang#scala-reflect;2.10.5 from central in [default]
         org.slf4j#slf4j-api;1.7.5 from central in [default]
         ---------------------------------------------------------------------
         |                  |            modules            ||   artifacts   |
         |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
         ---------------------------------------------------------------------
         |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
         ---------------------------------------------------------------------
 :: retrieving :: org.apache.spark#spark-submit-parent
         confs: [default]
         0 artifacts copied, 16 already retrieved (0kB/12ms)
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
       /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal)
 Type in expressions to have them evaluated.
 Type :help for more information.
 15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
 Spark context available as sc.
 ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
 ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
 15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
 SQL context available as sqlContext.
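For reference, the failure below is triggered by nothing more exotic than a count over a table. Roughly the following (the keyspace and table names here are placeholders, and spark.cassandra.connection.host is pointed at the cluster when the shell is launched):

 import com.datastax.spark.connector._

 // Placeholder keyspace/table names; the connection host is 10.240.0.7
 val rdd = sc.cassandraTable("my_keyspace", "my_table")
 rdd.count()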

Stack trace during initial connection:

 java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
     at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
     at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
     at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
     at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
     at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
     at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
     at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
     at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
     at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
     at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
     at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
     at org.apache.spark.rdd.RDD.count(RDD.scala:1125)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
     at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
     at $iwC$$iwC$$iwC.<init>(<console>:53)
     at $iwC$$iwC.<init>(<console>:55)
     at $iwC.<init>(<console>:57)
     at <init>(<console>:59)
     at .<init>(<console>:63)
     at .<clinit>(<console>)
     at .<init>(<console>:7)
     at .<clinit>(<console>)
     at $print(<console>)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:497)
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
     at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
     at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
     at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
     at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
     at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
     at org.apache.spark.repl.Main$.main(Main.scala:31)
     at org.apache.spark.repl.Main.main(Main.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:497)
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
     at com.datastax.driver.core.Connection.initAsync(Connection.java:178)
     at com.datastax.driver.core.Connection$Factory.open(Connection.java:742)
     at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240)
     at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187)
     at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
     at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
     at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
     ... 70 more
apache-spark google-cloud-dataproc spark-cassandra-connector
3 answers

Unfortunately, Hadoop's dependency on Guava 11 (which does not have the Futures.withFallback method at all) is a long-standing problem, and indeed Hadoop 2.7.1 still depends on Guava 11.
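One way to confirm which Guava actually wins on the driver is plain JDK reflection from inside the REPL (nothing connector-specific; an empty result for the second line means the overload the Cassandra driver calls simply is not there):

 // Which jar provides Guava's Futures, and does it have withFallback (added in Guava 14)?
 val futuresClass = Class.forName("com.google.common.util.concurrent.Futures")
 println(futuresClass.getProtectionDomain.getCodeSource.getLocation)
 futuresClass.getMethods.filter(_.getName == "withFallback").foreach(println)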

Spark core itself uses Guava 14, as can be seen here, but this is already worked around by shading Guava inside the Spark assembly:

 $ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures
 org/spark-project/guava/util/concurrent/Futures$1.class
 org/spark-project/guava/util/concurrent/Futures$2.class
 org/spark-project/guava/util/concurrent/Futures$3.class
 org/spark-project/guava/util/concurrent/Futures$4.class
 org/spark-project/guava/util/concurrent/Futures$5.class
 org/spark-project/guava/util/concurrent/Futures$6.class
 org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class
 org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class
 org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class
 org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class
 org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class
 org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class
 org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class
 org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class
 org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class
 org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class
 org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class
 org/spark-project/guava/util/concurrent/Futures.class

 $ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures
 Compiled from "Futures.java"
 public final class org.spark-project.guava.util.concurrent.Futures {
   public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V);
   public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture();
   public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor);
   public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>);
   public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor);
   public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>);
   public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor);
   public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
   public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
   public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>);
   public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor);
   public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X;
   public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X;
   public static <V> V getUnchecked(java.util.concurrent.Future<V>);
   static {};
 }

You can follow the guide at https://arjon.es/2015/making-hadoop-2.6-spark-cassandra-driver-play-nice-together/ to do the same shading yourself at compile time. With spark-shell, you may be able to get away with just modifying spark.driver.extraClassPath as mentioned here, although it may still hit conflicts at various points.
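If you build your job as a fat jar, the relocation can be done on your side. A minimal sketch, assuming an sbt-assembly (0.14+) build; the plugin version and the shadedguava prefix here are arbitrary choices, not something prescribed by the linked post:

 // project/plugins.sbt
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

 // build.sbt -- rewrite all com.google.common references (yours and the Cassandra
 // driver's) to a relocated copy, so Hadoop's Guava 11 can no longer shadow it
 assemblyShadeRules in assembly := Seq(
   ShadeRule.rename("com.google.common.**" -> "shadedguava.@1").inAll
 )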


I had the same problem running Spark in Eclipse after adding Google GWT. Download the Guava jar and add it to the build path in Eclipse (or to the classpath elsewhere) → http://www.java2s.com/Code/Jar/g/Downloadguava150rc1jar.htm


If you use Gradle to build a fat jar, you can use relocate in your shadowJar block:

 shadowJar {
     zip64 true
     mergeServiceFiles()
     relocate 'com.google', 'hidden.google'
 }
