I am trying to use SQL on a Spark DataFrame, but one of the columns holds a string that looks like a JSON structure. I saved my DataFrame as a temporary table named TestTable.
When I run desc on it:

    col_name     data_type
    requestId    string
    name         string
    features     string
But the values of the features column are JSON:
{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}
I just want to query TestTable for rows where totalSpent > 10. Can someone tell me how to do this?
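Conceptually, this is the query I am after. I believe get_json_object can pull a single field out of a JSON string column (assuming it is available in my Spark version; the '$.totalSpent' path and the cast are my guesses at the right incantation):

    val result = sqlContext.sql(
      """SELECT *
        |FROM TestTable
        |WHERE CAST(get_json_object(features, '$.totalSpent') AS INT) > 10""".stripMargin)
    result.show()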
My JSON file looks like this:
{ "requestId": 232323, "name": "ravi", "features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}" }
features is a string, and I only need totalSpent out of it. I tried:
    import org.apache.spark.sql.types._

    val features = StructType(Array(
      StructField("totalSpent", LongType, true),
      StructField("movies", LongType, true)
    ))

    val schema = StructType(Array(
      StructField("requestId", StringType, true),
      StructField("name", StringType, true),
      StructField("features", features, true)
    ))

    val records = sqlContext.read.schema(schema).json(filePath)
since each request has its whole JSON record on a single line. But that gives me an error.
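If I instead declare features as a plain StringType, the read should go through (the schema inference below treats it as a string too). A sketch of that fallback, with asStrings as a hypothetical name:

    import org.apache.spark.sql.types._

    val stringSchema = StructType(Array(
      StructField("requestId", StringType, true),
      StructField("name", StringType, true),
      StructField("features", StringType, true)
    ))
    val asStrings = sqlContext.read.schema(stringSchema).json(filePath)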
When I tried using
    val records = sqlContext.jsonFile(filePath)
    records.printSchema
it shows me:

    root
     |-- requestId: string (nullable = true)
     |-- features: string (nullable = true)
     |-- name: string (nullable = true)
Can parallelize be used inside a StructField when creating the schema? I first tried:

    val customer = StructField("features", StringType, true)
    val events = sc.parallelize(customer :: Nil)
    val schema = StructType(Array(
      StructField("requestId", StringType, true),
      // StructType expects a Seq[StructField], not an RDD of them,
      // so this call does not compile:
      StructField("name", StructType(events, true), true),
      StructField("features", features, true)
    ))
That also gives me an error. I also tried:
    import org.apache.spark.sql.functions.udf
    import sqlContext.implicits._
    import net.liftweb.json.parse

    case class KV(k: String, v: Int)

    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show

This gives me:

    <console>:78: error: object liftweb is not a member of package net
           import net.liftweb.json.parse

presumably because lift-json is not on my classpath.
Then I tried:
    val parseJson = udf((s: String) => {
      // This cannot work: sqlContext only exists on the driver and is not
      // usable inside a UDF, and read.json here expects a path rather than
      // a raw JSON string.
      sqlContext.read.json(s)
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show
But again, an error.
Next I tried:
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val parseJson = udf((s: String) => {
      parse(s)
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show
But it gives me:
    java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
      at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
      at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
      at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
      at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

So it seems a UDF has to return a type Spark SQL can derive a schema for (primitives, case classes, and so on), not a raw JValue.
This gives me the correct schema (based on the answer given by zero323):
    import scala.util.Try
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val extractFeatures = udf((features: String) => Try {
      implicit val formats = DefaultFormats
      parse(features).extract[Features]
    }.toOption)

    val parsed = records.withColumn("features", extractFeatures($"features"))
    parsed.printSchema
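For reference, the Features case class I extract into mirrors the fields of the JSON above. This is only a sketch of it; the exact types are my guesses from the sample values:

    case class Features(
      places: Int,
      movies: Int,
      totalPlacesVisited: Int,
      totalSpent: Long,
      SpentMap: Map[String, Int],
      benefits: Map[String, Int]
    )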
But when I query:
    val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent")
value.show gives me null.
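My guess is that parse(features).extract[Features] throws inside the UDF and Try(...).toOption silently turns that into None, which then shows up as null. A minimal check I could run on a single raw value (just a sketch, reusing the sample string and the Features case class from above):

    import scala.util.Try
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    implicit val formats = DefaultFormats
    val sample = """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
    // Failure(...) here would explain the nulls; Success(...) would point
    // at the filter or the column types instead.
    println(Try(parse(sample).extract[Features]))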