Spark Streaming: StreamingContext does not read data files

I am new to Spark Streaming and I am trying to get started with it using the Spark shell. Assume I have a directory called "dataTest" placed in the root directory of spark-1.2.0-bin-hadoop2.4.

Simple code that I want to test in the shell (after running $ ./bin/spark-shell):

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
val data = ssc.textFileStream("dataTest")
println("Nb lines is equal to= "+data.count())
data.foreachRDD { (rdd, time) => println(rdd.count()) }
ssc.start()
ssc.awaitTermination()

Then I copy some files to the "dataTest" directory (and also tried to rename some existing files in this directory).

But, unfortunately, I do not get what I want (i.e. no counts are ever printed, so it seems that ssc.textFileStream is not picking up the files), just log output like:

15/01/15 19:32:46 INFO JobScheduler: Added jobs for time 1421346766000 ms
15/01/15 19:32:46 INFO JobScheduler: Starting job streaming job 1421346766000 ms.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO SparkContext: Starting job: foreachRDD at <console>:20
15/01/15 19:32:46 INFO DAGScheduler: Job 69 finished: foreachRDD at <console>:20, took 0,000021 s
0
15/01/15 19:32:46 INFO JobScheduler: Finished job streaming job 1421346766000 ms.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO MappedRDD: Removing RDD 137 from persistence list
15/01/15 19:32:46 INFO JobScheduler: Total delay: 0,005 s for time 1421346766000 ms (execution: 0,002 s)
15/01/15 19:32:46 INFO BlockManager: Removing RDD 137
15/01/15 19:32:46 INFO UnionRDD: Removing RDD 78 from persistence list
15/01/15 19:32:46 INFO BlockManager: Removing RDD 78
15/01/15 19:32:46 INFO FileInputDStream: Cleared 1 old files that were older than 1421346706000 ms: 1421346704000 ms
15/01/15 19:32:46 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
+4
7 answers

Have you tried moving (or renaming) the files into the directory after the streaming context was started? textFileStream only picks up files that newly appear in the monitored directory; files that were already there, or copies that keep an old modification time, are ignored. The documentation puts it this way:

"The files must be created in the dataDirectory by atomically moving or renaming them into the data directory."

+2

Same problem here. For me it only worked when the files were moved into the directory after the streaming context had started; files that were already there, or copied in with an old timestamp, were never processed.

+1

Keep in mind that nothing in Spark Streaming actually runs until ssc.start() is called; everything declared before that only sets up the pipeline.

Also (as noted above), the files have to appear in the monitored directory after the context has started.

Finally, data.count() does not return a number: on a DStream it is a transformation that yields a new DStream of per-batch counts, so to see the values you need an output operation such as print() or foreachRDD().
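
A minimal sketch of the question's session with that fix applied (run in spark-shell, where sc already exists):

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(2))
val data = ssc.textFileStream("dataTest")

// count() only transforms: it produces a DStream holding one count per batch.
// print() is the output operation that forces computation and shows the values.
data.count().print()

ssc.start()
ssc.awaitTermination()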

0

I had the same problem (on Windows 8). It worked for me once I placed the "dataTest" directory inside "bin", since a relative path is resolved against the shell's working directory. Could you try that?
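
If the working directory is the culprit, an absolute URI removes the ambiguity (a sketch; the path below is hypothetical and depends on where Spark is installed):

// Monitor the directory by absolute path instead of relying on
// whatever directory the shell happened to be started from.
val data = ssc.textFileStream("file:///C:/spark-1.2.0-bin-hadoop2.4/dataTest")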

0

In my case the streaming actually worked, just very slowly (I was running Spark in a VM). Monitoring a directory on HDFS, the log eventually showed the new file being found and processed:

15/07/23 10:46:36 INFO dstream.FileInputDStream: Finding new files took 9 ms
15/07/23 10:46:36 INFO dstream.FileInputDStream: New files at time 1437619596000 ms:
hdfs://master:9000/user/jared/input/hadoop-env.sh
15/07/23 10:46:36 INFO storage.MemoryStore: ensureFreeSpace(235504) called with curMem=0, maxMem=280248975
......
15/07/23 10:46:36 INFO input.FileInputFormat: Total input paths to process : 1
15/07/23 10:46:37 INFO rdd.NewHadoopRDD: Input split: hdfs://master:9000/user/jared/input/hadoop-env.sh:0+4387
15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 107 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619598000 ms:

15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619598000 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 23 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619600000 ms:

15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619600000 ms
15/07/23 10:46:43 INFO dstream.FileInputDStream: Finding new files took 42 ms
15/07/23 10:46:43 INFO dstream.FileInputDStream: New files at time 1437619602000 ms:
15/07/23 10:46:43 INFO scheduler.JobScheduler: Added jobs for time 1437619602000 ms
15/07/23 10:46:43 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1830 bytes result sent to driver
15/07/23 10:46:43 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6098 ms on localhost (1/1)
15/07/23 10:46:43 INFO scheduler.DAGScheduler: ResultStage 0 (foreachRDD at <console>:29) finished in 6.178 s
15/07/23 10:46:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/07/23 10:46:43 INFO scheduler.DAGScheduler: Job 66 finished: foreachRDD at <console>:29, took 6.647137 s
101
0

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingData {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
    // The StreamingContext builds its own SparkContext from the conf
    val ssc = new StreamingContext(conf, Seconds(1))
    val input = ssc.textFileStream("file:///C:/Users/M1026352/Desktop/Spark/StreamInput")
    // word count over each 1-second batch
    val lines = input.flatMap(_.split(" "))
    val words = lines.map(word => (word, 1))
    val counts = words.reduceByKey(_ + _)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Unix ++, - > - > - > Unix/OSX , Scala. fooobar.com/questions/1571365/...

. .

0

Copying a file with the command line, or saving a document directly into the directory, worked for me. A copy made through an IDE or file manager can preserve the original modification date, and textFileStream uses the modification time to decide which files are new.
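
If a copy arrives with a stale timestamp, refreshing its modification time makes the next batch consider it new (a sketch; the file name is hypothetical):

import java.nio.file.{Files, Paths}
import java.nio.file.attribute.FileTime

// Stamp the copied file with the current time so the file stream
// treats it as newly created in the monitored directory.
val copied = Paths.get("dataTest/data.txt")
Files.setLastModifiedTime(copied, FileTime.fromMillis(System.currentTimeMillis()))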

0