How to query a data frame where a StringType field holds a JSON value in Spark SQL

I am trying to use SQL on a Spark data frame, but in the data frame one field's value is a string (which looks like a JSON structure).

I saved my data frame as a temp table: TestTable
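Roughly like this (a sketch, assuming the Spark 1.x registerTempTable API; records is the data frame I read from the file, as later in this post):

    // Hypothetical registration step (assumed, not shown in the original post):
    records.registerTempTable("TestTable")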

When I did desc:

    col_name     data_type
    requestId    string
    name         string
    features     string

But the features values are JSON:

 {"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}} 

I just want to query TestTable where totalSpent > 10. Can someone tell me how to do this?
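In other words, I want something like the following to work (it does not; it fails with an analysis error because features is a plain string, not a struct):

    // What I want to express, sketched; fails because `features` is StringType.
    sqlContext.sql("SELECT * FROM TestTable WHERE features.totalSpent > 10").show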

My JSON file looks like this:

  { "requestId": 232323, "name": "ravi", "features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}" } 

features is a string. I need only totalSpent. I tried:

    import org.apache.spark.sql.types._

    val features = StructType(Array(
      StructField("totalSpent", LongType, true),
      StructField("movies", LongType, true)
    ))

    val schema = StructType(Array(
      StructField("requestId", StringType, true),
      StructField("name", StringType, true),
      StructField("features", features, true)
    ))

    val records = sqlContext.read.schema(schema).json(filePath)

since each request has its features as a single line of JSON. But that gives me an error.

When I tried using

    val records = sqlContext.jsonFile(filePath)
    records.printSchema

it shows me:

    root
     |-- requestId: string (nullable = true)
     |-- features: string (nullable = true)
     |-- name: string (nullable = true)

Can sc.parallelize be used inside a StructField when creating a schema? I tried:

    val customer = StructField("features", StringType, true)
    val events = sc.parallelize(customer :: Nil)
    val schema = StructType(Array(
      StructField("requestId", StringType, true),
      StructField("name", StructType(events, true), true),
      StructField("features", features, true)
    ))

It also gives me an error. I also tried:

    import net.liftweb.json.parse

    case class KV(k: String, v: Int)

    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show

This gives me:

    <console>:78: error: object liftweb is not a member of package net
           import net.liftweb.json.parse

Then I tried:

    val parseJson = udf((s: String) => {
      sqlContext.read.json(s)
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show

But again, an error.

I tried:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val parseJson = udf((s: String) => {
      parse(s)
    })

    val parsed = records.withColumn("parsedJSON", parseJson($"features"))
    parsed.show

But it gives me:

    java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
      at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
      at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
      at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
      at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

This gives me the correct schema (based on the answer given by zero323):

    import scala.util.Try
    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse

    // Features is the case class from the answer below.
    val extractFeatures = udf((features: String) => Try {
      implicit val formats = DefaultFormats
      parse(features).extract[Features]
    }.toOption)

    val parsed = records.withColumn("features", extractFeatures($"features"))

    parsed.printSchema

But when I query:

    val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent")

value.show gives null.

1 answer

When you return data from a UDF it has to be representable as SQL types, and a JSON AST is not. One approach is to convert the data to a case class similar to this one:

    case class Features(
      places: Integer,
      movies: Integer,
      totalPlacesVisited: Integer,
      totalSpent: Integer,
      SpentMap: Map[String, Integer],
      benefits: Map[String, Integer]
    )

and use it to extract the objects:

    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse

    val df = Seq((
      232323, "ravi",
      """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
    )).toDF("requestId", "name", "features")

    val extractFeatures = udf((features: String) => {
      implicit val formats = DefaultFormats
      parse(features).extract[Features]
    })

    val parsed = df.withColumn("features", extractFeatures($"features"))

    parsed.show(false)
    // +---------+----+-----------------------------------------------------------------+
    // |requestId|name|features                                                         |
    // +---------+----+-----------------------------------------------------------------+
    // |232323   |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
    // +---------+----+-----------------------------------------------------------------+

    parsed.printSchema
    // root
    //  |-- requestId: integer (nullable = false)
    //  |-- name: string (nullable = true)
    //  |-- features: struct (nullable = true)
    //  |    |-- places: integer (nullable = true)
    //  |    |-- movies: integer (nullable = true)
    //  |    |-- totalPlacesVisited: integer (nullable = true)
    //  |    |-- totalSpent: integer (nullable = true)
    //  |    |-- SpentMap: map (nullable = true)
    //  |    |    |-- key: string
    //  |    |    |-- value: integer (valueContainsNull = true)
    //  |    |-- benefits: map (nullable = true)
    //  |    |    |-- key: string
    //  |    |    |-- value: integer (valueContainsNull = true)
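With features parsed into a struct, the totalSpent filter from the question can now be expressed directly (a sketch; the temp table name is taken from the question):

    // Nested fields of the struct column are directly queryable.
    parsed.filter($"features.totalSpent" > 10).show

    // Equivalently, in plain SQL via a temp table:
    parsed.registerTempTable("TestTable")
    sqlContext.sql("SELECT requestId, name FROM TestTable WHERE features.totalSpent > 10").show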

Depending on the other records and the expected usage, you should adjust this representation and add the corresponding error-handling logic.
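For example, wrapping the extraction in Try, as one of the attempts in the question already does, maps malformed records to null instead of failing the whole job (a sketch):

    import scala.util.Try

    // Records whose JSON does not parse or does not match `Features` become null.
    val extractFeaturesSafe = udf((features: String) => Try {
      implicit val formats = DefaultFormats
      parse(features).extract[Features]
    }.toOption)

    val parsedSafe = df.withColumn("features", extractFeaturesSafe($"features"))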

You can also use the JSON DSL to access individual fields as strings:

    import org.json4s.jackson.JsonMethods.{compact, render}

    val getMovieSpent = udf((s: String) =>
      compact(render(parse(s) \\ "SpentMap" \\ "Movie")))

    df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
    // +---------+----+--------------------+-----------+
    // |requestId|name|            features|movie_spent|
    // +---------+----+--------------------+-----------+
    // |   232323|ravi|{"places":11,"mov...|          2|
    // +---------+----+--------------------+-----------+

For alternative approaches, see How to query JSON data columns using Spark DataFrames?
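One built-in alternative worth noting (a sketch; get_json_object has been available since Spark 1.6) extracts single fields straight from the string, with no schema or case class needed:

    import org.apache.spark.sql.functions.get_json_object

    // Pull totalSpent out of the JSON string and filter on it.
    df.withColumn("totalSpent", get_json_object($"features", "$.totalSpent").cast("bigint"))
      .filter($"totalSpent" > 10)
      .show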

