How to select the last row, and how to access a PySpark DataFrame by index?

From a PySpark SQL DataFrame, for example:

    name  age  city
    abc   20   A
    def   30   B

How do I get the last row? (For example, with df.limit(1) I can get the first row of the data into a new DataFrame.)

And how can I access DataFrame rows by index, like row no. 12 or 200?

In pandas I can do:

    df.tail(1)             # for the last row
    df.ix[rowno or index]  # by index
    df.loc[] or df.iloc[]  # by label or by integer position

I'm just curious how to access a PySpark DataFrame in these or alternative ways.

thanks

+8
4 answers

How to get the last row?

A long and ugly way, which assumes that all columns are orderable:

    from pyspark.sql.functions import (
        col, max as max_, struct, monotonically_increasing_id
    )

    last_row = (df
        .withColumn("_id", monotonically_increasing_id())
        # take the max over a struct that is ordered by _id first, then unpack it
        .select(max_(struct("_id", *df.columns)).alias("tmp"))
        .select(col("tmp.*"))
        .drop("_id"))

If not all columns can be ordered, you can try:

    # reuses the imports from the snippet above
    with_id = df.withColumn("_id", monotonically_increasing_id())
    i = with_id.select(max_("_id")).first()[0]

    with_id.where(col("_id") == i).drop("_id")

Note: there is a last function in pyspark.sql.functions / o.a.s.sql.functions, but, considering the description of the corresponding expressions, it is not a good choice here.
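
For illustration only (this example is mine, not part of the answer), here is what that tempting approach looks like; without an explicit ordering, its result depends on how the data is partitioned:

    from pyspark.sql import functions as F

    # Not deterministic: which value counts as "last" depends on the
    # physical partition layout, so this can differ between runs.
    df.agg(F.last("name").alias("name")).show()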

How can I access DataFrame rows by index?

You cannot. A Spark DataFrame is not indexed and is not accessible by index. You can add indices using zipWithIndex and filter later; just keep in mind that this is an O(N) operation.
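
A minimal sketch of that approach (the variable names and the example row number are mine, for illustration):

    from pyspark.sql import Row

    # Attach a consecutive, zero-based index to every row, then filter.
    indexed = (df.rdd
        .zipWithIndex()
        .map(lambda pair: Row(index=pair[1], **pair[0].asDict()))
        .toDF())

    indexed.where(indexed.index == 12).show()   # "row no. 12" from the question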

+5

How to get the last row?

If you have a column that you can use to order the DataFrame, for example "index", then one simple way to get the last record is to use SQL: 1) order your table in descending order and 2) take the first value from that ordering.

    df.createOrReplaceTempView("table_df")
    query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC LIMIT 1"""
    latest_rec = spark.sql(query_latest_rec)   # assumes an active SparkSession named `spark`
    latest_rec.show()

And how can I access DataFrame rows by index, like row no. 12 or 200?

Similarly, you can get the record at any row number:

    row_number = 12
    df.createOrReplaceTempView("table_df")
    query_latest_rec = """SELECT * FROM
        (SELECT * FROM table_df ORDER BY index ASC LIMIT {0}) ord_lim
    ORDER BY index DESC LIMIT 1"""
    latest_rec = spark.sql(query_latest_rec.format(row_number))
    latest_rec.show()

If you do not have an "index" column, you can create it with

    from pyspark.sql.functions import monotonically_increasing_id

    # ids are increasing and unique, but not consecutive (see the last answer)
    df = df.withColumn("index", monotonically_increasing_id())
+4
    from pyspark.sql import functions as F

    # take the "last" value of every column in a single aggregation
    expr = [F.last(col).alias(col) for col in df.columns]
    df.agg(*expr)

Just a hint: it looks like you are still thinking in terms of pandas or R. Spark is a different paradigm for working with data: you no longer access data in individual cells; you work with whole chunks of it at once. If you keep collecting data to the driver and doing things the way you just did, you lose the whole concept of parallelism that Spark provides. Take a look at the concept of transformations versus actions in Spark.
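
To make that distinction concrete (my illustration, not part of the answer):

    # Transformations are lazy: they only describe a computation.
    adults = df.where(df.age > 21)                            # transformation
    renamed = adults.withColumnRenamed("name", "full_name")   # still lazy

    # Actions trigger the actual distributed computation.
    n = renamed.count()        # action: runs the job, returns a number
    rows = renamed.collect()   # action: pulls all results to the driver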

+3

Use the following to get an index column containing monotonically increasing, unique, and consecutive integers, which is not how monotonically_increasing_id() works. The indexes will ascend in the same order as colName of your DataFrame.

    import pyspark.sql.functions as F
    from pyspark.sql.window import Window as W

    window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)

    df = df\
        .withColumn('int', F.lit(1))\
        .withColumn('index', F.sum('int').over(window))\
        .drop('int')

Use the following code to look at the tail, or the last rownums rows, of the DataFrame.

    rownums = 10
    df.where(F.col('index') > df.count() - rownums).show()

Use the following code to view the rows from start_row to end_row of the DataFrame.

    start_row = 20
    end_row = start_row + 10
    df.where((F.col('index') > start_row) & (F.col('index') < end_row)).show()

zipWithIndex() is an RDD method that does return monotonically increasing, unique, and consecutive integers, but it appears to be much slower to implement in a way that lets you get back to your original DataFrame amended with an id column.
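
For reference, one common pattern for that round trip looks roughly like this (a sketch; the schema handling and names are my assumptions, not from the answer):

    from pyspark.sql.types import StructType, StructField, LongType

    # Append a LongType "id" field to the existing schema.
    new_schema = StructType(df.schema.fields + [StructField("id", LongType(), False)])

    df_with_id = (df.rdd
        .zipWithIndex()                                 # RDD of (Row, index) pairs
        .map(lambda pair: tuple(pair[0]) + (pair[1],))  # flatten each pair into one tuple
        .toDF(new_schema))

The detour through the RDD API (serializing rows out of and back into the DataFrame) is what tends to make this slower than the window-based index above.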

0
