Group Spark DataFrame by date

I loaded a DataFrame from a SQL Server table. It looks like this:

    >>> df.show()
    +--------------------+------+
    |           timestamp| Value|
    +--------------------+------+
    |2015-12-02 00:10:...| 652.8|
    |2015-12-02 00:20:...| 518.4|
    |2015-12-02 00:30:...| 524.6|
    |2015-12-02 00:40:...| 382.9|
    |2015-12-02 00:50:...| 461.6|
    |2015-12-02 01:00:...| 476.6|
    |2015-12-02 01:10:...| 472.6|
    |2015-12-02 01:20:...| 353.0|
    |2015-12-02 01:30:...| 407.9|
    |2015-12-02 01:40:...| 475.9|
    |2015-12-02 01:50:...| 513.2|
    |2015-12-02 02:00:...| 569.0|
    |2015-12-02 02:10:...| 711.4|
    |2015-12-02 02:20:...| 457.6|
    |2015-12-02 02:30:...| 392.0|
    |2015-12-02 02:40:...| 459.5|
    |2015-12-02 02:50:...| 560.2|
    |2015-12-02 03:00:...| 252.9|
    |2015-12-02 03:10:...| 228.7|
    |2015-12-02 03:20:...| 312.2|
    +--------------------+------+

Now I would like to group (and aggregate) the values by the hour (or day, or month, ...), but I don't really know how to do this.

Here is how I load the DataFrame. I have a feeling that this is the wrong way to do it:

 query = """ SELECT column1 AS timestamp, column2 AS value FROM table WHERE blahblah """ sc = SparkContext("local", 'test') sqlctx = SQLContext(sc) df = sqlctx.load(source="jdbc", url="jdbc:sqlserver://<CONNECTION_DATA>", dbtable="(%s) AS alias" % query) 

Is this the right way to do it?
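For reference, a minimal sketch of the same load through the DataFrameReader API available since Spark 1.4 (the connection string, table and column names are the same placeholders as above):

    # Sketch only: the same query as above, loaded via sqlctx.read instead
    # of sqlctx.load(). Connection details and column names are placeholders.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext("local", "test")
    sqlctx = SQLContext(sc)

    query = """SELECT column1 AS timestamp, column2 AS value
               FROM table
               WHERE blahblah"""

    df = (sqlctx.read
          .format("jdbc")
          .option("url", "jdbc:sqlserver://<CONNECTION_DATA>")
          .option("dbtable", "(%s) AS alias" % query)
          .load())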

python apache-spark pyspark apache-spark-sql
2 answers

Since 1.5.0 Spark provides a number of functions, such as dayofmonth, hour, month or year, which can operate on dates and timestamps. Therefore, if timestamp is a TimestampType, all you need is the correct expression. For example:

    from pyspark.sql.functions import hour, mean

    (df
        .groupBy(hour("timestamp").alias("hour"))
        .agg(mean("value").alias("mean"))
        .show())

    ## +----+------------------+
    ## |hour|              mean|
    ## +----+------------------+
    ## |   0|508.05999999999995|
    ## |   1| 449.8666666666666|
    ## |   2| 524.9499999999999|
    ## |   3|264.59999999999997|
    ## +----+------------------+
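These functions compose, so grouping on more than one time unit is just a multi-column groupBy. A minimal sketch, assuming the same df and the Spark 1.5+ functions mentioned above:

    from pyspark.sql.functions import dayofmonth, hour, mean

    # Group by calendar day and hour of day at the same time; each extracted
    # component becomes an ordinary grouping column.
    (df
        .groupBy(dayofmonth("timestamp").alias("day"),
                 hour("timestamp").alias("hour"))
        .agg(mean("value").alias("mean"))
        .orderBy("day", "hour")
        .show())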

Pre-1.5.0, your best option is to use HiveContext and Hive UDFs with selectExpr:

    df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum()

    ## +----+---------+----------+
    ## |year|SUM(year)|SUM(value)|
    ## +----+---------+----------+
    ## |2015|    40300|    9183.0|
    ## +----+---------+----------+

or raw SQL:

    df.registerTempTable("df")

    sqlContext.sql("""
        SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum
        FROM df
        GROUP BY MONTH(timestamp)""")

Just remember that the aggregation is performed by Spark and is not pushed down to the external source. Usually this is the desired behavior, but there are situations when you may prefer to perform the aggregation as a subquery to limit the data transfer.
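For example, a sketch of pushing the aggregation down yourself by putting it into the JDBC subquery, so that SQL Server aggregates and Spark only receives the summarized rows (connection details and column names are placeholders, reusing the question's setup):

    # Hypothetical subquery: the database groups by hour and only the
    # aggregated rows are transferred to Spark.
    aggregated_query = """SELECT DATEPART(hour, column1) AS hour,
                                 AVG(column2) AS mean_value
                          FROM table
                          WHERE blahblah
                          GROUP BY DATEPART(hour, column1)"""

    df_hourly = sqlctx.load(source="jdbc",
                            url="jdbc:sqlserver://<CONNECTION_DATA>",
                            dbtable="(%s) AS aggregated" % aggregated_query)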


Alternatively, you can use date_format to create any time period you wish. Grouping by day:

    from pyspark.sql import functions as F

    df.select(F.date_format('timestamp', 'yyyy-MM-dd').alias('day')).groupby('day').count().show()

Grouping by month (just change the format):

    df.select(F.date_format('timestamp', 'yyyy-MM').alias('month')).groupby('month').count().show()
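The same pattern works with aggregates other than count; for instance, a sketch of a daily mean of the value column (column name taken from the question):

    (df
        .groupBy(F.date_format('timestamp', 'yyyy-MM-dd').alias('day'))
        .agg(F.mean('value').alias('daily_mean'))
        .show())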
