The approach in the code above is a classic centralized algorithm that will only work when it runs on a single node. Both Cassandra and Spark are distributed systems, so the process must be modeled in a way that lets it be distributed across several nodes.
There are several approaches. If you already know the row keys to extract, you can do something simple (using the DataStax Java Driver):
    val data = sparkContext.parallelize(keys).map { key =>
      // The connection is created inside the closure, i.e. on each worker
      val cluster = Cluster.builder.addContactPoint(host).build()
      val session = cluster.connect(keyspace)
      val statement = session.prepare("...cql...")
      val boundStatement = new BoundStatement(statement)
      session.execute(boundStatement.bind(...data...))
    }
This lets you effectively distribute the key retrieval across the Spark cluster. Note how the connection to C* is made inside the closure: this ensures the connection is established when the task executes on each individual distributed worker.
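A common refinement of the snippet above is to open one connection per partition rather than one per key, so workers don't rebuild the `Cluster` object for every element. A minimal sketch, not the original code: `host`, `keyspace`, `keys` and the CQL string are placeholders you must supply, and it assumes a DataStax Java Driver version where `Cluster.close()` exists (2.x):

```scala
import com.datastax.driver.core.{BoundStatement, Cluster}

// Sketch: one Cassandra connection per Spark partition.
val data = sparkContext.parallelize(keys).mapPartitions { keyIter =>
  val cluster = Cluster.builder.addContactPoint(host).build()
  val session = cluster.connect(keyspace)
  val statement = session.prepare("...cql...")

  // Materialize the results before closing the connection,
  // since the iterator returned by mapPartitions is consumed lazily.
  val rows = keyIter.map { key =>
    session.execute(new BoundStatement(statement).bind(key)).one()
  }.toList

  cluster.close()
  rows.iterator
}
```

The `.toList` is the important design point: if the connection were closed before the lazy iterator was consumed, the queries would fail at collection time.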
Given that your example uses a wildcard (i.e. the keys are not known), using the Cassandra Hadoop interface is a good option. The Spark-Cassandra example linked in the question illustrates the use of this Hadoop interface with Cassandra.
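For context, here is a hedged sketch of what reading through the Hadoop interface roughly looks like (class names are from the thrift-era `org.apache.cassandra.hadoop` package used by examples of that period; `host`, `keyspace` and `columnFamily` are placeholders, and a real job also needs a slice predicate configured):

```scala
import java.nio.ByteBuffer
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.{ColumnFamilyInputFormat, ConfigHelper}
import org.apache.hadoop.mapreduce.Job

// Configure the Cassandra input format via the Hadoop job configuration.
val job = new Job()
val conf = job.getConfiguration
ConfigHelper.setInputInitialAddress(conf, host)
ConfigHelper.setInputRpcPort(conf, "9160")
ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamily)
ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner")

// Each element is (row key, sorted map of columns).
val casRdd = sparkContext.newAPIHadoopRDD(
  conf,
  classOf[ColumnFamilyInputFormat],
  classOf[ByteBuffer],
  classOf[java.util.SortedMap[ByteBuffer, IColumn]])
```

Because the input format computes splits from the token ranges, Spark can read all partitions in parallel without knowing the keys up front.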
Calliope is a library that encapsulates the complexity of the Hadoop interface behind a simple API. It is available only in Scala, because it relies on Scala-specific features (such as implicits and, in the upcoming version, macros). With Calliope you basically declare how to convert your RDD[type] into a row key and a row value, and Calliope takes care of configuring the Hadoop interfaces for the job. We found that Calliope (and the underlying Hadoop interfaces) is 2-4x faster than using a driver to interact with Cassandra.
Conclusion: I would walk away from the Spring Data configuration for accessing Cassandra, as it will limit you to a single node. Consider simple concurrent access if possible, or look into using Calliope in Scala.
maasg