This is a good use case for either window aggregation (using the `over` function) or a `join`. Since you already know how to use windows, I'll focus solely on `join`.
```
scala> val inventory = Seq(
     |   ("a", "pref1", "b", 168),
     |   ("a", "pref3", "h", 168),
     |   ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count")
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields]

scala> val maxCount = inventory.groupBy("uid", "k").max("count")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]

scala> maxCount.show
+---+-----+----------+
|uid|    k|max(count)|
+---+-----+----------+
|  a|pref3|       168|
|  a|pref1|       168|
+---+-----+----------+

scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]

scala> maxCount.show
+---+-----+---+
|uid|    k|max|
+---+-----+---+
|  a|pref3|168|
|  a|pref1|168|
+---+-----+---+

scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show
+---+-----+---+---+-----+
|uid|    k|max|  v|count|
+---+-----+---+---+-----+
|  a|pref3|168|  h|  168|
|  a|pref1|168|  b|  168|
+---+-----+---+---+-----+
```
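For completeness, since the answer also mentions the window alternative: a minimal sketch of the same query using a window aggregation instead of the groupBy-plus-join, assuming the same `inventory` DataFrame and a spark-shell session (so `spark.implicits._` is already in scope), might look like this:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Partition by the same keys used in the groupBy above;
// max("count") over this window attaches the per-(uid, k) maximum
// to every row, so no separate join is needed.
val byUidAndK = Window.partitionBy("uid", "k")

inventory
  .withColumn("max", max("count") over byUidAndK)
  .where($"count" === $"max")
  .show
```

This keeps all columns of the matching rows in a single pass, whereas the join variant first builds a separate `maxCount` DataFrame and then matches it back against `inventory`.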
Jacek Laskowski