How to compare consecutive rows?

I would like to compare the col2 value of each row i with that of row i-1, with the rows sorted by col1.

If the item in row i differs from the item in row i-1, I would like to increment the counter for row i-1's item by 1.

 +-------+--------+
 | col1  | col2   |
 +-------+--------+
 | row_1 | item_1 |
 | row_2 | item_1 |
 | row_3 | item_2 |
 | row_4 | item_1 |
 | row_5 | item_2 |
 | row_6 | item_1 |
 +-------+--------+

In the above example, scanning down two rows at a time: row_2 and row_3 differ, so we add 1 to item_1's counter. Then row_3 differs from row_4, so we add 1 to item_2's counter. Continuing this way to the last row gives:

 +--------+------+
 | col2   | col3 |
 +--------+------+
 | item_1 | 2    |
 | item_2 | 2    |
 +--------+------+
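The counting rule above can be sketched in plain Scala (no Spark) on the sample data, to pin down the expected behavior: sort by col1, slide a window of two consecutive rows, and credit each mismatch to the earlier row's item. This is only an illustration of the logic, not the Spark solution.

```scala
// Sample data from the question: (col1, col2) pairs.
val rows = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1")
)

// Sort by col1, compare each row's item with the next row's item,
// and count a hit for the earlier item whenever they differ.
val counts = rows
  .sortBy(_._1)
  .sliding(2)
  .collect { case Seq((_, prev), (_, next)) if prev != next => prev }
  .toSeq
  .groupBy(identity)
  .map { case (item, hits) => item -> hits.size }

// counts == Map("item_1" -> 2, "item_2" -> 2)
```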
1 answer

You can use a combination of a window function and an aggregation for this. The window function is used to get the next col2 value (using col1 for ordering). The aggregation then counts the number of times we encounter a difference. This is implemented in the code below:

 // Run in spark-shell; in a standalone app you also need
 // import org.apache.spark.sql.functions._ and spark.implicits._
 val data = Seq(
   ("row_1", "item_1"),
   ("row_2", "item_1"),
   ("row_3", "item_2"),
   ("row_4", "item_1"),
   ("row_5", "item_2"),
   ("row_6", "item_1")).toDF("col1", "col2")

 import org.apache.spark.sql.expressions.Window

 val q = data.
   // lead gives the next row's col2; coalesce keeps the last row's own value
   withColumn("col2_next",
     coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
   groupBy($"col2").
   // cast the boolean mismatch to int and sum it to count the differences
   agg(sum($"col2" =!= $"col2_next" cast "int") as "col3")

 scala> q.show
 17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
 +------+----+
 |  col2|col3|
 +------+----+
 |item_1|   2|
 |item_2|   2|
 +------+----+
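Note the WindowExec warning in the output: an ordered window with no partitioning forces all rows onto a single partition. If your real data has a natural grouping key, declaring it in the window spec avoids this. A minimal sketch, assuming a hypothetical grouping column named "part" (not present in the question's data):

```scala
import org.apache.spark.sql.expressions.Window

// Partitioning by "part" lets Spark process each group's window
// independently instead of collecting everything into one partition.
val byPart = Window.partitionBy($"part").orderBy($"col1")
```

With the question's tiny example there is no such key, so the single-partition warning is harmless here.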