> none of them can take advantage of Catalyst and Tungsten optimization
This is not entirely true. While UDFs do not benefit from Tungsten optimizations (and a simple expression probably would not gain much from them anyway), you can still benefit from the execution-plan optimizations provided by Catalyst. Let's illustrate this with a simple example (note: Spark 2.0 and Scala; do not extrapolate this to earlier versions, especially with PySpark):
```scala
val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1 ...
```
The execution plan shows us a couple of things:
- The selection was pushed down before the aggregation.
- The projection was pushed down before the aggregation, effectively removing the second UDF call.
Depending on the data and the pipeline, this can provide a significant performance improvement almost for free.
At the same time, both RDDs and UDFs require migrating data between the safe (JVM objects) and unsafe (Tungsten binary) representations, and UDFs are much less flexible. Still, if all you need is simple map-like behavior that does not initialize expensive objects (such as database connections), a UDF is the way to go.
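As a concrete illustration of that "simple map-like behavior", here is a minimal sketch of such a UDF (the column names and the `normalize` function are made up for this example):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// A cheap, stateless per-row transformation: a good fit for a UDF,
// since Catalyst can still optimize the plan around it.
val normalize = udf((s: String) => s.trim.toLowerCase)

val df = Seq(" Foo ", "BAR").toDF("raw")
df.withColumn("clean", normalize($"raw")).show()

// Anti-pattern: opening a database connection inside the UDF body would
// pay the setup cost once per row; prefer mapPartitions for that case.
```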
In slightly more complex scenarios, you can usually fall back to a plain Dataset and reserve RDDs for cases when you really need access to low-level features, for example custom partitioning.
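A rough sketch of that pattern, dropping to the RDD API only for the custom partitioner and then returning to a Dataset (the sample data and partition count are arbitrary):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(("a", 1), ("b", 2), ("a", 3)).toDS

// Drop to the RDD API only for the low-level feature Dataset lacks:
// a user-supplied partitioner.
val repartitioned = ds.rdd.partitionBy(new HashPartitioner(4))

// Return to the Dataset world once the low-level work is done.
val back = repartitioned.toDS
```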