I am writing a Spark Streaming application in Java. It reads a continuous feed from HDFS using textFileStream() with a 1-minute batch interval. I need to perform a group-by aggregation on the incoming DStream. After the aggregation, I join the aggregated DStream<Key, Value1> with an RDD<Key, Value2> created from a static dataset read with textFile() from an HDFS directory.
The problem occurs when I turn on checkpointing. With an empty checkpoint directory it runs fine. After 2-3 batches I stop it with Ctrl+C and start it again. On the second start it immediately fails with a SparkException referring to SPARK-5063:
Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063
Here is the relevant block of the Spark application code:
    private void compute(JavaSparkContext sc, JavaStreamingContext ssc) {
        JavaRDD<String> distFile = sc.textFile(MasterFile);
        JavaDStream<String> file = ssc.textFileStream(inputDir);

        // Read the master file into a static pair RDD
        JavaRDD<MasterParseLog> masterLogLines = distFile.flatMap(EXTRACT_MASTER_LOGLINES);
        final JavaPairRDD<String, String> masterRDD = masterLogLines.mapToPair(MASTER_KEY_VALUE_MAPPER);

        // Continuously streamed file
        JavaDStream<ParseLog> logLines = file.flatMap(EXTRACT_CKT_LOGLINES);

        // Calculate the sum of the required fields and generate the grouped-sum DStream
        JavaPairDStream<String, Summary> sumRDD = logLines.mapToPair(CKT_GRP_MAPPER);
        JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKey(CKT_GRP_SUM); // GROUP BY operation
        JavaPairDStream<String, Summary> grpAvgRDD = grpSumRDD.mapToPair(CKT_GRP_AVG);

        // Join the master RDD with the DStream
        // This is the block causing the error (without it the code works fine)
        JavaPairDStream<String, Tuple2<String, Summary>> joinedStream = grpAvgRDD.transformToPair(
            new Function2<JavaPairRDD<String, Summary>, Time, JavaPairRDD<String, Tuple2<String, Summary>>>() {
                private static final long serialVersionUID = 1L;

                public JavaPairRDD<String, Tuple2<String, Summary>> call(
                        JavaPairRDD<String, Summary> rdd, Time v2) throws Exception {
                    return masterRDD.join(rdd);
                }
            });

        joinedStream.print(10);
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                // Create the context with a 60-second batch size
                SparkConf sparkConf = new SparkConf();
                final JavaSparkContext sc = new JavaSparkContext(sparkConf);
                JavaStreamingContext ssc1 = new JavaStreamingContext(sc, Durations.seconds(duration));

                app.compute(sc, ssc1);
                ssc1.checkpoint(checkPointDir);
                return ssc1;
            }
        };

        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkPointDir, contextFactory);

        // Start the streaming server
        ssc.start();
        logger.info("Streaming server started...");

        // Wait for the computations to finish
        ssc.awaitTermination();
        logger.info("Streaming server stopped...");
    }
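The EXTRACT_* / *_MAPPER / CKT_GRP_SUM constants referenced above are ordinary Spark function objects. Their exact implementations are not relevant to the problem, but for context they look roughly like this (a simplified sketch; field and helper names such as parse, getGroupKey and add are placeholders, not my actual code):

    // Sketch only: ParseLog.parse, getGroupKey and Summary.add are placeholder names.
    static final FlatMapFunction<String, ParseLog> EXTRACT_CKT_LOGLINES =
        new FlatMapFunction<String, ParseLog>() {
            public Iterable<ParseLog> call(String line) {
                return ParseLog.parse(line);        // zero or more parsed records per input line
            }
        };

    static final PairFunction<ParseLog, String, Summary> CKT_GRP_MAPPER =
        new PairFunction<ParseLog, String, Summary>() {
            public Tuple2<String, Summary> call(ParseLog log) {
                return new Tuple2<String, Summary>(log.getGroupKey(), new Summary(log));
            }
        };

    static final Function2<Summary, Summary, Summary> CKT_GRP_SUM =
        new Function2<Summary, Summary, Summary>() {
            public Summary call(Summary a, Summary b) {
                return a.add(b);                    // field-wise sum consumed by reduceByKey
            }
        };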
I know that it is the block joining the static dataset with the DStream that causes the error, but I took it from the Spark Streaming programming guide on the Apache Spark site (the stream-dataset join example under "Join Operations"). Please help me get this working; even if there is another way to do it, I need checkpointing enabled in my streaming application.
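For reference, the pattern from the guide that I adapted looks roughly like this (paraphrased, not a verbatim copy of the guide; stream and dataset here stand in for my grpAvgRDD and masterRDD):

    // Stream-dataset join as described under "Join Operations" in the guide (paraphrased)
    JavaPairDStream<String, Tuple2<String, String>> joined = stream.transformToPair(
        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, Tuple2<String, String>>>() {
            public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> rdd) {
                return rdd.join(dataset);           // dataset is a static JavaPairRDD<String, String>
            }
        });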
Environment information:
- CentOS 6.5: 2-node cluster
- Java: 1.8
- Spark: 1.4.1
- Hadoop: 2.7.1