Why do window functions fail with "Window function X does not take a frame specification"?

I am trying to use Spark 1.4 window functions in PySpark 1.4.1, but mostly I get errors or unexpected results. Here is a very simple example that I think should work:

    from pyspark.sql.window import Window
    import pyspark.sql.functions as func

    l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
    df = sqlContext.createDataFrame(l,["a","b"])
    wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

    df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

    df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

    wSpec = Window.orderBy(df.a)

    df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

    df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()
    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]

As you can see, when I add the frame specification rowsBetween, neither rank() nor lag()/lead() accepts it: "Window function X does not take a frame specification."

If I omit the rowsBetween frame specification, lag()/lead() no longer throw an exception, but they return a result that is unexpected (at least to me): always None. And rank() still doesn't work, just with a different exception.

Can someone help me get my window functions working correctly?

UPDATE

Well, this is starting to look like a PySpark bug. I ran the same test in plain Spark (Scala, spark-shell):

    import sqlContext.implicits._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._

    val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
    val rdd = sc.parallelize(l).map(i => Row(i._1, i._2))
    val schemaString = "a b"
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
    val df = sqlContext.createDataFrame(rdd, schema)

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val wSpec = Window.orderBy("a").rowsBetween(-1,1)

    df.select(df("a"), rank().over(wSpec).alias("rank"))
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

    df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

    val wSpec = Window.orderBy("a")

    df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

    df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")).collect()
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])

So even though rowsBetween cannot be applied in Scala either, rank() and lag()/lead() both work as I expect when rowsBetween is omitted.

1 answer

As far as I can tell, these are two different problems. Window frame definitions are simply not supported by Hive GenericUDAFRank, GenericUDAFLag, and GenericUDAFLead, so the errors you see are expected behavior.
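
The frame specification itself is not the problem; aggregate functions do accept one. A minimal PySpark sketch that shows the distinction (reusing the df from the question; the moving_avg alias is just illustrative):

    # Aggregates such as avg take a frame; only rank/lag/lead reject it.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as func

    framed = Window.partitionBy().orderBy(df.a).rowsBetween(-1, 1)
    df.select(df.a, func.avg(df.b).over(framed).alias("moving_avg")).collect()
    # For a=1 the frame covers the rows a=1..2, i.e. (101 + 202) / 2 = 151.5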

Regarding the problem with the following PySpark code:

    wSpec = Window.orderBy(df.a)
    df.select(df.a, func.rank().over(wSpec).alias("rank"))

it seems to be related to my question https://stackoverflow.com/q/31948194/1560062 and is tracked as SPARK-9978. In the meantime, you can make it work by changing the window definition to the following:

 wSpec = Window.partitionBy().orderBy(df.a) 
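
Applied to the queries from the question, the workaround looks like this (a sketch assuming the same df and func import as above; the expected rows mirror the Scala run from the question's update):

    df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
    # expected: [Row(a=1, rank=1), Row(a=2, rank=2), ..., Row(a=5, rank=5)]

    df.select(df.a,
              func.lag(df.b, 1).over(wSpec).alias("prev"),
              df.b,
              func.lead(df.b, 1).over(wSpec).alias("next")).collect()
    # expected: [Row(a=1, prev=None, b=101, next=202), ...,
    #            Row(a=5, prev=404, b=505, next=None)]

Keep in mind that an empty partitionBy() moves all rows into a single partition, so this works around the analysis error but will not scale to large data.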
