How to get a histogram of all columns in a large CSV / RDD[Array[Double]] using Apache Spark Scala?

I am trying to calculate a histogram of all columns from a CSV file using Spark Scala.

I found that DoubleRDDFunctions provides a histogram method, so I wrote the following to get a histogram of every column:

  • Get the number of columns
  • Create an RDD[Double] for each column and calculate the histogram of each RDD using DoubleRDDFunctions

    var columnIndexArray = Array.tabulate(rdd.first().length)(_ * 1)
    val histogramData = columnIndexArray.map(columns => {
      rdd.map(lines => lines(columns)).histogram(6)
    })
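For reference, here is a minimal sketch of the same per-column approach that can be pasted into spark-shell. The tiny two-column dataset, the bucket count of 2, and the application name are assumptions made only for illustration. It caches the RDD because each `histogram` call scans the data again, and it shows that every result is a pair of (bucket boundaries, counts per bucket).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("column-histograms").setMaster("local[*]"))

// Hypothetical stand-in for the parsed CSV: 4 rows, 2 columns.
val rdd = sc.parallelize(Seq(
  Array(1.0, 10.0),
  Array(2.0, 20.0),
  Array(3.0, 30.0),
  Array(4.0, 40.0)
)).cache() // cached because every column rescans the data

val nCols = rdd.first().length

// One histogram per column; each result is (bucket boundaries, counts).
val histogramData: Seq[(Array[Double], Array[Long])] =
  (0 until nCols).map(i => rdd.map(row => row(i)).histogram(2))

histogramData.zipWithIndex.foreach { case ((buckets, counts), i) =>
  println(s"column $i: buckets=${buckets.mkString(", ")} counts=${counts.mkString(", ")}")
}
```

With `n` buckets, the boundaries array has `n + 1` entries and the counts array has `n`.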

Is this a good way? Can anyone suggest some better ways to solve this?

Thanks in advance.

+7
scala csv histogram apache-spark rdd
2 answers

Not better, but an alternative is to convert the RDD to a DataFrame and use the Hive histogram_numeric UDF.

Sample data:

    import scala.util.Random
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.{callUDF, lit, col}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.HiveContext

    val sqlContext = new HiveContext(sc)

    Random.setSeed(1)
    val ncol = 5

    val rdd = sc.parallelize((1 to 1000).map(
      _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
    ))

    val schema = StructType(
      (1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))

    val df = sqlContext.createDataFrame(rdd, schema)
    df.registerTempTable("df")

Query:

    val nBuckets = 3
    val columns = df.columns.map(
      c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
    val histograms = df.select(columns: _*)

    histograms.printSchema
    // root
    //  |-- x1: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- x: double (nullable = true)
    //  |    |    |-- y: double (nullable = true)
    //  |-- x2: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- x: double (nullable = true)
    //  |    |    |-- y: double (nullable = true)
    //  |-- x3: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- x: double (nullable = true)
    //  |    |    |-- y: double (nullable = true)
    //  |-- x4: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- x: double (nullable = true)
    //  |    |    |-- y: double (nullable = true)
    //  |-- x5: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- x: double (nullable = true)
    //  |    |    |-- y: double (nullable = true)

    histograms.select($"x1").collect()
    // Array([WrappedArray([0.16874313309969038,334.0],
    //   [0.513382068667877,345.0], [0.8421388886903808,321.0])])
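Each cell of the result is an array of (x, y) structs; according to the Hive documentation these are bin centers and heights, and the bins are not uniformly spaced. Here is a small sketch of turning one cell into plain Scala tuples. The helper name `bins` is hypothetical, and the sample row is built by hand (values rounded from the output above) so the snippet runs without a HiveContext.

```scala
import org.apache.spark.sql.Row

// Hypothetical helper: converts the histogram_numeric cell at position `i`
// of a result row (an array of (x, y) structs) into Scala tuples.
def bins(row: Row, i: Int): Seq[(Double, Double)] =
  row.getAs[Seq[Row]](i).map(bin => (bin.getDouble(0), bin.getDouble(1)))

// Hand-built stand-in for `histograms.first()`, so the snippet runs alone.
val sample = Row(Seq(Row(0.17, 334.0), Row(0.51, 345.0), Row(0.84, 321.0)))

val x1Bins = bins(sample, 0)
x1Bins.foreach { case (center, height) => println(s"center=$center height=$height") }
```

Against the real result, the call would presumably look like `bins(histograms.first(), 0)` for column x1.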
+5

In the Scala API, countByValue should do what you want.

For example, to generate histogram data for the first column in your RDD:

    val histCol1 = rdd.map(record => record.col_1).countByValue()

In the expression above, record refers to a single data row in the RDD, an instance of a case class that has a field col_1.

histCol1 is therefore a hash table (a Scala Map) in which the keys are the unique values in column 1 (col_1) and the values are the counts of each unique value.
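A minimal sketch of this idea for spark-shell follows. The `Record` case class, the sample values, and the bucket width of 0.25 are assumptions for illustration only. Because countByValue produces one key per distinct value, continuous Double columns usually need to be mapped to a bucket index first; the second expression shows one simple way to do that.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical row type with the field used in the answer.
case class Record(col_1: Double, col_2: Double)

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("countByValue-sketch").setMaster("local[*]"))

val records = sc.parallelize(Seq(
  Record(0.12, 1.0), Record(0.18, 2.0), Record(0.57, 3.0), Record(0.91, 4.0)))

// Exact counts: one key per distinct value in col_1.
val exact: scala.collection.Map[Double, Long] =
  records.map(_.col_1).countByValue()

// Bucketed counts: map each value to a fixed-width bucket index first.
val width = 0.25 // assumed bucket width
val bucketed: scala.collection.Map[Int, Long] =
  records.map(r => math.floor(r.col_1 / width).toInt).countByValue()
```

Note that countByValue returns the whole map to the driver, so it is only practical when the number of distinct keys is small.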

+1