Executing separate streaming queries in Spark Structured Streaming

I am trying to aggregate a stream with two different windows and print the results to the console. However, only the first streaming query is printed. The output of tenSecsQ is not printed to the console.

SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
        functions.window(words.col("timestamp"), "5 seconds"),
        words.col("word")
    ).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
        functions.window(words.col("timestamp"), "10 seconds"),
        words.col("word")
    ).count().orderBy("window");

I then start one streaming query for each of the 5-second and 10-second aggregations. The output of the 10-second query is not printed. Only the 5-second results appear on the console.

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
    .queryName("10_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

tenSecsQ.awaitTermination();
1 answer

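I investigated this question.

Summary: each query in Structured Streaming consumes the source independently. The socket source opens a new connection for each query, and what you see here happens because nc delivers the input data to only one of those connections.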

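I also raised this question with the Spark community. Shixiong Zhu of Databricks replied:

Spark creates one connection for each query. The behavior you observed is caused by how "nc -lk" works. If you check the TCP connections with netstat, you will see two connections once both queries are started. However, "nc" forwards the input to only one of them.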

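To verify this behavior, I set up a small experiment: first, a SimpleTCPWordServer that delivers random words to each open connection, and then a basic Structured Streaming job that declares two queries. The only difference between them is that the second query adds a constant column to distinguish its output: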

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.Trigger

val lines = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", true)
    .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("7 seconds"))
  .start()
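
The word server used in this experiment is not shown in the answer. As a rough idea of what such a server might look like, here is a minimal Java sketch. The class name RandomWordServer, its constructor parameters and the word list are all hypothetical and are not taken from the original SimpleTCPWordServer. The point it illustrates is that every accepted connection gets its own independent stream of random words.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

// Hypothetical stand-in for the experiment's word server; not the answer's actual code.
class RandomWordServer implements AutoCloseable {
    // Arbitrary sample vocabulary (an assumption for illustration).
    static final String[] WORDS = {"agenda", "bear", "bottle", "cell", "champion"};

    private final ServerSocket serverSocket;
    private final long intervalMillis;

    public RandomWordServer(int port, long intervalMillis) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 picks any free port
        this.intervalMillis = intervalMillis;
        Thread acceptor = new Thread(this::acceptLoop, "word-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int getPort() {
        return serverSocket.getLocalPort();
    }

    // Accept connections until the server socket is closed; one feeder thread per client.
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                Thread feeder = new Thread(() -> feed(socket), "word-feeder");
                feeder.setDaemon(true);
                feeder.start();
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop condition then exits
            }
        }
    }

    // Each connection draws from its own Random, so no two clients see the same stream.
    private void feed(Socket socket) {
        Random random = new Random();
        try (Socket s = socket; PrintWriter out = new PrintWriter(s.getOutputStream())) {
            while (!serverSocket.isClosed() && !out.checkError()) {
                out.print(WORDS[random.nextInt(WORDS.length)]);
                out.print('\n');
                out.flush();
                Thread.sleep(intervalMillis);
            }
        } catch (IOException | InterruptedException e) {
            // client disconnected or server shutting down
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws IOException {
        try (RandomWordServer server = new RandomWordServer(9999, 500)) {
            System.out.println("Serving random words on port " + server.getPort()
                    + "; press Enter to stop.");
            System.in.read();
        }
    }
}
```

With a server like this listening on port 9999, each connection opened by the socket source receives different words, which makes it easy to tell whether two queries share one connection or not.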

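If Structured Streaming consumed only one stream, both queries would report the same words. If each query consumes a separate stream, each query will report different words.

This is the observed output: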

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
|   value|          timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+

+------+-------------------+---+
| value|          timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
|  value|          timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
|   bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|     value|          timestamp|
+----------+-------------------+
|    breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
|   amazing|2017-08-14 13:54:52|
|    bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
|     asset|2017-08-14 13:54:54|
|      cell|2017-08-14 13:54:54|
+----------+-------------------+

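The streams seen by the two queries are clearly different. So it appears that multiple aggregations cannot be defined over the data delivered by the socket source unless the TCP backend server delivers exactly the same data to every open connection.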
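
One way to meet that condition is to replace nc with a small server that copies every input line to all connected clients. The following is a minimal Java sketch and not part of the original answer: the class name BroadcastLineServer and its methods are hypothetical, and it assumes the input is newline-delimited UTF-8 text read from standard input.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical nc replacement: every line is sent to ALL connected clients.
class BroadcastLineServer implements AutoCloseable {
    private final ServerSocket serverSocket;
    // Copy-on-write list so broadcast() can iterate while the acceptor adds clients.
    private final List<PrintWriter> clients = new CopyOnWriteArrayList<>();

    public BroadcastLineServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 picks any free port
        Thread acceptor = new Thread(this::acceptLoop, "broadcast-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int getPort() {
        return serverSocket.getLocalPort();
    }

    public int clientCount() {
        return clients.size();
    }

    // Register each new connection (for example, one per streaming query).
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                clients.add(new PrintWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)));
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop condition then exits
            }
        }
    }

    // Write the same line to every client and drop clients whose connection has failed.
    public void broadcast(String line) {
        for (PrintWriter client : clients) {
            client.print(line);
            client.print('\n');
            if (client.checkError()) { // checkError() also flushes
                clients.remove(client);
            }
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        for (PrintWriter client : clients) {
            client.close();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (BroadcastLineServer server = new BroadcastLineServer(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = stdin.readLine()) != null) {
                server.broadcast(line);
            }
        }
    }
}
```

Started in place of "nc -lk 9999", a server along these lines should let both windowed queries from the question see the same input, because each connection opened by the socket source receives every line. Lines typed before a query has connected are not replayed to it.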
