Spark column-wise word count

We are trying to generate column-wise statistics of our dataset in Spark, in addition to using the summary function from the statistics library. We use the following procedure:

  • We identify the columns that hold string values

  • Generate key-value pairs for the whole dataset, using the column number as the key and the column value as the value

  • Map each pair to a new format:

    (K, V) → ((K, V), 1)

Then we use reduceByKey to find the count of each unique value in every column. We cache this output to reduce further computation time.
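To make the transformation concrete, here is a minimal sketch of the pairing and counting steps on two made-up CSV rows (the rows are hypothetical; it assumes the same SparkContext sc as in the full code below):

// Two made-up rows standing in for the CSV data.
val rows = sc.parallelize(Seq("a,x,1", "b,x,2"))

// (column index, column value) pairs for every cell.
val pairs = rows.flatMap(_.split(",").zipWithIndex.map { case (v, i) => (i, v) })

// ((column index, value), 1) and then a count per unique (column, value) pair, cached for reuse.
val counts = pairs.map(kv => (kv, 1)).reduceByKey(_ + _).cache()

counts.collect().foreach(println)
// prints, in some order: ((0,a),1), ((0,b),1), ((1,x),2), ((2,1),1), ((2,2),1)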

In the next step, we iterate over the columns with a for loop to compute the statistics for every column.

We are trying to replace the for loop with another map/reduce pass, but we cannot find a way to do it. That would let us generate the statistics for all columns in a single pass over the data. The for loop approach processes the columns sequentially, which makes it very slow.

The code:

// drops the header
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

def retAtrTuple(x: String) = {
  val newX = x.split(",")
  for (h <- 0 until newX.length) yield (h, newX(h))
}

val line = sc.textFile("hdfs://.../myfile.csv")
val withoutHeader: RDD[String] = dropHeader(line)

// key-value pairs where the key is the column number and the value is the column value
val kvPairs = withoutHeader.flatMap(retAtrTuple)

// column index as key, boolean as value (true for numeric, false for string type)
var bool_numeric_col = kvPairs.map { case (x, y) => (x, isNumeric(y)) }.reduceByKey(_ && _).sortByKey()

var str_cols = bool_numeric_col.filter { case (x, y) => y == false }.map { case (x, y) => x }
var num_cols = bool_numeric_col.filter { case (x, y) => y == true }.map { case (x, y) => x }

var str_col = str_cols.toArray // array holding the string column indexes
var num_col = num_cols.toArray // array holding the numeric column indexes

val colCount = kvPairs.map((_, 1)).reduceByKey(_ + _)
val e1 = colCount.map { case ((x, y), z) => (x, (y, z)) }
var numPairs = e1.filter { case (x, (y, z)) => str_col.contains(x) }

// the for loop below needs to be parallelized/optimized, as it operates on each column sequentially.
// The idea is to find the top 10, bottom 10 and number of distinct elements column-wise.
for (i <- str_col) {
  var total = numPairs.filter { case (x, (y, z)) => x == i }.sortBy(_._2._2)
  var leastOnes = total.take(10)
  println("leastOnes for Col" + i)
  leastOnes.foreach(println)
  var maxOnes = total.sortBy(-_._2._2).take(10)
  println("maxOnes for Col" + i)
  maxOnes.foreach(println)
  println("distinct for Col" + i + " is " + total.count)
}
2 answers

Let me simplify your question a bit. (Actually a lot.) We have an RDD[(Int, String)] and we want to find the 10 most common Strings for each Int (the Ints are all in the 0-100 range).

Instead of sorting, as your example does, it is more efficient to use Spark's built-in RDD.top(n) method. Its run-time is linear in the size of the data, and it moves much less data around than a sort.
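For a single column, the call would look roughly like this (the counts RDD below is a made-up stand-in for one column's (value, count) pairs, and sc is a SparkContext as in the question):

import org.apache.spark.rdd.RDD

// Hypothetical (value, count) pairs for one column.
val counts: RDD[(String, Long)] = sc.parallelize(Seq(("a", 5L), ("b", 2L), ("c", 9L)))

// Top 10 by count; top keeps a bounded heap per partition instead of sorting everything.
val top10: Array[(String, Long)] = counts.top(10)(Ordering.by[(String, Long), Long](_._2))
top10.foreach(println)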

Take a look at the implementation of top in RDD.scala. You want to do the same, but with one priority queue (heap) per Int key. The code gets fairly complex:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] = rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] = counts.mapPartitions { items =>
    val heaps = collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
    for (((k, v), count) <- items) {
      heaps(k) += count -> v
    }
    Iterator.single(heaps)
  }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] = perPartition.reduce { (heaps1, heaps2) =>
    val heaps = collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
    for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
      for (cv <- heap) {
        heaps(k) += cv
      }
    }
    heaps
  }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case (count, value) => value })
}

This is efficient, but painful, because we have to deal with all the columns at the same time. It would be much easier to have one RDD per column and just call rdd.top(10) on each.

Unfortunately, the naive way to split an RDD into N smaller RDDs is to make N passes:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map { i =>
    together.filter { case (key, value) => key == i }.values
  }
}
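A hypothetical usage of split, reusing kvPairs from the question (numColumns, the number of columns, is an assumed name here and would have to be known up front):

// One pass per column: count values, then take the top 10 by count.
val perColumn = split(kvPairs, numColumns)

perColumn.zipWithIndex.foreach { case (col, i) =>
  val top10 = col.map(_ -> 1L).reduceByKey(_ + _).top(10)(Ordering.by[(String, Long), Long](_._2))
  println("top 10 for column " + i + ": " + top10.mkString(", "))
}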

A more efficient solution is to write the data out into separate files by key, and then load those files back as separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.
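For completeness, here is a rough, untested sketch of that write-by-key idea, reusing the together RDD and columns count from split above (the output path and class name are made up):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// Routes each record into a file named after its key, so every column index gets its own file.
// (Override generateActualKey as well if you want to drop the key from the output lines.)
class ColumnOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
}

// Hypothetical usage: write the (column index, value) pairs out partitioned by column,
// then read each column back with sc.textFile("/tmp/by-column/<i>") as its own RDD.
together
  .partitionBy(new HashPartitioner(columns))
  .saveAsHadoopFile("/tmp/by-column", classOf[Any], classOf[Any], classOf[ColumnOutputFormat])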


Thanks for the answer, @Daniel Darabos, but there are a few errors in it:

  • mixed use of Map and collection.mutable.Map

  • withDefault((i: Int) => new Heap(n)) does not store the newly created heap in the map, so heaps(k) += count -> v silently loses the update (see the small snippet after this list)

  • mixed bracket/parenthesis patterns, e.g. RDD[[(Int, String), Long)] instead of RDD[((Int, String), Long)]
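A tiny standalone illustration of that withDefault pitfall, using ArrayBuffer in place of the heap purely for demonstration:

import scala.collection.mutable

val withDefaultMap = mutable.Map[Int, mutable.ArrayBuffer[Int]]().withDefault(_ => mutable.ArrayBuffer[Int]())
withDefaultMap(1) += 42              // mutates a fresh default buffer that is never stored in the map
println(withDefaultMap.contains(1))  // false -> the update is silently lost

val fixedMap = mutable.Map[Int, mutable.ArrayBuffer[Int]]()
fixedMap.getOrElseUpdate(1, mutable.ArrayBuffer[Int]()) += 42 // inserts the buffer, then mutates it
println(fixedMap(1))                 // ArrayBuffer(42)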

Here is the modified code:

// import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private: copy it into your own package and import it from there.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BoundedPriorityQueueTest {

  // https://stackoverflow.com/questions/28166190/spark-column-wise-word-count
  def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
    // A heap that only keeps the top N values, so it has bounded size.
    type Heap = BoundedPriorityQueue[(Long, String)]

    // Get the word counts.
    val counts: RDD[((Int, String), Long)] = rdd.map(_ -> 1L).reduceByKey(_ + _)

    // In each partition create a column -> heap map.
    val perPartition: RDD[collection.mutable.Map[Int, Heap]] = counts.mapPartitions { items =>
      val heaps = collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
      for (((k, v), count) <- items) {
        println("\n---")
        println("before add " + ((k, v), count) + ", the map is: ")
        println(heaps)
        if (!heaps.contains(k)) {
          println("not contains key " + k)
          heaps(k) = new Heap(n)
          println(heaps)
        }
        heaps(k) += count -> v
        println("after add " + ((k, v), count) + ", the map is: ")
        println(heaps)
      }
      println(heaps)
      Iterator.single(heaps)
    }

    // Merge the per-partition heap maps into one.
    val merged: collection.mutable.Map[Int, Heap] = perPartition.reduce { (heaps1, heaps2) =>
      val heaps = collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
      println(heaps)
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        if (!heaps.contains(k)) heaps(k) = new Heap(n) // make sure a heap exists for this column before merging into it
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }

    // Discard counts, return just the top strings.
    merged.mapValues(_.map { case (count, value) => value }).toMap
  }

  def main(args: Array[String]): Unit = {
    // http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    Logger.getRootLogger().setLevel(Level.FATAL)
    val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"),
      (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
    println("# words:" + words.count())

    val result = top(1, words)
    println("\n--result:")
    println(result)

    sc.stop()
    print("DONE")
  }
}
