Spark Streaming Word Count

This is a Spark Streaming program written in Scala. It counts the words arriving on a socket every 1 second. The result is a per-batch count: for example, the number of words from time 0 to 1, then the number of words from time 1 to 2. Is it possible to change this program so that the counts accumulate, that is, so that each word is counted from time 0 until now?

    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
1 answer

You can use a StateDStream, which you get by calling updateStateByKey. The Spark examples include a stateful word count, StatefulNetworkWordCount:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulNetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // StreamingExamples is a helper object that ships with the Spark examples
        StreamingExamples.setStreamingLogLevels()

        // Add the counts from the current batch to the count accumulated so far
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.foldLeft(0)(_ + _)
          val previousCount = state.getOrElse(0)
          Some(currentCount + previousCount)
        }

        val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
        // Create the context with a 1 second batch size
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint(".")

        // Create a NetworkInputDStream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        val lines = ssc.socketTextStream(args(0), args(1).toInt)
        val words = lines.flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))

        // Update the cumulative count using updateStateByKey
        // This will give a Dstream made of state (which is the cumulative count of the words)
        val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

How it works: for each batch you get a Seq[T] of new values for a key, and you use it to update an Option[T], which acts like an accumulator. The state is an Option because in the first batch it will be None, and it will stay that way unless it is updated. In this example the count is an Int; if you are dealing with a lot of data, you may want a Long or even a BigInt.
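To make the accumulator behaviour concrete, here is a minimal plain-Scala sketch that imitates what updateStateByKey does with the update function across three batches. It does not use Spark at all: the names UpdateFuncSketch, applyBatch and run are hypothetical helpers made up for this illustration, and the state type is Long rather than Int, as suggested above for larger counts.

```scala
// Plain-Scala simulation of the update-function semantics; no Spark required.
object UpdateFuncSketch {
  // Same shape as the update function in the answer, with Long as the state type.
  val updateFunc: (Seq[Int], Option[Long]) => Option[Long] =
    (values, state) => Some(state.getOrElse(0L) + values.sum)

  // Hypothetical helper: applies updateFunc to one batch of words.
  // Keys that already have state are updated even when the batch
  // contains no new values for them (they receive an empty Seq).
  def applyBatch(state: Map[String, Long], batch: Seq[String]): Map[String, Long] = {
    val newValues: Map[String, Seq[Int]] =
      batch.groupBy(identity).map { case (word, occurrences) => word -> occurrences.map(_ => 1) }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      // state.get(k) is None the first time a word is seen
      updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  // Folds three example batches into a running total per word.
  def run(): Map[String, Long] = {
    val batches = Seq(Seq("a", "b", "a"), Seq("b", "c"), Seq.empty[String])
    batches.foldLeft(Map.empty[String, Long])(applyBatch)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

After the three batches the running totals are 2 for "a", 2 for "b" and 1 for "c". The empty third batch leaves every total unchanged, because the update function still returns Some(previous count) when it receives no new values.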
