Spark with Cassandra I/O

Consider the following scenario: a Spark application (implemented in Java) loads data from a Cassandra database, converts it to an RDD, and processes it. The application also streams new data from the database via a custom receiver, and the output of the streaming process is stored back in the database. The implementation uses Spring Data Cassandra for the database integration.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
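
For completeness, the referenced cassandra.properties would look something like this. The values below are placeholders, not from the question; the three keys are the ones the configuration class reads:

# Placeholder values; adjust to your cluster
cassandra.contactpoints=127.0.0.1
# Native protocol port (9042 by default)
cassandra.port=9042
cassandra.keyspace=my_keyspace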

Method DataProcessor.main:

// Initialize the Spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);

// Initialize the Spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the first page of data
List<Event> pagingResults = cassandraOperations.select(
        "select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE,
        Event.class);

// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while (pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select(
            "select * from event where event_type = 'event_type1' and creation_time < "
                    + lastEvent.getPk().getCreationTime()
                    + " order by creation_time desc limit " + DATA_PAGE_SIZE,
            Event.class);
    // Parallelize the page and add it to the existing RDD
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing ...

A large amount of data is expected at bootstrap. For this reason, the data is loaded page by page and accumulated in rddBuffer.

I have also found the following options: the Cassandra example that ships with Spark (based on the Hadoop interface) and the Calliope library.

I would like to know the best way to integrate Spark with Cassandra. Which would be the best option for my implementation?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

java spring-data-cassandra cassandra apache-spark
2 answers

The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver is built on top of the Cassandra Java driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. In addition, it offers the following unique features:

  • support for all Cassandra data types, including collections, out of the box
  • easy mapping of Cassandra rows to custom classes or tuples, without the need for implicits or other advanced Scala features
  • saving any RDD to Cassandra
  • full support for Cassandra virtual nodes
  • the ability to filter/select on the server side, e.g. using Cassandra clustering columns or secondary indexes
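
As an illustration only (not from the answer): with the connector's Java API — assuming a release that ships CassandraJavaUtil with mapRowTo/mapToRow, and using the question's table with a hypothetical keyspace "my_keyspace" and output table "event_out" — the whole paging loop from the question collapses to a sketch like this:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// The connector takes its contact points from the Spark configuration
SparkConf conf = new SparkConf()
        .setAppName("test-spark")
        .setMaster("local[2]")
        .set("spark.cassandra.connection.host", "127.0.0.1");
JavaSparkContext sc = new JavaSparkContext(conf);

// Read the table as a distributed RDD: the connector splits it into
// Spark partitions by token range, so no manual paging is required
JavaRDD<Event> events = javaFunctions(sc)
        .cassandraTable("my_keyspace", "event", mapRowTo(Event.class));

// ... process events ...

// Save any RDD back to a Cassandra table
javaFunctions(events)
        .writerBuilder("my_keyspace", "event_out", mapToRow(Event.class))
        .saveToCassandra();

For this to work, Event must be a serializable JavaBean whose properties map to the table's columns.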

The approach in the code above is a classic centralized algorithm and will only work if executed on a single node. Both Cassandra and Spark are distributed systems, so the process must be modeled in a way that allows it to be distributed across several nodes.

There are several approaches. If you know the row keys you want to extract, you can do something simple like this (using the DataStax Java Driver):

val data = sparkContext.parallelize(keys).map { key =>
  // The connection is created inside the closure, i.e. on each worker
  val cluster = Cluster.builder.addContactPoint(host).build()
  val session = cluster.connect(keyspace)
  val statement = session.prepare("...cql...")
  val boundStatement = new BoundStatement(statement)
  session.execute(boundStatement.bind(key /* ...data... */))
}

This will let you distribute the key fetches efficiently across the Spark cluster. Note how the connection to C* is made inside the closure, as this ensures that the connection is established when the task is executed on each individual distributed worker.
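
As a refinement not in the original answer: creating a Cluster per element is expensive, so a common variant opens one connection per Spark partition via mapPartitions. A sketch in Java (matching the question's language; host, keyspace, and the CQL text are placeholders, and the Spark 1.x FlatMapFunction contract of returning an Iterable is assumed):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

JavaRDD<String> data = sc.parallelize(keys).mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {
            @Override
            public Iterable<String> call(Iterator<String> keyIterator) {
                // One connection per partition instead of one per key
                Cluster cluster = Cluster.builder().addContactPoint(host).build();
                Session session = cluster.connect(keyspace);
                PreparedStatement statement = session.prepare("...cql...");
                List<String> results = new ArrayList<String>();
                while (keyIterator.hasNext()) {
                    Row row = session.execute(statement.bind(keyIterator.next())).one();
                    // Convert the driver Row into something serializable before returning it
                    results.add(String.valueOf(row));
                }
                cluster.close();
                return results;
            }
        });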

Given that your example scans the table without knowing the keys in advance, using the Hadoop Cassandra interface is a good option. The Spark-Cassandra example linked in the question illustrates the use of this Hadoop interface with Cassandra.
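
To make that concrete — this is not from the original answer, just a minimal Java sketch of the Hadoop interface as it existed in Cassandra 2.0 (CqlPagingInputFormat; the address, partitioner, keyspace, and page size below are placeholder values):

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;

Configuration job = new Configuration();
ConfigHelper.setInputInitialAddress(job, "127.0.0.1");
ConfigHelper.setInputRpcPort(job, "9160");
ConfigHelper.setInputColumnFamily(job, "my_keyspace", "event");
ConfigHelper.setInputPartitioner(job, "Murmur3Partitioner");
CqlConfigHelper.setInputCQLPageRowSize(job, "1000");

// Each Hadoop input split becomes a Spark partition, so the scan is distributed
@SuppressWarnings("unchecked")
JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> casRdd =
        sc.newAPIHadoopRDD(job, CqlPagingInputFormat.class,
                (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class,
                (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class);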

Calliope is a library that encapsulates the complexity of using the Hadoop interface, providing a simple API to access that functionality. It is available only in Scala, because it relies on Scala-specific features (such as implicits, and macros in an upcoming release). With Calliope you basically declare how to convert your RDD[type] into a row key and row value, and Calliope takes care of configuring the Hadoop interfaces for the job. We found that Calliope (and the underlying Hadoop interfaces) are 2-4x faster than using a driver to interact with Cassandra.

Conclusion: I would walk away from the Spring Data configuration for accessing Cassandra, as it will limit you to a single node. Consider simple parallelized access if possible, or look into using Calliope from Scala.

