Reading from and writing to Hive tables with Spark after aggregation

We have a Hive warehouse, and we wanted to use Spark for various tasks (mainly classification), occasionally writing results back as Hive tables. For example, we wrote the following Python code to find the total of the second column of original_table, grouped by the first column. The code works, but we are worried that it is inefficient, especially the map calls that convert the rows to key-value pairs and then to dictionaries. The functions combiner, mergeValue, and mergeCombiner are defined elsewhere and work fine.

from pyspark.sql import HiveContext

rdd = HiveContext(sc).sql('from original_table select *')

# convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

# create rdd where rows are (key, (sum, count))
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# create rdd with dictionary values in order to create a SchemaRDD
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = HiveContext(sc).inferSchema(dict_rdd)

# save
schema_rdd.saveAsTable('new_table_name')

Are there more efficient ways to do the same?

+7
3 answers

...perhaps this was not possible when the question was written, but doesn't it make sense now (post Spark 1.3) to use the createDataFrame() call?

After getting your initial RDD, it looks like you could make that call and then run a simple SQL statement against the resulting structure to do the whole job (sum and grouping) in one pass. In addition, the DataFrame structure can infer the schema directly on creation, if I'm reading the API doc correctly.

( http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext )
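A minimal sketch of that approach, assuming sc is an existing SparkContext and the table has the two columns used in the question (the column names k1/v1/v2 and the temp table name are placeholders):

from pyspark.sql import HiveContext

hc = HiveContext(sc)  # sc: an existing SparkContext

# sql() already returns a DataFrame in Spark 1.3+, but you can also
# build one explicitly from the first RDD with createDataFrame()
rdd = hc.sql('select * from original_table').rdd
pairs = rdd.map(lambda x: (x[0], int(x[1])))
df = hc.createDataFrame(pairs, ['k1', 'v1'])  # schema inferred from the data

# the sum and the grouping happen in one SQL statement
df.registerTempTable('tmp')
result = hc.sql('select k1, sum(v1) as v1, count(v1) as v2 from tmp group by k1')
result.saveAsTable('new_table_name')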

0

This error can be resolved by setting hive.exec.scratchdir to a folder the user has access to.
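A minimal sketch of setting that property from PySpark (the property name is standard Hive; the path shown is hypothetical, and for a permanent fix it belongs in hive-site.xml):

from pyspark.sql import HiveContext

hc = HiveContext(sc)  # sc: an existing SparkContext
# hypothetical path: point the scratch dir at a location the Spark user can write to
hc.setConf('hive.exec.scratchdir', '/tmp/hive-myuser')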

0

What version of Spark are you using?

This answer is based on version 1.6 and uses DataFrames.

val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")

import org.apache.spark.sql.functions._
client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()

+-----+---+-----+
|Categ|Sum|count|
+-----+---+-----+
|    A| 15|    2|
|    B| 56|    1|
+-----+---+-----+
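For comparison with the question's Python code, a rough PySpark equivalent of the same DataFrame aggregation (a sketch; it assumes an existing SparkConf named conf):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import sum as sum_, count

sc = SparkContext(conf=conf)  # conf: an existing SparkConf
sqlContext = SQLContext(sc)

client = sqlContext.createDataFrame(
    [(1, "A", 10), (2, "A", 5), (3, "B", 56)], ["ID", "Categ", "Amnt"])

client.groupBy("Categ").agg(sum_("Amnt").alias("Sum"),
                            count("ID").alias("count")).show()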

Hope this helps!

0

Source: https://habr.com/ru/post/1212974/

