Spark: JSON text field to RDD

I have a Cassandra table with a text-type field called snapshot that contains JSON objects:

[identifier, timestamp, snapshot] 

My understanding is that in order to run transformations on this field with Spark, I need to convert this field of the RDD into another RDD whose rows follow the JSON schema.

Is that right? And how do I do it?

Edit: At the moment, I have managed to create an RDD from a single text field:

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
 import com.datastax.spark.connector._

 val conf = new SparkConf().setAppName("signal-aggregation")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
 val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
 val first = snapshots.first()
 val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
 firstJson.printSchema()

Which shows me the JSON schema. Good!

How do I tell Spark to apply this schema to every row of the snapshots table, so that I get an RDD of the parsed snapshot from each row?

scala cassandra apache-spark rdd
1 answer

You're almost there: just pass your RDD[String] of JSON directly to the jsonRDD method.

 val conf = new SparkConf().setAppName("signal-aggregation")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
 val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
 val jsons = snapshots.map(_._3)               // extract the third tuple element, the JSON string (RDD[String])
 val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // pass the RDD[String] in directly
 jsonSchemaRDD.registerTempTable("testjson")
 sqlContext.sql("SELECT * FROM testjson where .... ").collect

Quick example

 val stringRDD = sc.parallelize(Seq(
   """{ "isActive": false, "balance": "$1,431.73", "picture": "http://placehold.it/32x32", "age": 35, "eyeColor": "blue" }""",
   """{ "isActive": true, "balance": "$2,515.60", "picture": "http://placehold.it/32x32", "age": 34, "eyeColor": "blue" }""",
   """{ "isActive": false, "balance": "$3,765.29", "picture": "http://placehold.it/32x32", "age": 26, "eyeColor": "blue" }"""
 ))
 sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
 sqlContext.sql("SELECT age from testjson").collect
 // res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
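Note that `jsonRDD` was deprecated in later Spark versions in favor of the DataFrame reader. A minimal sketch of the same pattern on Spark 2.x, assuming the same "listener"/"snapshots" table and the DataStax connector on the classpath:

```scala
// Sketch for Spark 2.x+: spark.read.json accepts a Dataset[String] of JSON.
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

val spark = SparkSession.builder.appName("signal-aggregation").getOrCreate()
import spark.implicits._ // needed for .toDS() on RDD[String]

val snapshots = spark.sparkContext
  .cassandraTable[(String, String, String)]("listener", "snapshots")
val jsonDF = spark.read.json(snapshots.map(_._3).toDS()) // schema inferred across all rows
jsonDF.createOrReplaceTempView("testjson")
spark.sql("SELECT age FROM testjson").collect()
```

The schema inference here scans the data once, just as `jsonRDD` did, but returns a DataFrame instead of a SchemaRDD.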
