Pyspark: moving average using timeseries data

I have a dataset consisting of a timestamp column and a dollar column. I would like to find the average number of dollars per week ending at the timestamp of each line. At first I looked at the pyspark.sql.functions.window function, but this forces the data for the week.

Here is an example:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

This results in two entries:

|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|

A window function binds time series data, rather than performing a moving average.

Is there a way to execute a moving average when I get back to the weekly average for each row with a period of time ending in timestampGMT row?

EDIT:

Zhang's answer below is close to what I want, but not quite what I would like to see.

Here is the best example to show what I'm trying to get to:

%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

This results in the following file frame:

dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15

, , timestampGMT, :

dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

roll_average 2017-03-10 17, . roll_average 2017-03-15 15, 13 2017-03-15 17 2017-03-10, 7- . 2017-03-18 19, 25 2017-03-18 13 2017-03-10, 7- , 17 2017 -03-10, 7- .

, binning, ?

+14
4

, / , :

Spark Window -

, , rangeBetween pyspark.sql.Window, .

:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))

, :

dollars   timestampGMT            rolling_average
17        2017-03-10 15:27:18.0   17.0
13        2017-03-15 12:27:18.0   15.0
25        2017-03-18 11:27:18.0   19.0
+21

, . , - :

, :

:

from pyspark.sql.window import Window
from pyspark.sql import functions as func


df = spark.createDataFrame([("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"), 
  ("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),   
  ("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"), 
  ("thabo", 20.00, "2018-03-13T15:27:18+00:00"), 
  ("thabo", 56.00, "2018-03-14T12:27:18+00:00"), 
  ("thabo", 99.00, "2018-03-15T11:27:18+00:00"), 
  ("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"), 
  ("thabo", 122.00, "2018-03-31T11:27:18+00:00"), 
  ("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
  ("ash", 9999.00, "2018-04-16T11:27:18+00:00") 
  ],
  ["name", "dollars", "timestampGMT"])

# we need this timestampGMT as seconds for our Window time frame
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

df.show(10000, False)

:

+---------+-------+---------------------+
|name     |dollars|timestampGMT         |
+---------+-------+---------------------+
|tshilidzi|17.0   |2018-03-10 17:27:18.0|
|tshilidzi|13.0   |2018-03-11 14:27:18.0|
|tshilidzi|25.0   |2018-03-12 13:27:18.0|
|thabo    |20.0   |2018-03-13 17:27:18.0|
|thabo    |56.0   |2018-03-14 14:27:18.0|
|thabo    |99.0   |2018-03-15 13:27:18.0|
|tshilidzi|156.0  |2019-03-22 13:27:18.0|
|thabo    |122.0  |2018-03-31 13:27:18.0|
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|
|ash      |9999.0 |2018-04-16 13:27:18.0|
+---------+-------+---------------------+

name :

#create window by casting timestamp to long (number of seconds)
w = (Window()
     .partitionBy(col("name"))
     .orderBy(F.col("timestampGMT").cast('long'))
     .rangeBetween(-days(7), 0))

df2 = df.withColumn('rolling_average', F.avg("dollars").over(w))

df2.show(100, False)

:

+---------+-------+---------------------+------------------+
|name     |dollars|timestampGMT         |rolling_average   |
+---------+-------+---------------------+------------------+
|ash      |9999.0 |2018-04-16 13:27:18.0|9999.0            |
|tshilidzi|17.0   |2018-03-10 17:27:18.0|17.0              |
|tshilidzi|13.0   |2018-03-11 14:27:18.0|15.0              |
|tshilidzi|25.0   |2018-03-12 13:27:18.0|18.333333333333332|
|tshilidzi|156.0  |2019-03-22 13:27:18.0|156.0             |
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|7000.0            |
|thabo    |20.0   |2018-03-13 17:27:18.0|20.0              |
|thabo    |56.0   |2018-03-14 14:27:18.0|38.0              |
|thabo    |99.0   |2018-03-15 13:27:18.0|58.333333333333336|
|thabo    |122.0  |2018-03-31 13:27:18.0|122.0             |
+---------+-------+---------------------+------------------+
+2

:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"),
                            (13, "2017-03-11T12:27:18+00:00"),
                            (21, "2017-03-17T11:27:18+00:00")],
                           ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days"))))

:

+-------+-------------------+---------------+                                   
|dollars|timestampGMT       |rolling_average|
+-------+-------------------+---------------+
|21     |2017-03-17 19:27:18|21.0           |
|17     |2017-03-11 23:27:18|15.0           |
|13     |2017-03-11 20:27:18|15.0           |
+-------+-------------------+---------------+
+1
source

It is worth noting that if you do not care about exact dates, but want to get the average value for the last 30 days, you can use the rowBetween function as follows:

w = Window.orderBy('timestampGMT').rowsBetween(-7, 0)

df = eurPrices.withColumn('rolling_average', F.avg('dollars').over(w))

Since you order by date, it will take the last 7 cases. You save all the castings.

0
source

All Articles