Spark DataFrame: does groupBy after orderBy maintain this order?

I have a Spark 2.0 DataFrame example with the following structure:

 id, hour, count
 id1, 0, 12
 id1, 1, 55
 ..
 id1, 23, 44
 id2, 0, 12
 id2, 1, 89
 ..
 id2, 23, 34
 etc.

It contains 24 entries for each id (one for each hour of the day) and is ordered by id and hour using the orderBy function.
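
For reference, here is a minimal sketch of how such a frame could be built and ordered; the variable name example and the literal sample rows are assumptions made purely for illustration:

 // Minimal sketch, assuming Spark 2.0 with a SparkSession named `spark` in scope.
 import spark.implicits._

 val example = Seq(
     ("id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44),
     ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34))
   .toDF("id", "hour", "count")
   .orderBy($"id", $"hour")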

I created a groupConcat aggregator:

 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.expressions.Aggregator

 def groupConcat(separator: String, columnToConcat: Int) =
   new Aggregator[Row, String, String] with Serializable {
     override def zero: String = ""
     override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
     override def merge(b1: String, b2: String) = b1 + b2
     override def finish(b: String) = b.substring(1)
     override def bufferEncoder: Encoder[String] = Encoders.STRING
     override def outputEncoder: Encoder[String] = Encoders.STRING
   }.toColumn

It helps me concatenate the column values into strings to get this final DataFrame:

 id, hourly_count
 id1, 12:55:..:44
 id2, 12:89:..:34
 etc.

My question is: if I do example.orderBy($"id", $"hour").groupBy("id").agg(groupConcat(":", 2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly within their respective buckets?

I have read that this is not the case for RDDs (see Spark sort by key and then group to get ordered iterable?), but maybe it is different for DataFrames?

If not, how can I get around this?

+13
scala apache-spark apache-spark-sql spark-dataframe spark-streaming
6 answers

groupBy after orderBy does not maintain order, as others have pointed out. What you want to do is use a Window function, partitioned by id and ordered by hour. You can collect_list over this window and then take the maximum (largest) of the resulting lists, since they build up cumulatively (i.e. the first hour has only itself in its list, the second hour has 2 elements in its list, and so on).

Full code example:

 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.expressions.Window
 import spark.implicits._

 val data = Seq(
     ("id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44),
     ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34))
   .toDF("id", "hour", "count")

 val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

 data.withColumn("collected", collect_list($"count")
       .over(Window.partitionBy("id").orderBy("hour")))
     .groupBy("id")
     .agg(max($"collected").as("collected"))
     .withColumn("hourly_count", mergeList($"collected"))
     .select("id", "hourly_count")
     .show

This keeps us in the DataFrame world. I also simplified the UDF code the OP was using.

Output:

 +---+------------+
 | id|hourly_count|
 +---+------------+
 |id1|    12:55:44|
 |id2|    12:89:34|
 +---+------------+
+17

I have a case where the order is not always maintained: sometimes it is, but mostly it is not.

My DataFrame has 200 partitions, running on Spark 1.6:

 from pyspark.sql import functions as F

 df_group_sort = data.orderBy(times).groupBy(group_key).agg(
     F.sort_array(F.collect_list(times)),
     F.collect_list(times))

To check the order, I compare the return values of

 F.sort_array(F.collect_list(times)) 

and

 F.collect_list(times) 
(left: sort_array(collect_list()); right: collect_list())
 2016-12-19 08:20:27.172000    2016-12-19 09:57:03.764000
 2016-12-19 08:20:30.163000    2016-12-19 09:57:06.763000
 2016-12-19 08:20:33.158000    2016-12-19 09:57:09.763000
 2016-12-19 08:20:36.158000    2016-12-19 09:57:12.763000
 2016-12-19 08:22:27.090000    2016-12-19 09:57:18.762000
 2016-12-19 08:22:30.089000    2016-12-19 09:57:33.766000
 2016-12-19 08:22:57.088000    2016-12-19 09:57:39.811000
 2016-12-19 08:23:03.085000    2016-12-19 09:57:45.770000
 2016-12-19 08:23:06.086000    2016-12-19 09:57:57.809000
 2016-12-19 08:23:12.085000    2016-12-19 09:59:56.333000
 2016-12-19 08:23:15.086000    2016-12-19 10:00:11.329000
 2016-12-19 08:23:18.087000    2016-12-19 10:00:14.331000
 2016-12-19 08:23:21.085000    2016-12-19 10:00:17.329000
 2016-12-19 08:23:24.085000    2016-12-19 10:00:20.326000

The left column is always sorted, while the right column consists only of sorted blocks. For different executions of take(), the order of the blocks in the right column differs.
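
For completeness, roughly the same check expressed in Scala; this is only a sketch, with data, group_key and times taken over as assumed names from the snippet above, and it simply shows the groups where the sorted and unsorted lists disagree:

 // Sketch: collect the times per group both sorted and unsorted,
 // and keep only the groups where the two lists differ.
 import org.apache.spark.sql.functions._
 import spark.implicits._

 data.orderBy("times")
   .groupBy("group_key")
   .agg(
     sort_array(collect_list($"times")).as("sorted_times"),
     collect_list($"times").as("collected_times"))
   .filter(!($"sorted_times" === $"collected_times"))
   .show(false)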

+5

If you want to work around this in Java (Scala and Python should be similar):

 example.orderBy("hour")
        .groupBy("id")
        .agg(functions.sort_array(
                 functions.collect_list(
                     functions.struct(dataRow.col("hour"), dataRow.col("count"))), false)
             .as("hourly_count"));
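
The aggregation above yields an array of (hour, count) structs rather than the single string the OP asked for, so one more step is needed to flatten it. Here is a rough Scala sketch of that step; the DataFrame name example and the assumption that count is the second field of each struct are mine, and the structs are sorted ascending by hour here:

 // Sketch: sort the (hour, count) structs by hour, then join the counts with ":".
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import spark.implicits._

 val structsToString = udf { (structs: Seq[Row]) =>
   structs.map(_.get(1).toString).mkString(":")   // take the count field of each struct
 }

 example
   .groupBy("id")
   .agg(sort_array(collect_list(struct($"hour", $"count"))).as("sorted"))
   .withColumn("hourly_count", structsToString($"sorted"))
   .select("id", "hourly_count")
   .show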
+4

The order may or may not be maintained, depending on the number of partitions and the data distribution. We can solve this using the RDD itself.

For example:

I saved the sample data below to a file and uploaded it to HDFS.

 1,type1,300
 2,type1,100
 3,type2,400
 4,type2,500
 5,type1,400
 6,type3,560
 7,type2,200
 8,type3,800

Then I ran the following command:

 sc.textFile("/spark_test/test.txt")
   .map(x => x.split(","))
   .filter(x => x.length == 3)
   .groupBy(_(1))
   .mapValues(x => x.toList.sortBy(_(2)).map(_(0)).mkString("~"))
   .collect()

Output:

 Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4)) 

That is, we grouped the data by type, then sorted by price within each group and joined the ids with "~" as the separator. The command above can be broken down as follows:

 val validData = sc.textFile("/spark_test/test.txt")
   .map(x => x.split(","))
   .filter(x => x.length == 3)

 val groupedData = validData.groupBy(_(1))          // group the data RDD by type

 val sortedJoinedData = groupedData.mapValues(x => {
   val list = x.toList
   val sortedList = list.sortBy(_(2))
   val idOnlyList = sortedList.map(_(0))
   idOnlyList.mkString("~")
 })

 sortedJoinedData.collect()

We can take a specific group using the command:

 sortedJoinedData.filter(_._1=="type1").collect() 

Output:

 Array[(String, String)] = Array((type1,2~1~5)) 
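
The same idea can be applied directly to the OP's frame by dropping down to its RDD. A rough sketch, where the DataFrame name example and the column positions and types are assumptions based on the question's schema:

 // Sketch: sort by hour inside each id group on the RDD, so the result no longer
 // depends on how the DataFrame happens to be partitioned.
 val hourlyCounts = example.rdd
   .groupBy(_.getString(0))            // group rows by id
   .mapValues(rows => rows.toList
     .sortBy(_.getInt(1))              // sort each group by hour
     .map(_.getInt(2))                 // keep only the count
     .mkString(":"))

 hourlyCounts.collect()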
+1

No, the sort inside groupByKey will not necessarily be maintained, although this is notoriously hard to reproduce in memory on a single node. As mentioned before, the most typical way it happens is when things need to be repartitioned for the groupByKey to take place. I managed to reproduce it by manually doing a repartition after the sort, then passing the results to groupByKey.

 import scala.util.Random
 import spark.implicits._

 case class Numbered(num: Int, group: Int, otherData: Int)

 // configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number

 val v = (1 to 100000)
   // Make waaay more groups than partitions. I added an extra integer just to mess with
   // the sort hash computation (i.e. so it won't be monotonic; not sure if needed).
   .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000)))
   .toDS()
   // Be sure they are stored in a small number of partitions.
   .repartition(2)
   .sort($"num")
   // Repartition again with a waaay bigger number than there are groups, so that when
   // things need to be merged you can get them out of order.
   .repartition(200)
   .groupByKey(_.group)
   .mapGroups { case (g, nums) =>
     nums                      // all you need is .sortBy(_.num) here to fix the problem
       .map(_.num)
       .mkString("~")
   }
   .collect()

 // Walk through the concatenated strings. If any number ahead
 // is smaller than the number before it, you know that something
 // is out of order.
 v.zipWithIndex.map { case (r, i) =>
   r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
     if (next < prev) {
       println(s"*** Next: ${next} less than ${prev} for dataset ${i + 1} ***")
     }
     next
   }
 }
0

Short answer: yes, the hourly counts will maintain the same order.

To summarize: it is important to sort before you group. Also, the sort has to include the group columns plus the column you actually want sorted.

An example would look like this:

 employees
   .sort("company_id", "department_id", "employee_role")
   .groupBy("company_id", "department_id")
   .agg(Aggregators.groupConcat(":", 2) as "count_per_role")
-5
