Reading from Cassandra using Spark Streaming

I have a problem when I use Spark Streaming to read from Cassandra.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

As described in the link above, I use

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3) 

to select data from Cassandra, but it seems the Spark stream issues the query only once; I want it to keep querying at an interval of 10 seconds.

My code is as follows, please reply.

Thanks!

    import org.apache.spark._
    import org.apache.spark.streaming._
    import com.datastax.spark.connector.streaming._
    import org.apache.spark.rdd._
    import scala.collection.mutable.Queue

    object SimpleApp {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("scala_streaming_test")
          .set("spark.cassandra.connection.host", "127.0.0.1")

        val ssc = new StreamingContext(conf, Seconds(10))

        val rdd = ssc.cassandraTable("mykeyspace", "users")
          .select("fname", "lname")
          .where("lname = ?", "yu")

        //rdd.collect().foreach(println)

        val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()

        val dstream = ssc.queueStream(rddQueue)
        dstream.print()

        ssc.start()
        rdd.collect().foreach(println)
        rddQueue += rdd
        ssc.awaitTermination()
      }
    }

scala spark-streaming spark-cassandra-connector
2 answers

You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream provides the same RDD on each streaming interval, and by performing an action on that RDD you trigger the materialization of the RDD lineage, which causes the query against Cassandra to be executed every time.

Make sure the queried dataset does not grow unbounded, to avoid ever-increasing query times and, as a result, an unstable streaming job.

Something like this should do the trick (using your code as a starting point):

    import org.apache.spark.streaming.dstream.ConstantInputDStream

    val ssc = new StreamingContext(conf, Seconds(10))

    val cassandraRDD = ssc.cassandraTable("mykeyspace", "users")
      .select("fname", "lname")
      .where("lname = ?", "yu")

    val dstream = new ConstantInputDStream(ssc, cassandraRDD)

    dstream.foreachRDD { rdd =>
      // any action will trigger the underlying Cassandra query; collect is used here for a simple output
      println(rdd.collect.mkString("\n"))
    }

    ssc.start()
    ssc.awaitTermination()
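To act on the warning above about unbounded data, the connector's where and limit clauses can keep each interval's query bounded. A minimal sketch, assuming a hypothetical events table with a day partition key and ts/payload columns (these names are placeholders, not from the question):

    // Sketch only: bound the data fetched on every streaming interval.
    // Table and column names ("events", "day", "ts", "payload") are assumptions; adjust to your schema.
    val boundedRDD = ssc.cassandraTable("mykeyspace", "events")
      .select("ts", "payload")
      .where("day = ?", "2017-10-01") // restrict the scan to a single partition
      .limit(1000)                    // cap the number of rows returned per interval

    val boundedStream = new ConstantInputDStream(ssc, boundedRDD)
    boundedStream.foreachRDD(rdd => println(rdd.count()))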

I had the same problem and found a solution by subclassing the InputDStream class. You must define the start() and compute() methods.

start() can be used for setup; the main logic lives in compute(), which must return an Option[RDD[T]]. To make the class flexible, the trait InputStreamQuery is defined.
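For orientation, this is roughly the bare skeleton such a subclass has to fill in (a sketch only, with placeholder names; the full query-aware version appears further down):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream
    import scala.reflect.ClassTag

    // Sketch only: the three methods every InputDStream subclass has to provide.
    class SkeletonInputDStream[T: ClassTag](ssc: StreamingContext) extends InputDStream[T](ssc) {
      override def start(): Unit = {}                              // one-time setup before streaming starts
      override def stop(): Unit = {}                               // cleanup when the context stops
      override def compute(validTime: Time): Option[RDD[T]] = None // build the RDD for this batch, or None
    }

The InputStreamQuery trait itself: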

    trait InputStreamQuery[T] {
      // where clause condition for the partition key
      def partitionCond: (String, Any)

      // function to return the next partition key
      def nextValue(v: Any): Option[Any]

      // where clause condition for the clustering key
      def whereCond: (String, (T) => Any)

      // batch size
      def batchSize: Int
    }

For the Cassandra table keyspace.test, create a materialized view test_by_date, which reorganizes the table with date as the partition key.

    CREATE TABLE IF NOT EXISTS keyspace.test (
      id timeuuid,
      date text,
      value text,
      PRIMARY KEY (id)
    );

    CREATE MATERIALIZED VIEW IF NOT EXISTS keyspace.test_by_date AS
      SELECT * FROM keyspace.test
      WHERE date IS NOT NULL AND id IS NOT NULL
      PRIMARY KEY (date, id)
      WITH CLUSTERING ORDER BY (id ASC);
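To have some data to stream, a few sample rows could be written through the connector. This is only a sketch, assuming a SparkContext named sc; the table and column names match the CQL above:

    import com.datastax.spark.connector._        // saveToCassandra, SomeColumns
    import com.datastax.driver.core.utils.UUIDs  // timeuuid generation from the bundled Java driver

    // Sketch only: seed a couple of rows into keyspace.test; they become visible in test_by_date as well.
    val sampleRows = Seq(
      (UUIDs.timeBased(), "2017-10-01", "first"),
      (UUIDs.timeBased(), "2017-10-01", "second")
    )
    sc.parallelize(sampleRows).saveToCassandra("keyspace", "test", SomeColumns("id", "date", "value"))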

One possible implementation for the test table could be:

    import java.util.UUID
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    case class Test(id: UUID, date: String, value: String)

    trait InputStreamQueryTest extends InputStreamQuery[Test] {
      val dateFormat = "uuuu-MM-dd"

      // set batch size to 10 records
      override def batchSize: Int = 10

      // partition key condition: query string and initial value
      override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")

      // clustering key condition: query string and function to get the clustering key from an instance
      override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)

      // return the next value of the partition key, e.g. '2017-10-02' for the input value '2017-10-01'
      override def nextValue(v: Any): Option[Any] = {
        val formatter = DateTimeFormatter.ofPattern(dateFormat)
        val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
        if (nextDate.isAfter(LocalDate.now())) None
        else Some(nextDate.format(formatter))
      }
    }

It can be used in the CassandraInputStream class as follows.

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, ValidRDDType}
    import com.datastax.spark.connector.rdd.reader.RowReaderFactory

    class CassandraInputStream[T: ClassTag]
        (_ssc: StreamingContext, keyspace: String, table: String)
        (implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
      extends InputDStream[T](_ssc) with InputStreamQuery[T] {

      var lastElm: Option[T] = None
      var partitionKey: Any = _

      override def start(): Unit = {
        // find a partition key which stores some records
        def findStartValue(cql: String, value: Any): Any = {
          val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)

          if (rdd.cassandraCount() > 0) value
          else nextValue(value).map(findStartValue(cql, _)).getOrElse(value)
        }

        // get the query string and initial value from the partitionCond method
        val (cql, value) = partitionCond
        partitionKey = findStartValue(cql, value)
      }

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val (cql, _) = partitionCond
        val (wh, whKey) = whereCond

        def fetchNext(patKey: Any): Option[CassandraTableScanRDD[T]] = {
          // query with the partitioning condition
          val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)

          val rdd = lastElm.map { x =>
            query.where(wh, whKey(x)).withAscOrder.limit(batchSize)
          }.getOrElse(query.withAscOrder.limit(batchSize))

          if (rdd.cassandraCount() > 0) {
            // store the last element of this RDD
            lastElm = Some(rdd.collect.last)
            Some(rdd)
          } else {
            // find the next partition key which stores data
            nextValue(patKey).flatMap { k =>
              partitionKey = k
              fetchNext(k)
            }
          }
        }

        fetchNext(partitionKey)
      }
    }

Combining all the classes,

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(10))

    val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest

    dstream.map(println).saveToCassandra( ... )

    ssc.start()
    ssc.awaitTermination()
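If the goal is just to inspect or persist each micro-batch, the last transformation could instead be written as below. This is a sketch only; the output table test_out and its column list are placeholders, not part of the original answer:

    import com.datastax.spark.connector._            // SomeColumns
    import com.datastax.spark.connector.streaming._  // saveToCassandra on DStreams

    // Sketch only: print each micro-batch on the driver (fine for small debug batches).
    dstream.foreachRDD(rdd => rdd.collect().foreach(println))

    // Or write each micro-batch to another Cassandra table ("test_out" is a placeholder name).
    dstream.saveToCassandra("keyspace", "test_out", SomeColumns("id", "date", "value"))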
