Spark: zipwithindex equivalent in dataframe

Assuming I have the following data frame:

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)] df = sc.parallelize(dummy_data).toDF(['letter','number']) 

And I want to create the following data framework:

 [('a',0),('b',2),('c',1),('d',3),('e',0)] 

What I do is convert it to rdd and use the zipWithIndex function zipWithIndex and after attaching to the results:

 convertDF = (df.select('number') .distinct() .rdd .zipWithIndex() .map(lambda x:(x[0].number,x[1])) .toDF(['old','new'])) finalDF = (df .join(convertDF,df.number == convertDF.old) .select(df.letter,convertDF.new)) 

Is there something like the zipWithIndex function in dataframes? Is there an even more efficient way to accomplish this task?

+5
source share
1 answer

Please check https://issues.apache.org/jira/browse/SPARK-23074 for this direct functional parity in dataframes .. upvote that jira if you are interested to see this at some point in sparks.

Here is a workaround though in PySpark:

 def dfZipWithIndex (df, offset=1, colName="rowId"): ''' Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe and preserves a schema :param df: source dataframe :param offset: adjustment to zipWithIndex() index :param colName: name of the index column ''' new_schema = StructType( [StructField(colName,LongType(),True)] # new added field in front + df.schema.fields # previous schema ) zipped_rdd = df.rdd.zipWithIndex() new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) return spark.createDataFrame(new_rdd, new_schema) 

It is also available in the abalon package.

0
source

All Articles