Performance of loading Parquet files into case classes in Spark

I evaluated the performance of various methods for loading Parquet files into Spark, and the differences are striking.

In our Parquet files, we have nested case classes like:

case class C(/* a dozen attributes */)
case class B(/* a dozen attributes */, cs: Seq[C])
case class A(/* a dozen attributes */, bs: Seq[B])

Loading them from the Parquet files takes some time, so I benchmarked various ways of loading the case classes from Parquet files and summing a field, using Spark 1.6 and 2.0.

Here is a brief description of the test I did:

 val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist()
 df.count()

 // Spark 1.6

 // Play JSON
 // 63.169s
 df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption)
   .map(_.fieldToSum).sum()

 // Direct access to the field using Spark Row
 // 2.811s
 df.map(row => row.getAs[Long]("fieldToSum")).sum()

 // A small library we developed that accesses fields using Spark Row
 // 10.401s
 df.toRDD[A].map(_.fieldToSum).sum()

 // DataFrame hybrid SQL API
 // 0.239s
 df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

 // Dataset with RDD-style code
 // 34.223s
 df.as[A].map(_.fieldToSum).reduce(_ + _)

 // Dataset with column selection
 // 0.176s
 df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)

 // Spark 2.0
 // Performance is similar, except for:

 // Direct access to the field using Spark Row
 // 23.168s
 df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)

 // A small library we developed that accesses fields using Spark Row
 // 32.898s
 f1DF.toRDD[A].map(_.fieldToSum).sum()

I understand why the performance of the methods using Spark Row deteriorates when upgrading to Spark 2.0, since DataFrame is now simply an alias for Dataset[Row]. I assume this is the cost of unifying the interfaces.

On the other hand, I am quite disappointed that the promise of Dataset is not kept: RDD-style coding (map and flatMap) performs much worse than using the Dataset like a DataFrame with the SQL-like DSL.

Basically, in order to have good performance, we need to give up type safety.

  • What is the reason for the difference between a Dataset used as an RDD and a Dataset used as a DataFrame?

  • Is there a way to improve the performance of RDD-style coding on Datasets so that it matches SQL-style coding? For data engineering, RDD-style coding is much cleaner.

  • In addition, working with the SQL-like DSL would require flattening our data model and not using nested case classes. Is it right that good performance can only be achieved with flat data models?

1 answer

What is the reason for the difference between a Dataset used as an RDD and a Dataset used as a DataFrame?

To get some idea, consider the optimizations used by Spark SQL. As far as I know, there are three types of improvements over a plain RDD:

  • execution plan optimizations (projection and selection pushdown, constant folding),
  • off-heap memory usage and an efficient columnar cache format,
  • code generation.

Now the problem is that not all of these techniques are useful outside restricted programming models like SQL.

For example, it is possible to push down selections (filters), but projections are quite limited (you cannot really push down a part of an object, can you?). Similarly, code generation depends on well-defined semantics and is not easy to apply in general (it is basically a compiler that generates code which can be further optimized by the JVM).
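To make the pushdown point concrete, here is a minimal sketch (assuming the `df` and case class `A` from the question, and the implicits in scope); the exact plan text differs between versions:

 import spark.implicits._  // Spark 2.0; in 1.6 use sqlContext.implicits._

 // Filter on a top-level column: visible to Catalyst and pushed down to the Parquet reader.
 df.filter($"fieldToSum" > 0L).explain()
 // The physical plan shows something like: PushedFilters: [GreaterThan(fieldToSum,0)]

 // Typed filter with a lambda: opaque to the optimizer, so every row is decoded first.
 df.as[A].filter(_.fieldToSum > 0L).explain()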

Finally, sun.misc.Unsafe is a great way to improve performance, but it doesn't come for free. Although there are many benefits, there is also significant overhead of encoding and decoding.
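To illustrate where that overhead shows up with the question's own code (a sketch, not a measurement): the typed map has to decode a full A per row and re-encode each result, while the typed select only deserializes the one column it needs.

 // Decodes every row into a full A object, then encodes each Long back: two encoder round trips per row.
 df.as[A].map(_.fieldToSum).reduce(_ + _)

 // Only the requested column leaves Spark's internal representation.
 df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)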

working with the SQL-like DSL would require flattening our data model and not using nested case classes.

Nested structures are not exactly first-class citizens, and there are some poorly documented limitations, but you can still do quite a lot here.
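For example, the untyped DSL can still reach into nested arrays of structs; the field names `x` and `y` below are made up for illustration (they are not in the question), and the same pattern works with any leaf field of B and C:

 import org.apache.spark.sql.functions._
 import spark.implicits._  // for the $"..." syntax

 // Dot notation on an array of structs yields an array column of that field.
 df.select($"bs.x")

 // explode flattens one nesting level at a time, so leaf fields can be aggregated.
 df.select(explode($"bs").as("b"))
   .select(explode($"b.cs").as("c"))
   .agg(sum($"c.y"))
   .show()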

the performance of the methods using Spark Row deteriorates when upgrading to Spark 2.0, since DataFrame is now simply an alias for Dataset[Row]. I assume this is the cost of unifying the interfaces.

Despite some performance regressions, these two pieces of code are simply not equivalent. In 2.0.0+, DataFrame.map has a different signature than in 1.x. If you want to make the two comparable, you should convert to an RDD first:

 df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _) 
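For contrast, the un-converted 2.0 version goes through the Dataset machinery: map on a Dataset[Row] needs an implicit Encoder[Long] (from spark.implicits._) and re-encodes every value into Spark's internal format, which is roughly where the extra cost comes from (sketch only):

 import spark.implicits._  // provides Encoder[Long]

 // Spark 2.0+: this is Dataset.map, not RDD.map; each result goes through an encoder.
 df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)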
