How to get the maximum value and keep all columns (one max record per group)?

Given the following DataFrame:

+----+-----+---+-----+
| uid|    k|  v|count|
+----+-----+---+-----+
|   a|pref1|  b|  168|
|   a|pref3|  h|  168|
|   a|pref3|  t|   63|
|   a|pref3|  k|   84|
|   a|pref1|  e|   84|
|   a|pref2|  z|  105|
+----+-----+---+-----+

How do I get the row with the maximum count for each (uid, k) group, while still keeping the v column?

+----+-----+---+----------+
| uid|    k|  v|max(count)|
+----+-----+---+----------+
|   a|pref1|  b|       168|
|   a|pref3|  h|       168|
|   a|pref2|  z|       105|
+----+-----+---+----------+

I can do something like this, but it will lose the "v" column:

 df.groupBy("uid", "k").max("count") 
apache-spark apache-spark-sql spark-dataframe
3 answers

This is a great use case for window aggregates (using the over function) or a join.

Since you have already figured out how to use windows, I'll focus solely on the join.

scala> val inventory = Seq(
     |   ("a", "pref1", "b", 168),
     |   ("a", "pref3", "h", 168),
     |   ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count")
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields]

scala> val maxCount = inventory.groupBy("uid", "k").max("count")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]

scala> maxCount.show
+---+-----+----------+
|uid|    k|max(count)|
+---+-----+----------+
|  a|pref3|       168|
|  a|pref1|       168|
+---+-----+----------+

scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]

scala> maxCount.show
+---+-----+---+
|uid|    k|max|
+---+-----+---+
|  a|pref3|168|
|  a|pref1|168|
+---+-----+---+

scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show
+---+-----+---+---+-----+
|uid|    k|max|  v|count|
+---+-----+---+---+-----+
|  a|pref3|168|  h|  168|
|  a|pref1|168|  b|  168|
+---+-----+---+---+-----+
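Note that the join keeps every row whose count equals its group's maximum, so if several rows within the same (uid, k) group tie for the top count, all of them appear in the result.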

Here is the best solution I've come up with so far:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

// Rank rows within each (uid, k) group by descending count,
// then keep only the top-ranked row(s).
val w = Window.partitionBy("uid", "k").orderBy(col("count").desc)

df.withColumn("rank", dense_rank().over(w))
  .where("rank == 1")
  .select("uid", "k", "v", "count")
  .show
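Because dense_rank assigns rank 1 to every row that ties for the highest count, tied rows are all kept; if you want exactly one row per group, row_number().over(w) can be used instead.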

You can use window functions:

from pyspark.sql.functions import max as max_
from pyspark.sql.window import Window

w = Window.partitionBy("uid", "k")
df.withColumn("max_count", max_("count").over(w))
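This only attaches the per-group maximum as an extra column. To keep just the rows that reach that maximum, a minimal follow-up sketch (reusing the same df and w as above; the result name is just for illustration) could filter on equality and drop the helper column:

 from pyspark.sql.functions import col, max as max_
 from pyspark.sql.window import Window

 w = Window.partitionBy("uid", "k")

 # Keep only the rows whose count equals their group's maximum;
 # ties within a group are all retained.
 result = (df.withColumn("max_count", max_("count").over(w))
             .where(col("count") == col("max_count"))
             .drop("max_count"))
 result.show()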
