I want to connect Kafka + Cassandra to Spark 1.5.1.
Library Versions:
scalaVersion := "2.10.6" libraryDependencies ++= Seq( "org.apache.spark" % "spark-streaming_2.10" % "1.5.1", "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1", "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0-M2" )
Initialization and use in the application:
val sparkConf = new SparkConf(true) .setMaster("local[2]") .setAppName("KafkaStreamToCassandraApp") .set("spark.executor.memory", "1g") .set("spark.cores.max", "1") .set("spark.cassandra.connection.host", "127.0.0.1")
Creates a schema in Cassandra as follows:
CassandraConnector(sparkConf).withSessionDo { session => session.execute(s"DROP KEYSPACE IF EXISTS kafka_streaming") session.execute(s"CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") session.execute(s"CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)") session.execute(s"TRUNCATE kafka_streaming.wordcount") }
Also, when preparing jar create several strategies:
assemblyMergeStrategy in assembly := { case PathList("com", "esotericsoftware", xs@ _*) => MergeStrategy.last case PathList("com", "google", xs@ _*) => MergeStrategy.first case PathList("org", "apache", xs@ _*) => MergeStrategy.last case PathList("io", "netty", xs@ _*) => MergeStrategy.last case PathList("com", "codahale", xs@ _*) => MergeStrategy.last case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
I think the problem is related to
case PathList("com", "google", xs@ _*) => MergeStrategy.first
Associated with the use of MergeStrategy.last .
Any ideas?
Exception thrown:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142) at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136) at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609) at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606) at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147) at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259) at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135) at com.datastax.driver.core.Cluster.<init>(Cluster.java:111) at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178) at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152) at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85) at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)