Spark Streaming Persistent Output

I am collecting data from a messaging application. I currently use Flume, which sends about 50 million records per day.

I want to switch to Kafka, consume from Kafka using Spark Streaming, persist the data to Hadoop and query it with Impala.

I have run into problems with every approach I have tried.

Approach 1 - Save the RDDs as Parquet files and point an external Hive Parquet table at the Parquet directory

    // scala
    val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.foreachRDD(rdd => {
      // 1 - Create a SchemaRDD object from the rdd and specify the schema
      val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
      // 2 - Register it as a Spark SQL table
      SchemaRDD1.registerTempTable("sparktable")
      // 3 - Query sparktable to produce another SchemaRDD of just the data needed, 'finalParquet',
      //     and persist it as Parquet files
      val finalParquet = sqlContext.sql(sql)
      finalParquet.saveAsParquetFile(dir)
    })

The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a 1-minute batch size. The reason it outputs so many files is that the computation is distributed, as explained in another post - how to make saveAsTextFile NOT split output into multiple files? The proposed solutions do not seem optimal to me though; for example, as one user says, having a single output file is only a good idea if you have very little data.
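For illustration, the coalesce-style workaround suggested in that post would look roughly like this in my Approach 1 code (a sketch only, assuming Spark 1.3+ where sqlContext.sql returns a DataFrame; numOutputFiles is just a placeholder value I would have to tune):

    // Sketch: reduce the file count by coalescing before the write.
    // numOutputFiles is an illustrative value, not from my real job.
    val numOutputFiles = 4
    val coalesced = finalParquet.coalesce(numOutputFiles)
    // each partition becomes one Parquet file under dir
    coalesced.saveAsParquetFile(dir)

The downside is that coalescing funnels the write through fewer tasks, so it trades file count for per-batch write parallelism.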

Approach 2 - Use a HiveContext and insert the RDD data directly into the Hive table

    # python
    sqlContext = HiveContext(sc)
    ssc = StreamingContext(sc, int(batch_interval))
    kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
    lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)

    def sendRecord(rdd):
        sql = "INSERT INTO TABLE table select * from beacon_sparktable"
        # 1 - Apply the schema to the RDD, creating a DataFrame 'beaconDF'
        beaconDF = sqlContext.jsonRDD(rdd, schema)
        # 2 - Register the DataFrame as a Spark SQL table
        beaconDF.registerTempTable("beacon_sparktable")
        # 3 - Insert into Hive directly from a query on the Spark SQL table
        sqlContext.sql(sql)

    lines.foreachRDD(sendRecord)

This works fine and inserts directly into the Parquet table, but there are scheduling delays for the batches because the processing time exceeds the batch interval. The consumer cannot keep up with what is being produced, and the batches waiting to be processed start to queue up.

Writing to Hive seems to be slow. I have tried adjusting the batch interval size and running more consumer instances.
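By "more consumer instances" I mean roughly the standard pattern of creating several receiver-based Kafka streams and unioning them, so the receiving load is spread across receivers (a sketch only, in Scala as in Approach 1; numReceivers is just an illustrative value):

    // Sketch: several Kafka receivers in the same consumer group,
    // unioned into a single DStream. numReceivers is illustrative.
    val numReceivers = 3
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    }
    val lines = ssc.union(kafkaStreams)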

Finally

What is the best way to persist large volumes of data from Spark Streaming, given the problem of too many files and the potential latency of writing to Hive? What are other people doing?

A similar question has been asked here, but it is concerned with the directories rather than with too many files: How can I get Spark Streaming to write its output so that Impala can read it?

Thanks so much for any help

hadoop apache-kafka spark-streaming
1 answer

In solution #2, the number of files generated can be controlled through the number of partitions of each RDD.

See this example:

    // create a Hive table (if it does not already exist)
    sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

    // create an RDD with 2 records and only 1 partition
    val rdd = sc.parallelize(List(List(1, "hello"), List(2, "world")), 1)

    // create a DataFrame from the RDD
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("txt", StringType, nullable = false)
    ))
    val df = sqlContext.createDataFrame(rdd.map(Row(_: _*)), schema)

    // this creates a single file, because the RDD has only 1 partition
    df.write.mode("append").saveAsTable("test")

Now, I guess, you can play with the frequency at which you pull data from Kafka and with the number of partitions of each RDD (by default, the partitions of your Kafka topic, which you can possibly reduce by repartitioning).
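For example, translated into the Scala API of my example above, something along these lines inside foreachRDD (a sketch only; the partition count and the table name "beacon_table" are placeholders, and lines, schema and sqlContext are as in your Approach 2):

    // Sketch: fix the number of files appended per batch by repartitioning
    // each RDD before it becomes a DataFrame. filesPerBatch and
    // "beacon_table" are placeholders for illustration.
    val filesPerBatch = 2
    lines.foreachRDD { rdd =>
      val beaconDF = sqlContext.jsonRDD(rdd.repartition(filesPerBatch), schema)
      // one Parquet file per partition is appended to the table
      beaconDF.write.mode("append").saveAsTable("beacon_table")
    }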

I am using Spark 1.5 from CDH 5.5.1, and I get the same result whether I use df.write.mode("append").saveAsTable("test") or your SQL string.

