I am trying to use Spark 1.4 window functions in pyspark 1.4.1,
but in most cases I get errors or unexpected results. Here is a very simple example that I think should work:
from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

wSpec = Window.orderBy(df.a)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()
[Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]
As you can see, as soon as I add the rowsBetween frame specification, neither rank() nor lag()/lead() accepts it: "Window function does not take a frame specification."
If I omit the rowsBetween frame specification, lag()/lead() no longer throw an exception, but they return an unexpected (to me) result: always None. And rank() still doesn't work, just with a different exception.
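For reference, this is the result I would expect from lag()/lead() over a plain ordered window with no frame (a minimal sketch of my expectation only, not what pyspark 1.4.1 actually returns for me):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Window ordered by "a", with no frame specification, which is what
# lag()/lead() should accept according to the error messages above.
wSpec = Window.orderBy(df.a)

df.select(df.a,
          func.lag(df.b, 1).over(wSpec).alias("prev"),
          df.b,
          func.lead(df.b, 1).over(wSpec).alias("next")).collect()

# What I would expect (but do not get):
# [Row(a=1, prev=None, b=101, next=202),
#  Row(a=2, prev=101, b=202, next=303),
#  Row(a=3, prev=202, b=303, next=404),
#  Row(a=4, prev=303, b=404, next=505),
#  Row(a=5, prev=404, b=505, next=None)]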
Can someone help me get these window functions working correctly?
UPDATE
Well, it is starting to look like a pyspark bug. I ran the same test in plain Spark (Scala, spark-shell):
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1, i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wSpec = Window.orderBy("a").rowsBetween(-1,1)

df.select(df("a"), rank().over(wSpec).alias("rank"))
==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

val wSpec = Window.orderBy("a")

df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")).collect()
====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])
Even though rowsBetween cannot be applied in Scala either, both rank() and lag()/lead() work as I expect when rowsBetween is omitted.
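As a further cross-check I may also try expressing the same query in raw SQL (a sketch only; it assumes sqlContext is a HiveContext, which window functions in Spark 1.4 require), to see whether the problem is in the pyspark DataFrame API or in the underlying window implementation:

# Sketch: the same window query via raw SQL (assumes sqlContext is a HiveContext).
df.registerTempTable("t")
sqlContext.sql("""
  SELECT a,
         lag(b, 1)  OVER (ORDER BY a) AS prev,
         b,
         lead(b, 1) OVER (ORDER BY a) AS next_b,
         rank()     OVER (ORDER BY a) AS rnk
  FROM t
""").collect()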