GenericRowWithSchema exception when casting ArrayBuffer to HashSet in a DataFrame-to-RDD conversion from a Hive table

I have a Hive table in parquet format that was generated using

create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet; 

I can verify that it has been populated; here is a sample value:

 [1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])] 

I want to put this into a Spark RDD of the form:

 ((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello")))) 

Now, using the spark-shell (I get the same problem with spark-submit), I made a test RDD with these values:

 scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
 tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85

Using an iterator, I can convert the ArrayBuffer to a HashSet in the following new RDD:

 scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
 tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

 scala> tempRDD2.collect.foreach(println)
 ((1,abcdef),((2,ghijkl),Set((1,hello))))
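(As an aside, the mutable accumulator above is not strictly needed; a minimal equivalent, assuming the same tempRDD and using the illustrative name tempRDD2b, is:)

 scala> val tempRDD2b = tempRDD.map { case (k, (v, buf)) => (k, (v, buf.toSet)) }   // ArrayBuffer.toSet yields an immutable Set directly
 scala> tempRDD2b.collect.foreach(println)
 ((1,abcdef),((2,ghijkl),Set((1,hello))))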

But when I try EXACTLY the SAME thing with a DataFrame from a HiveContext / SQLContext, I get the following error:

 scala> val hc = new HiveContext(sc)
 scala> import hc._
 scala> import hc.implicits._
 scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
 scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
 scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
 tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

 scala> tempRDD3.collect.foreach(println)
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:724)

 Driver stacktrace:
   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Note that I get the same error, "GenericRowWithSchema cannot be cast to scala.Tuple2", when I run this as a compiled program via spark-submit. The program crashes at RUN TIME when it reaches the conversion step; there were no compile-time errors.

It seems very strange to me that my artificially created RDD "tempRDD" works with the conversion, while the RDD derived from the Hive DataFrame does not. I checked, and both RDDs have the same type:

 scala> tempRDD
 org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

 scala> tempRDDfromHive
 org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All of them produced the same error message.
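(A side note on why the identical-looking types are misleading, with a small illustrative snippet that is not from my original session: because of JVM type erasure, asInstanceOf on a generic collection only casts the outer collection and never checks the element type, so the cast itself succeeds and the ClassCastException only appears once an element is actually used as a Tuple2.)

 scala> val buf: Any = scala.collection.mutable.ArrayBuffer[Any]("not a tuple")
 scala> val cast = buf.asInstanceOf[scala.collection.mutable.ArrayBuffer[(Int, String)]]   // no error here: the element type is erased
 scala> // cast.head._1   // this line would throw ClassCastException: String cannot be cast to scala.Tuple2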

I also read related Stack Overflow questions and Apache Spark Jira issues, and based on those I tried casting the ArrayBuffer as an Iterator instead, but that also failed at the second step with the same error.

Does anyone know how to properly convert an ArrayBuffer to a HashSet for DataFrames originating from Hive tables? Since the error only seems to occur with the Hive-table version, I am inclined to think that this is an issue with the Spark/Hive integration in SparkSQL.

Any ideas?

Thanks in advance.

[edited] BTW, my version of Spark is 1.3.0 CDH.

[edited: here are the results of printSchema]

 scala> tempRDDfromHive.printSchema()
 root
  |-- var1: integer (nullable = true)
  |-- var2: string (nullable = true)
  |-- var3: integer (nullable = true)
  |-- var4: string (nullable = true)
  |-- var5: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- a: integer (nullable = true)
  |    |    |-- b: string (nullable = true)
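(For reference, a quick runtime check, hypothetical in the sense that it was not part of my original session, to see what the array column actually contains; based on the error above I would expect the elements to be Catalyst Row objects rather than Scala tuples:)

 scala> val firstRow = tempHiveQL.first
 scala> println(firstRow.get(4).asInstanceOf[Seq[Any]].head.getClass)
 // expected: class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema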
1 answer

What you actually get during the map phase is not ArrayBuffer[(Int, String)] but ArrayBuffer[Row], hence the error. Ignoring the other columns, what you need is something like this:

 import org.apache.spark.sql.Row

 tempHiveQL.map((a: Row) =>
   a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet)
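(Extending this to the full ((var1, var2), ((var3, var4), Set(...))) structure from the question; this is a sketch assuming the same tempHiveQL DataFrame and the illustrative name tempRDDfromHiveFixed, not something stated in the original answer:)

 import org.apache.spark.sql.Row

 val tempRDDfromHiveFixed = tempHiveQL.map { (a: Row) =>
   ((a.getInt(0), a.getString(1)),
    ((a.getInt(2), a.getString(3)),
     a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet))
 }

 // tempRDDfromHiveFixed.collect.foreach(println)
 // ((1,abcdef),((2,ghijkl),Set((1,hello))))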

This seems to be fixed in Spark 1.5.0.
