Performance impact of the RDD API vs. UDFs mixed with the DataFrame API

(This question is specific to Scala.)

Although the Spark docs recommend using the DataFrame API where possible, when the DataFrame API is insufficient the choice usually comes down to falling back to the RDD API or using a UDF. Is there a performance difference between these two alternatives?

RDDs and UDFs are similar in that neither can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and does it differ between the two approaches?

To give a concrete example: suppose I have a DataFrame containing a column of text data with custom formatting (not amenable to regex matching). I need to parse this column and add a new vector column containing the resulting tokens.

+2
1 answer

neither of them can take advantage of Catalyst and Tungsten optimizations

This is not entirely true. While UDFs do not benefit from Tungsten optimizations (and arguably a simple SQL transformation doesn't gain much there anyway), they can still benefit from the execution-plan optimizations provided by Catalyst. Let's illustrate this with a simple example (note: Spark 2.0 and Scala; do not extrapolate this to earlier versions, especially with PySpark):

val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)
val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

The execution plan shows us a couple of things:

  • The selection (filter) was pushed down below the aggregation.
  • The projection was pushed down below the aggregation, effectively eliminating the second UDF call.

Depending on the data and the pipeline, this can provide a significant performance boost almost for free.

That said, both RDDs and UDFs require moving data between the safe (JVM object) and unsafe (Tungsten binary) representations, and UDFs are much less flexible. Still, if all you need is simple map-like behavior that doesn't initialize expensive objects (like database connections), then a UDF is the way to go.
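To connect this back to the tokenization task from the question, a sketch of the "simple map behavior" case: keep the parsing logic in a plain Scala function (testable without a SparkSession) and only wrap it in a UDF at the edge. The colon-separated format, the `tokenize` name, and the column names here are assumptions for illustration, not from the question:

```scala
// Hypothetical parser for a custom colon-separated record format.
// Pure function: cheap to call per row and unit-testable on its own.
def tokenize(raw: String): Seq[String] =
  raw.split(':').map(_.trim).filter(_.nonEmpty).toSeq

// Wrapping it as a UDF (requires a SparkSession and imports in scope):
//   import org.apache.spark.sql.functions.udf
//   val tokenizeUdf = udf(tokenize _)
//   df.withColumn("tokens", tokenizeUdf($"raw"))
```

Because the function holds no expensive state, there is nothing to initialize per partition, which is exactly the situation where a UDF is preferable to dropping to the RDD API.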

In slightly more complex scenarios, you can easily move to a typed Dataset and reserve RDDs for cases when you really need access to some low-level features, such as custom partitioning.
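As a sketch of what "custom partitioning" means at the RDD level, here is a simplified, standalone version of the key-to-partition mapping that a hash partitioner performs (the real Spark `HashPartitioner` also special-cases null keys; this is an illustration, not Spark's code):

```scala
// Simplified hash partitioning: map any key to a partition index
// in [0, numPartitions), handling Java's negative modulo results.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val h = key.hashCode % numPartitions
  if (h < 0) h + numPartitions else h
}

// With the actual RDD API (requires a SparkContext):
//   import org.apache.spark.HashPartitioner
//   pairRdd.partitionBy(new HashPartitioner(8))
```

This kind of control over data placement is not available through the DataFrame/Dataset API, which is why it remains a legitimate reason to drop down to RDDs.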

+10
