Convert Pandas-style grouped data to PySpark DataFrame

If we have a Pandas data frame consisting of a category column and a column of values, we can subtract the per-category mean from each value (i.e. demean the values within each category) like this:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g)) 

As far as I understand, Spark DataFrames do not directly offer this group-wise transform operation (I am using PySpark on Spark 1.5.0). So what is the best way to implement this computation?

I tried a group-by / join as follows:

    df2 = df.groupBy("Category").mean("Values")
    df3 = df2.join(df)

But this is very slow, since, as I understand it, each category requires a full scan of the DataFrame.

I think (but have not verified) that I can speed this up significantly if I collect the result of the group-by / mean into a dictionary and then use this dictionary in a UDF as follows:

    nameToMean = {...}

    f = lambda category, value: value - nameToMean[category]
    categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
    df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

Is there an idiomatic way to express this type of operation without sacrificing performance?

+8
python pandas apache-spark pyspark apache-spark-sql pyspark-sql
2 answers

As I understand it, a full DataFrame scan is required for each category.

No, it isn't. DataFrame aggregation is performed using logic similar to aggregateByKey (see DataFrame groupBy behaviour / optimization). The slower part is the join, which requires sorting / shuffling, but even that does not require a scan per group.
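If you want to check this yourself, you can inspect the physical plan of the aggregation; you should see partial and final aggregation steps separated by a shuffle (Exchange), not one scan per category:

    # Print the physical plan of the aggregation to confirm it is a single
    # pass over the data followed by a shuffle, rather than a scan per group.
    df.groupBy("Category").mean("Values").explain()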

If this is the exact code you use, it is slow because you do not provide a join expression. Because of that, it simply performs a Cartesian product, so it is not only inefficient but also incorrect. You want something like this:

    from pyspark.sql.functions import col

    means = df.groupBy("Category").mean("Values").alias("means")
    df.alias("df").join(means, col("df.Category") == col("means.Category"))
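To actually get the demeaned column you still have to subtract the mean after the join. A minimal sketch of one way to do that (the alias mean_value is my own; plain mean("Values") gives you a column with an auto-generated name such as avg(Values)):

    from pyspark.sql.functions import avg, col

    means = df.groupBy("Category").agg(avg("Values").alias("mean_value")).alias("means")
    demeaned = (df.alias("df")
                .join(means, col("df.Category") == col("means.Category"))
                .withColumn("DemeanedValues", col("df.Values") - col("means.mean_value")))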

I think (but have not verified) that I can speed this up significantly if I collect the result of the group-by / mean into a dictionary and then use this dictionary in a UDF

Possibly, although performance will vary from case to case. A problem with using a Python UDF is that it has to move data to Python and back. Still, it is definitely worth trying. You should use a broadcast variable for nameToMean, though.
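For example, something along these lines (a sketch only; it assumes a SparkContext sc, the Category / Values column names from your question, and that the number of distinct categories is small enough to collect to the driver):

    from pyspark.sql.functions import avg, col, udf
    from pyspark.sql.types import DoubleType

    # Collect the per-category means to the driver and broadcast them once,
    # instead of shipping the dictionary with every task.
    rows = df.groupBy("Category").agg(avg("Values").alias("mean_value")).collect()
    nameToMean = sc.broadcast({row.Category: row.mean_value for row in rows})

    categoryDemeaned = udf(lambda category, value: value - nameToMean.value[category],
                           DoubleType())
    df = df.withColumn("DemeanedValue", categoryDemeaned(col("Category"), col("Values")))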

Is there an idiomatic way to express this type of operation without sacrificing performance?

In PySpark 1.6, you can use the broadcast function:

    from pyspark.sql.functions import broadcast

    df.alias("df").join(
        broadcast(means), col("df.Category") == col("means.Category"))

but it is not available in versions <= 1.5.

+6

In fact, there is an idiomatic way to do this in Spark using the Hive OVER expression.

i.e.

    df.registerTempTable('df')
    with_category_means = sqlContext.sql(
        'select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')

Under the hood, this uses a window function. I'm not sure whether it is faster than your solution, though.
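The same thing can be expressed with the DataFrame window API (a sketch; in Spark 1.4/1.5 window functions require a HiveContext, and this does the same work as the SQL above):

    from pyspark.sql import Window
    from pyspark.sql.functions import mean, col

    # Partition by category and subtract the per-partition mean.
    w = Window.partitionBy("Category")
    df_demeaned = df.withColumn("DemeanedValues", col("Values") - mean("Values").over(w))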

+1
