How do I get Spark Streaming to write my output so that Impala can read it?

I have a problem with the Spark Streaming API. I am currently transferring input through Flume to Spark Streaming, with which I plan to do data preprocessing. Then I would like to save the data in the Hadoop file system and request it using Impala. However, Spark writes data files to separate directories and creates a new directory for each RDD.

This is a problem because, firstly, the external tables in Impala cannot find subdirectories, but only the files inside the directory to which they point, if they are not divided. Secondly, new directories are added so quickly by Spark that it would be very bad for performance to periodically create a new section in Impala for each generated directory. On the other hand, if I want to increase the interval of the roll of records in Spark, so that directories will be generated less often, an additional delay will be added until Impala can read the incoming data. This is unacceptable since my system must support real-time applications. In Hive, I could configure external tables to detect subdirectories without the need for partitioning using these settings:

set hive.mapred.supports.subdirectories=true; set mapred.input.dir.recursive=true; 

But for my understanding, Impala does not have such a function.

I am currently using the following code to read data from Flume and write it to HDFS:

 val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path) 

Here, the variable path defines the directory prefix to which text files are added (part-0000, etc.), and the rest of the directory name is the timestamp created by Spark. I could change the code to something like this:

 val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))) mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path)) 

In this case, the files will be added to the same directory defined by the path, but since they are always called part-00000, part-00001, part-00002, etc., previously created files will be overwritten. Studying the Spark source code, I noticed that file names are determined by a string in the SparkHadoopWriter open () method:

 val outputName = "part-" + numfmt.format(splitID) 

And it seems to me that there is no way to manipulate splitID through the Spark API. To summarize, I ask the following questions:

  • Is there a way to make external tables in Impala discovery subfolders?
  • If not, is there a way to get Spark to write its output files to a single directory or else to a form that Impala can read instantly?
  • If not, are there any updates expected from Spark to fix this problem, or should I just deploy my own version of Spark with which I can determine the names of the files that it writes itself?
+3
hadoop hive streaming apache-spark impala
source share
1 answer

I can not speak for Impala.

part-xxxxx is the contingent agreement that follows Spark. Most tools understand this format, and I would suggest that Spark cannot do this. Part files must be unique, and the section extension to the file name is a common method.

I would look in Impala to see how to read the part file, since most hasoop tools generate it this way.

If you want to customize the directory structure - although this is not your question - this can easily be achieved by, say, changing the format of prefix-timestamp-suffix . Spark Steaming uses Spark RDD.saveAsTextFiles(..) under the hood, which you can customize. Here is the code from DStream.scala:

  def saveAsTextFiles(prefix: String, suffix: String = "") { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } this.foreachRDD(saveFunc) } 
+1
source share

All Articles