Word count per document in Spark

I am learning Spark (in Scala) and trying to figure out how to count all the words in each line of a file. I am working with a data set where each line contains a tab-delimited document_id and the full text of the document.

doc_1   <full-text>
doc_2   <full-text>
etc.

Here is a toy example of what I have in the docs.txt file:

doc_1   new york city new york state
doc_2   rain rain go away

I think what I need to do is convert this into tuples of the form

((doc_id, word), 1)

and then call reduceByKey() to sum the 1s. I wrote the following:

val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
            .map( x => (x(1).split("\\s+")
            .map(y => ((x(0), y), 1 ))   ) )

which gives me what I think is the intermediate result I need:

tuples.collect

res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))

But calling reduceByKey on tuples causes an error:

tuples.reduceByKey(_ + _)
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
              tuples.reduceByKey(_ + _)

I do not understand what I am doing wrong here. I suspect the nested map calls are the problem, but I am not sure whether I need map or flatMap to fix it.

I have looked at the word count example at https://spark.apache.org/examples.html, but it counts words across an entire file rather than per document, so I am not sure how to adapt it. Any help would be appreciated.

2 Answers

reduceByKey is only defined on an RDD[(K,V)], but the moment you do the split inside map you end up with an RDD[Array[...]], which is not the required type signature. You can keep your nested structure and merge the counts inside each record, like this (a flatMap version follows below):

//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))  

//Split each line on the tab so each record is an Array(key, text)
val firstPass = file.map(_.split("\t"))

//Split the text inside each record so you now have (key, Array(...)) pairs
//where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1)))) 

//Now group the words and re-map so that the inner tuple is the wordcount
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))
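
For a quick sanity check, collecting finalPass on the dummy data above should print something like this (Map ordering may vary):

//Inspect the per-document counts
finalPass.collect.foreach(println)
//(doc_1,Map(new -> 1, york -> 1, city -> 1))
//(doc_2,Map(rain -> 2, go -> 1, away -> 1))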

That gets you your per-document counts, but only by working inside each record; you never get a flat RDD of key/value pairs.

If you instead make the key a Tuple2 of (doc_id, word), you can flatten everything with flatMap, and then reduceByKey works the way you originally intended:

//Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))
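
Now tuples is an RDD[((String, String), Int)], so reduceByKey compiles and sums the counts. A minimal sketch of the last step:

//Sum the 1s per (doc_id, word) key
val counts = tuples.reduceByKey(_ + _)
counts.collect.foreach(println)
//((doc_2,rain),2), ((doc_1,new),1), ((doc_1,york),1), ... (order may vary)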

Here is another way to do it, using groupBy to count the words in each line:

scala> val file = sc.textFile("../README.md")
15/02/02 00:32:38 INFO MemoryStore: ensureFreeSpace(32792) called with curMem=45512, maxMem=278302556
15/02/02 00:32:38 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 32.0 KB, free 265.3 MB)
file: org.apache.spark.rdd.RDD[String] = ../README.md MappedRDD[7] at textFile at <console>:12

scala> val splitLines = file.map{ line => line.split(" ") } 
splitLines: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[9] at map at <console>:14

scala> splitLines.map{ arr => arr.toList.groupBy(identity).map{ x => (x._1, x._2.size) } }
res19: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MappedRDD[10] at map at <console>:17

scala> val result = splitLines.map{ arr => arr.toList.groupBy(identity).map{ x => (x._1, x._2.size) } }
result: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MappedRDD[11] at map at <console>:16

scala> result.take(10).foreach(println)

Map(# -> 1, Spark -> 1, Apache -> 1)
Map( -> 1)
Map(for -> 1, is -> 1, Data. -> 1, system -> 1, a -> 1, provides -> 1, computing -> 1, cluster -> 1, general -> 1, Spark -> 1, It -> 1, fast -> 1, Big -> 1, and -> 1)
Map(in -> 1, Scala, -> 1, optimized -> 1, APIs -> 1, that -> 1, Java, -> 1, high-level -> 1, an -> 1, Python, -> 1, and -> 2, engine -> 1)
Map(for -> 1, data -> 1, a -> 1, also -> 1, general -> 1, supports -> 2, It -> 1, graphs -> 1, analysis. -> 1, computation -> 1)
Map(for -> 1, set -> 1, tools -> 1, rich -> 1, Spark -> 1, structured -> 1, including -> 1, of -> 1, and -> 1, higher-level -> 1, SQL -> 2)
Map(GraphX -> 1, for -> 2, processing, -> 2, data -> 1, MLlib -> 1, learning, -> 1, machine -> 1, graph -> 1)
Map(for -> 1, Streaming -> 1, processing. -> 1, stream -> 1, Spark -> 1, and -> 1)
Map( -> 1)
Map(<http://spark.apache.org/> -> 1)
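
Note that this counts words per line and drops the document ids. A minimal sketch of the same groupBy idea adapted to the tab-delimited docs.txt from the question:

//Keep the doc_id as the key and count the words in the rest of the line
val perDoc = sc.textFile("docs.txt")
  .map(_.split("\t"))
  .map(arr => (arr(0), arr(1).split("\\s+").toList.groupBy(identity).map(x => (x._1, x._2.size))))
perDoc.collect.foreach(println)
//(doc_1,Map(new -> 2, york -> 2, city -> 1, state -> 1))
//(doc_2,Map(rain -> 2, go -> 1, away -> 1))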
