How to sort an RDD and limit it in Spark?

I have an RDD of class Foo: class Foo(name: String, createDate: Date). I want another RDD containing the oldest 10% of the Foos. My first idea was to sort by createDate and limit to 0.1 * count, but there is no limit function on RDDs.

Any ideas?

scala apache-spark rdd
1 answer

Assuming Foo is a case class as follows:

 import java.sql.Date

 case class Foo(name: String, createDate: java.sql.Date)
  • Using simple RDDs:

     // Assumes a Spark shell / session where sc and spark.implicits._
     // (for toDF and the $ column syntax) are in scope.
     import org.apache.spark.rdd.RDD
     import scala.math.Ordering

     val rdd: RDD[Foo] = sc
       .parallelize(Seq(
         ("a", "2015-01-03"), ("b", "2014-11-04"),
         ("a", "2016-08-10"), ("a", "2013-11-11"),
         ("a", "2015-06-19"), ("a", "2009-11-23")))
       .toDF("name", "createDate")
       .withColumn("createDate", $"createDate".cast("date"))
       .as[Foo].rdd

     rdd.cache()
     val n = scala.math.ceil(0.1 * rdd.count).toInt
    • If the data fits in driver memory:

      • and the desired fraction is relatively small:

         rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
         // Array[Foo] = Array(Foo(a,2009-11-23))
      • and the desired fraction is relatively large:

         rdd.sortBy(_.createDate.getTime).take(n) 
    • otherwise, keep everything distributed (a quick cross-check of these RDD variants is sketched after this list):

       rdd
         .sortBy(_.createDate.getTime)
         .zipWithIndex
         .filter { case (_, idx) => idx < n }
         .keys
  • Using a DataFrame (note: this is actually not optimal performance-wise, due to how limit behaves; a more scalable sketch follows after this list):

     import org.apache.spark.sql.Row

     val topN = rdd.toDF.orderBy($"createDate").limit(n)
     topN.show

     // +----+----------+
     // |name|createDate|
     // +----+----------+
     // |   a|2009-11-23|
     // +----+----------+

     // Optionally recreate RDD[Foo]
     topN.rdd.map { case Row(name: String, date: Date) => Foo(name, date) }
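
For a quick sanity check, here is a minimal sketch (reusing the rdd and n defined above) confirming that the takeOrdered and zipWithIndex variants agree on the sample data:

     val viaTakeOrdered =
       rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))

     val viaZipWithIndex = rdd
       .sortBy(_.createDate.getTime)
       .zipWithIndex
       .filter { case (_, idx) => idx < n }
       .keys
       .collect()

     // Both yield Array(Foo(a,2009-11-23)) here; note that ties on
     // createDate could in principle be resolved differently by the two.
     assert(viaTakeOrdered.sameElements(viaZipWithIndex))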
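
If the data is large and an approximate answer is acceptable, the global sort (and the limit step) can be avoided entirely by filtering against an approximate 10th-percentile cutoff. A minimal sketch, assuming Spark 2.0+ where DataFrameStatFunctions.approxQuantile is available; the createTs column name is introduced here purely for illustration:

     // approxQuantile needs a numeric column, so cast the date to epoch seconds.
     val withTs = rdd.toDF
       .withColumn("createTs", $"createDate".cast("timestamp").cast("long"))

     // 0.1 is the target quantile, 0.01 the allowed relative error.
     val Array(cutoff) = withTs.stat.approxQuantile("createTs", Array(0.1), 0.01)

     // Roughly the oldest ~10%; the fraction is approximate by construction.
     val oldestTenPercent = withTs
       .filter($"createTs" <= cutoff)
       .drop("createTs")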
