Spark functions vs UDF performance?

Spark now offers predefined functions that can be used with DataFrames, and it seems they are highly optimized. My original question was going to be which is faster, but I did some testing myself and found the Spark functions to be about 10 times faster, at least in one case. Does anyone know why this is, and when would a UDF be faster (i.e. only when there is no equivalent built-in Spark function)?

Here is my test code (run on Databricks Community Edition):

    # UDF vs Spark function
    from faker import Factory
    from pyspark.sql.functions import lit, concat

    fake = Factory.create()
    fake.seed(4321)

    # Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
    from pyspark.sql import Row
    def fake_entry():
        name = fake.name().split()
        return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

    # Create a helper function to call a function repeatedly
    def repeat(times, func, *args, **kwargs):
        for _ in xrange(times):
            yield func(*args, **kwargs)

    data = list(repeat(500000, fake_entry))
    print len(data)
    data[0]

    dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
    dataDF.cache()

UDF:

    from pyspark.sql.functions import udf  # this import was missing from the snippet

    concat_s = udf(lambda s: s + 's')
    udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
    udfData.count()

Spark Function:

    spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
    spfData.count()

I ran both several times; the UDF usually took about 1.1–1.4 s, and the concat Spark function always took under 0.15 s.

+30
performance user-defined-functions apache-spark pyspark apache-spark-sql spark-dataframe
2 answers

when would a UDF be faster

If you are asking about Python UDFs, the answer is probably never*. Since SQL functions are relatively simple and are not designed for complex tasks, it is pretty much impossible to compensate for the cost of repeated serialization, deserialization and data movement between the Python interpreter and the JVM.

Does anyone know why this is

The main reasons are already enumerated above and can be reduced to the simple fact that a Spark DataFrame is natively a JVM structure, and standard access methods are implemented by simple calls to the Java API. UDFs, on the other hand, are implemented in Python and require moving data back and forth.

While PySpark in general requires data movement between the JVM and Python, in the case of the low-level RDD API it typically doesn't require expensive serde activity. Spark SQL adds the additional cost of serialization and deserialization, as well as the cost of moving data from and to the unsafe representation on the JVM. The latter is specific to all UDFs (Python, Scala and Java), while the former is specific to non-native languages.
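To make that serde cost concrete, here is a small pure-Python sketch (no Spark involved, and not Spark's actual mechanism): applying an operation directly versus paying a serialization round-trip per row, with pickle standing in for the real transfer format between the JVM and the Python worker.

```python
import pickle
import time

rows = ["name%d" % i for i in range(100_000)]

# Native-style path: the operation runs where the data already lives.
t0 = time.perf_counter()
native = [s + "s" for s in rows]
native_time = time.perf_counter() - t0

# UDF-style path: every row pays a serialize -> transfer -> deserialize
# round-trip in each direction (pickle stands in for the real wire format).
t0 = time.perf_counter()
udf_style = []
for s in rows:
    shipped = pickle.loads(pickle.dumps(s))               # "JVM -> Python"
    result = shipped + "s"                                # the actual UDF work
    udf_style.append(pickle.loads(pickle.dumps(result)))  # "Python -> JVM"
udf_time = time.perf_counter() - t0

print(native == udf_style)     # True: identical results
print(udf_time > native_time)  # True: the per-row serde path is slower
```

The absolute numbers are meaningless; the point is that the work per row (string concat) is tiny compared to the per-row serialization tax around it.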

Unlike UDFs, Spark SQL functions operate directly on the JVM and are typically well integrated with both Catalyst and Tungsten. This means they can be optimized in the execution plan, and most of the time can benefit from codegen and other Tungsten optimizations. Moreover, they can operate on data in its "native" representation.

So in a sense, the problem here is that a Python UDF has to bring the data to the code, while SQL expressions go the other way around.


* According to rough estimates, a PySpark window UDF can beat a Scala window function.

+44

Years later, when I had more Spark knowledge, I took a second look at this question and finally realized what @alfredox really wanted to ask. So I revised the answer and divided it into two parts:


To answer why the native DF function (native Spark-SQL function) is faster:

Essentially, why is a native Spark function ALWAYS faster than a Spark UDF, regardless of whether your UDF is implemented in Python or Scala.

First, we need to understand what Tungsten is, which was first introduced in Spark 1.4.

It is the backend engine, and here is what it focuses on:

  1. Off-heap memory management, using a binary in-memory data representation (a.k.a. the Tungsten row format) and managing memory explicitly,
  2. Cache locality, meaning cache-aware computations with a cache-aware layout for high cache hit rates,
  3. Whole-stage code generation (aka CodeGen).

One of Spark's biggest performance killers is GC. The GC pauses every thread in the JVM until it finishes. This is exactly why off-heap memory management was introduced.
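As a loose analogy (this is not Tungsten's actual UnsafeRow layout), the idea of a compact binary row format can be sketched in Python with the struct module: rows become offsets into one flat buffer instead of many individually GC-tracked objects, and field access becomes pointer arithmetic.

```python
import struct

# Object representation: a list of tuples, analogous to per-row JVM objects
# that the garbage collector must track individually.
rows = [(1, 25), (2, 31), (3, 40)]  # (id, age)

# Compact binary representation: every row packed into one flat buffer,
# loosely in the spirit of Tungsten's off-heap row format.
fmt = "<ii"  # two little-endian 32-bit ints per row
buf = b"".join(struct.pack(fmt, rid, age) for rid, age in rows)

# Field access becomes offset arithmetic instead of object traversal,
# and the whole dataset is a single allocation.
row_size = struct.calcsize(fmt)  # 8 bytes per row

def age_of(row_index):
    return struct.unpack_from(fmt, buf, row_index * row_size)[1]

print(age_of(1))  # 31
```

Three rows fit in 24 bytes here; the real point is that there is nothing for a garbage collector to trace inside the buffer.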

When executing a Spark-SQL native function, the data stays in the Tungsten backend. However, in the Spark UDF scenario, the data will be moved out of Tungsten into the JVM (Scala case), or into the JVM and then the Python process (Python case), to run the actual function, and then moved back into Tungsten. As a result:

  1. Inevitably, there is an overhead / penalty for:
    1. Deserializing the input from Tungsten.
    2. Serializing the output back into Tungsten.
  2. Even when using Scala, Spark's first-class citizen, the UDF increases the memory footprint in the JVM, which may well lead to more JVM GC. This is exactly the problem the Tungsten "off-heap memory management" feature tries to solve.

To answer whether Python is necessarily slower than Scala:

On October 30, 2017, Spark introduced vectorized UDFs for PySpark:

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

The reason the Python UDF was slow is probably that the PySpark UDF was not implemented in the most optimized way.

As this paragraph from the link puts it:

Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead.

However, the new vectorized UDFs seem to improve performance significantly:

from 3x to 100x.
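The row-at-a-time vs. vectorized difference can be sketched without Spark: one serialization round-trip per value versus one round-trip for the whole batch, which is roughly the amortization that the Arrow-based vectorized UDFs buy you. pickle stands in for the real transfer format, so only the shape of the comparison is meaningful.

```python
import pickle
import time

values = list(range(200_000))

# Row-at-a-time UDF: one serialization round-trip per value.
t0 = time.perf_counter()
per_row = [pickle.loads(pickle.dumps(v)) + 1 for v in values]
row_time = time.perf_counter() - t0

# Vectorized UDF: one round-trip for the whole batch, then the actual
# work is done over the batch at once.
t0 = time.perf_counter()
batch = pickle.loads(pickle.dumps(values))
vectorized = [v + 1 for v in batch]
batch_time = time.perf_counter() - t0

print(per_row == vectorized)   # True: identical results
print(batch_time < row_time)   # True: batching amortizes the serde cost
```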


+9
