Extracting `Seq [(String, String, String)]` from a DataFrame spark

I have a DF spark with strings Seq[(String, String, String)] . I'm trying to do something like flatMap with this, but everything I do ends up with a throw

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be passed to scala.Tuple3

I can take one line or several lines from DF just fine

 df.map{ r => r.getSeq[Feature](1)}.first 

returns

 Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] ..... 

and the RDD data type seems correct.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

Df circuit

 root |-- article_id: long (nullable = true) |-- content_processed: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- lemma: string (nullable = true) | | |-- pos_tag: string (nullable = true) | | |-- ne_tag: string (nullable = true) 

I know this problem is due to spark sql treating RDD strings like org.apache.spark.sql.Row , although they idioticly say it's a Seq[(String, String, String)] . There is a related question (link below), but the answer to this question does not work for me. I am also not familiar with sparks to figure out how to turn it into a working solution.

Are the strings Row[Seq[(String, String, String)]] or Row[(String, String, String)] or Seq[Row[(String, String, String)]] or something even crazier.

I'm trying to do something like

 df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1) 

which seems to work but doesn't actually work

 df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first 

throws the above error. So, how should I (for example) get the first element of the second tuple in each row?

In addition, WHY has a spark intended for this, it seems that the idiotic claims that something has one type, when in fact it is not and cannot be converted to the declared type.


Related question: GenericRowWithSchema exception when passing ArrayBuffer to HashSet in DataFrame to RDD from Hive table

Related error report: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

+10
scala dataframe apache-spark apache-spark-sql
source share
2 answers

Well, he doesn't claim to be a motorcade. He claims that it is a struct that matches Row :

 import org.apache.spark.sql.Row case class Feature(lemma: String, pos_tag: String, ne_tag: String) case class Record(id: Long, content_processed: Seq[Feature]) val df = Seq( Record(1L, Seq( Feature("ancient", "jj", "o"), Feature("olympia_greece", "nn", "location") )) ).toDF val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0)) 

You will find the exact mapping rules in the Spark SQL Programming Guide .

Since Row not a pretty structure, you probably want to map it to something useful:

 content.map(_.map { case Row(lemma: String, pos_tag: String, ne_tag: String) => (lemma, pos_tag, ne_tag) }) 

or

 content.map(_.map ( row => ( row.getAs[String]("lemma"), row.getAs[String]("pos_tag"), row.getAs[String]("ne_tag") ))) 

Finally, a slightly more concise approach with Datasets :

 df.as[Record].rdd.map(_.content_processed) 

or

 df.select($"content_processed").as[Seq[(String, String, String)]] 

although at the moment it seems a little difficult.

There is an important difference between the first approach ( Row.getAs ) and the second ( Dataset.as ). The first retrieves objects as Any and applies asInstanceOf . The latter uses encoders to convert between internal types and the desired representation.

+14
source share
 object ListSerdeTest extends App { implicit val spark: SparkSession = SparkSession .builder .master("local[2]") .getOrCreate() import spark.implicits._ val myDS = spark.createDataset( Seq( MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee"))) ) ) myDS.toDF().printSchema() myDS.toDF().foreach( row => { row.getSeq[Row](row.fieldIndex("mylist")) .foreach { case Row(a, b) => println(a, b) } } ) } case class MyCaseClass ( mylist: Seq[(String, String)] ) 

The above code is another way to work with a nested structure. By default, Spark Encoder will encode TupleX, making them nested structures, which is why you see this strange behavior. and as others said in the comment, you cannot just execute getAs[T]() since it is just a cast ( x.asInstanceOf[T] ), so you will get runtime exceptions.

0
source share

All Articles