How to group by several keys in Spark?

I have a set of tuples that consist of compound keys and values. For example,

    tfile.collect() = [(('id1', 'pd1', 't1'), 5.0), (('id2', 'pd2', 't2'), 6.0), (('id1', 'pd1', 't2'), 7.5), (('id1', 'pd1', 't3'), 8.1)]

I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using plain PySpark APIs rather than SQLContext. In my current implementation, I am reading from a bunch of files and merging the RDDs.

    def readfile():
        fr = range(6, 23)
        tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                            .map(lambda view: set_feature(view, f))
                            .reduceByKey(lambda a, b: a + b)
                          for f in fr])
        return tfile

I intend to create an aggregated array as a value. For example,

    agg_tfile = [(('id1', 'pd1'), [5.0, 7.5, 8.1])]

where 5.0, 7.5, 8.1 represent [t1, t2, t3]. I currently achieve the same result with vanilla Python code using dictionaries. It works fine for small data sets, but I'm worried it may not scale to large datasets. Is there an efficient way to achieve this using the PySpark APIs?
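For concreteness, the driver-side dictionary approach being replaced looks roughly like the sketch below (an illustrative reconstruction, not the exact current code; it assumes `tfile` is the RDD shown above). Everything is pulled to the driver with collect(), which is exactly the part that will not scale:

    from collections import defaultdict

    # Pull every (key, value) pair to the driver and regroup locally:
    # (('id1', 'pd1', 't1'), 5.0) is filed under the shorter key ('id1', 'pd1').
    agg = defaultdict(list)
    for (id_, pd_, t_), value in tfile.collect():
        agg[(id_, pd_)].append(value)

    agg_tfile = list(agg.items())
    # [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]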

+7
python apache-spark pyspark
2 answers

I assume that you want to transpose the data according to several fields.

An easy way is to concatenate the target fields you want to group by and make the result the key of a pair RDD. For example:

    lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
    rdd = lines.map(lambda x: x.split(',')) \
               .map(lambda x: (x[0] + ', ' + x[1], x[3])) \
               .reduceByKey(lambda a, b: a + ', ' + b)
    print(rdd.collect())

Then you will get the transposed result:

 [('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')] 
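A small variation on the same idea, in case the values should stay as a list of floats rather than a concatenated string (a sketch, not part of the answer above; it reuses the `lines` RDD from the example):

    # Keep the compound key as a tuple and accumulate numeric values in a list.
    rdd = lines.map(lambda x: x.split(',')) \
               .map(lambda x: ((x[0], x[1]), [float(x[3])])) \
               .reduceByKey(lambda a, b: a + b)
    print(rdd.collect())
    # [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]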
+13

In my map step I turn each record into ((id1, t1), [(p1, 5.0)]), reduce by key to concatenate the lists into ((id1, t1), [(p1, 5.0), (p2, 6.0), ...]), and then apply map_group, which creates an array for [p1, p2, ...] and fills in the values at their respective positions.

    import numpy as np

    def map_group(pgroup):
        x = np.zeros(19)
        x[0] = 1
        value_list = pgroup[1]
        for val in value_list:
            fno = val[0].split('.')[0]
            x[int(fno) - 5] = val[1]
        return x

    tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
                .reduceByKey(lambda p, q: p + q) \
                .map(lambda d: (d[0], map_group(d)))

This is a computationally expensive solution, but it works for now.
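If the list concatenation in reduceByKey becomes the bottleneck, one option (a sketch only, not from the post above) is aggregateByKey, which appends into a per-partition accumulator list instead of building a new list for every merge:

    # Sketch: accumulate (pd, value) tuples per key by mutating a list in place,
    # then apply the same map_group as before.
    def append_value(acc, v):
        acc.append(v)
        return acc

    def merge_lists(a, b):
        a.extend(b)
        return a

    tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), (d[0][1], d[1]))) \
                .aggregateByKey([], append_value, merge_lists) \
                .map(lambda d: (d[0], map_group(d)))

groupByKey().mapValues(list) is another way to build the per-key lists, at the cost of shuffling every individual value.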

+2
