Sparks: scala.MatchError (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

The scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) exception scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) exception occurs when I try to access the elements of a DataFrame string. The following code counts book pairs, where the number of pairs is equal to the number of readers who read this pair of books.

Interestingly, an exception only occurs when trainPairs is created as a result of trainDf.join(...) . If the same data structure is created as a string:

 case class BookPair (book1:Int, book2:Int, cnt:Int, name1: String, name2: String) val recs = Array( BookPair(1, 2, 3, "book1", "book2"), BookPair(2, 3, 1, "book2", "book3"), BookPair(1, 3, 2, "book1", "book3"), BookPair(1, 4, 5, "book1", "book4"), BookPair(2, 4, 7, "book2", "book4") ) 

This exception does not occur at all!

The full code that throws this exception is:

 import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, DataFrame} import org.apache.spark.sql.functions._ object Scratch { case class Book(book: Int, reader: Int, name:String) val recs = Array( Book(book = 1, reader = 30, name = "book1"), Book(book = 2, reader = 10, name = "book2"), Book(book = 3, reader = 20, name = "book3"), Book(book = 1, reader = 20, name = "book1"), Book(book = 1, reader = 10, name = "book1"), Book(book = 1, reader = 40, name = "book1"), Book(book = 2, reader = 40, name = "book2"), Book(book = 1, reader = 100, name = "book1"), Book(book = 2, reader = 100, name = "book2"), Book(book = 3, reader = 100, name = "book3"), Book(book = 4, reader = 100, name = "book4"), Book(book = 5, reader = 100, name = "book5"), Book(book = 4, reader = 500, name = "book4"), Book(book = 1, reader = 510, name = "book1"), Book(book = 2, reader = 30, name = "book2")) def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) // set up environment val conf = new SparkConf() .setMaster("local[5]") .setAppName("Scratch") .set("spark.executor.memory", "2g") val sc = new SparkContext(conf) val data = sc.parallelize(recs) /** * Remove readers with many books count books by reader and filter readers with books count > 10 */ val maxBookCnt = 4 val readersWithLotsOfBooksRDD = data.map(r => (r.reader, 1)).reduceByKey((x, y) => x + y).filter{ case (_, x) => x > maxBookCnt } readersWithLotsOfBooksRDD.collect() val readersWithBooksRDD = data.map( r => (r.reader, (r.book, r.name) )) readersWithBooksRDD.collect() println("*** Records left after removing readers with maxBookCnt > "+maxBookCnt) val data2 = readersWithBooksRDD.subtractByKey(readersWithLotsOfBooksRDD) data2.foreach(println) // *** Prepair train data val trainData = data2.map(tuple => tuple match { case (reader,v) => Book(reader = reader, book = v._1, name = v._2) }) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val trainDf = trainData.toDF() println("*** Creating pairs...") val trainPairs = trainDf.join( trainDf.select($"book" as "r_book", $"reader" as "r_reader", $"name" as "r_name"), $"reader" === $"r_reader" and $"book" < $"r_book") .groupBy($"book", $"r_book", $"name", $"r_name") .agg($"book",$"r_book", count($"reader") as "cnt", $"name", $"r_name") trainPairs.registerTempTable("trainPairs") println("*** Pairs Schema:") trainPairs.printSchema() // Order pairs by count val pairsSorted = sqlContext.sql("SELECT * FROM trainPairs ORDER BY cnt DESC") println("*** Pairs Sorted by Count") pairsSorted.show // Key pairs by book val keyedPairs = trainPairs.rdd.map({case Row(book1: Int, book2: Int, count: Int, name1: String, name2:String) => (book1,(book2, count, name1, name2))}) println("*** keyedPairs:") keyedPairs.foreach(println) } } 

Any ideas?

Update

zero323 writes:

"It throws an exception because the trainPairs schema does not match the pattern you provided. The schema looks like this:

 root |-- book: integer (nullable = false) |-- r_book: integer (nullable = false) |-- name: string (nullable = true) |-- r_name: string (nullable = true) |-- book: integer (nullable = false) |-- r_book: integer (nullable = false) |-- cnt: long (nullable = false) |-- name: string (nullable = true) |-- r_name: string (nullable = true) 

Ok, but how can I find the complete trainPairs scheme? Why then when I print the trainPairs schema with the command:

 trainPairs.printSchema() 

I get only part of this circuit:

 root |-- book: integer (nullable = false) |-- r_book: integer (nullable = false) |-- cnt: long (nullable = false) |-- name: string (nullable = true) |-- r_name: string (nullable = true) 

How to print / find the full trainPairs diagram?

Besides

 Row(Int, Int, String, String, Int, Int, Long, String, String) 

leads to the same scala.MatchError !

+4
source share
1 answer

As I found out, the exception was caused by the wrong field type of the count line. It should be Long , not Int . Therefore, instead of:

 // Key pairs by book val keyedPairs = trainPairs.rdd.map({case Row(book1: Int, book2: Int, count: Int, name1: String, name2:String) => (book1,(book2, count, name1, name2))}) 

The correct code should be:

 val keyedPairs = trainPairs.rdd.map({case Row(book1: Int, book2: Int, count: Long, name1: String, name2:String) => (book1,(book2, count, name1, name2))}) 

And everything will work as expected.

+10
source

All Articles