It is not better, but an alternative approach is to convert the RDD to a DataFrame and use the histogram_numeric Hive UDAF.
Sample data:
import scala.util.Random

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{callUDF, lit, col}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
import sqlContext.implicits._  // needed for the $"colname" syntax used below

Random.setSeed(1)

val ncol = 5

// 1000 rows of ncol random doubles
val rdd = sc.parallelize((1 to 1000).map(
  _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
))
val schema = StructType(
  (1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("df")
Query:
val nBuckets = 3
val columns = df.columns.map(c =>
  callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
val histograms = df.select(columns: _*)

histograms.printSchema

// root
//  |-- x1: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x2: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x3: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x4: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)
//  |-- x5: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- x: double (nullable = true)
//  |    |    |-- y: double (nullable = true)

histograms.select($"x1").collect()

// Array([WrappedArray([0.16874313309969038,334.0],
//   [0.513382068667877,345.0], [0.8421388886903808,321.0])])
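Each histogram comes back as an array of structs, where x is the bin centroid and y is the (approximate) count. If you want plain Scala values rather than Rows, you can unpack one column like this; a minimal sketch assuming the histograms DataFrame from above (the x1Hist name is mine):

val x1Hist = histograms.select(col("x1")).first.getSeq[Row](0).map {
  case Row(bin: Double, count: Double) => (bin, count)
}
// x1Hist: Seq[(Double, Double)] of (bin centroid, approximate count)

Since the DataFrame is registered as a temp table, the same histogram can also be computed with raw SQL through the HiveContext (a sketch for a single column, following Hive's histogram_numeric syntax):

sqlContext.sql("SELECT histogram_numeric(x1, 3) AS x1 FROM df")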
zero323