I am collecting data from a messaging application. I currently use Flume, which sends about 50 million records per day.
I want to use Kafka instead, consume from Kafka using Spark Streaming, persist it to Hadoop, and query it with Impala.
I am having problems with every approach I have tried.
Approach 1 - Save the RDDs as Parquet files and point an external Hive Parquet table at the Parquet directory
// scala
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.foreachRDD(rdd => {
  // 1 - Create a SchemaRDD from the rdd, specifying the schema
  val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

  // 2 - Register it as a Spark SQL table
  SchemaRDD1.registerTempTable("sparktable")

  // 3 - Query sparktable to produce another SchemaRDD of just the data needed,
  //     'finalParquet', and persist it as Parquet files
  val finalParquet = sqlContext.sql(sql)
  finalParquet.saveAsParquetFile(dir)
})
The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a 1-minute batch size. The reason it outputs so many files is that the computation is distributed, as explained in another post: how to make saveAsTextFile NOT split output to multiple files? However, the solutions proposed there don't seem optimal for me; for example, as one user states, having a single output file is only a good idea if you have very little data.
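For reference, the kind of workaround proposed in that post looks roughly like the sketch below: coalesce the result down to a handful of partitions before saving, so each batch writes a few part files instead of 200+. This is only a sketch against the same 1.x SchemaRDD API as above, and the partition count of 4 is an arbitrary example; the fewer partitions you write with, the less parallel the save is, which is why it doesn't seem ideal for this volume of data.

// scala - sketch only; reuses sqlContext, schema, sql and dir from the snippet above
lines.foreachRDD(rdd => {
  val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
  SchemaRDD1.registerTempTable("sparktable")
  val finalParquet = sqlContext.sql(sql)
  // coalesce shrinks the number of partitions, so saveAsParquetFile
  // writes at most 4 part files for this batch instead of 200+
  finalParquet.coalesce(4).saveAsParquetFile(dir)
})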
Approach 2 - Use a HiveContext and insert the RDD data directly into a Hive table
# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))

def sendRecord(rdd):
    sql = "INSERT INTO TABLE table select * from beacon_sparktable"
    # as in approach 1: build a temp table from the rdd, then run the insert
    # through the HiveContext (names here mirror the query string above)
    beacon_df = sqlContext.jsonRDD(rdd, schema)
    beacon_df.registerTempTable("beacon_sparktable")
    sqlContext.sql(sql)

kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)
This works fine and inserts directly into the Parquet table, but there are scheduling delays on the batches, since the processing time exceeds the batch interval. The consumer can't keep up with what is being produced, and the batches waiting to be processed begin to queue up.
Writing to Hive appears to be slow. I've tried adjusting the batch interval size and running more consumer instances.
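For context, the usual pattern for running more consumer instances with the receiver-based API is to create several Kafka streams and union them, along the lines of the sketch below (shown in Scala like approach 1; numStreams is just an example value, not my actual setting):

// scala - sketch only; reuses ssc, zkQuorum, group and topicMap from approach 1
val numStreams = 3 // example value
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
// one unioned DStream, fed by several Kafka receivers running in parallel
val lines = ssc.union(kafkaStreams)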
Finally
What is the best way to save big data from Spark Streaming, given the problems with multiple files and the potential latency of writing to Hive? What are other people doing?
A similar question has been asked here, but it deals with an issue with directories rather than too many files: How can I get Spark Streaming to write my output so that Impala can read it?
Thanks so much for any help