Update: it seems my errors are probably related to the way I installed Spark and/or Hive. Working with window functions seems pretty straightforward in a hosted Databricks notebook. I need to figure out how to set this up locally.
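For reference, this is roughly the sanity check I plan to run against a local install. It's a hypothetical standalone script, written on the assumption that the 'JavaPackage' error shown further down really does mean my Spark build is missing Hive support:

# hypothetical sanity check: if using the HiveContext raises
# TypeError: 'JavaPackage' object is not callable, the local Spark build
# probably was not compiled with Hive support
from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

sc = SparkContext()
hiveContext = HiveContext(sc)
hiveContext.createDataFrame([Row(x=1)]).show()  # forces the Hive-backed SQL context to initialize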
I have a Spark DataFrame on which I need to use a Window function.* I tried to follow the instructions here, but I ran into some problems.
Setting up my environment:
import os
import sys
import datetime as dt

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')

import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)

from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
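As a quick check that this manual setup took effect, I print the version and context types (a trivial check; the variable names match the setup above):

# sanity check of the manual SPARK_HOME / py4j setup above
print(sc.version)          # expecting 1.5.2
print(type(hiveContext))   # expecting pyspark.sql.context.HiveContext
print(type(sqlContext))    # expecting pyspark.sql.context.SQLContext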
Setting my data:
test_ts = {'adminDistrict': None,
           'city': None,
           'country': {'code': 'NA', 'name': 'UNKNOWN'},
           'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
                    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
                    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
                    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
                    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
                    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
                    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
                    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
                    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
                    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
                    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
                    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
                    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
                    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
                    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
                    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
                    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
                    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
                    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
                    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
                    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
                    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
                    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
                    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
                    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
                    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
           'maxDate': '2015-12-28T00:00:00Z',
           'minDate': '2005-08-25T00:00:00Z',
           'name': 'S&P GSCI Crude Oil Spot',
           'offset': 0,
           'resolution': 'DAY',
           'sources': ['trf'],
           'subtype': 'Index',
           'type': 'Commodities',
           'uid': 'TRF_INDEX_Z39824_PI'}
A function to turn that JSON into a DataFrame:
def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                     line['value']))
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
Getting a DataFrame and looking at what's inside:
test_df = ts_to_df(test_ts)
test_df.show()
This shows me the following:
+----------+----------------------+
|      Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25|                369.89|
|2005-08-26|                362.44|
|2005-08-29|                 368.3|
|2005-08-30|                 382.6|
|2005-08-31|                377.84|
|2005-09-01|                380.74|
|2005-09-02|                370.33|
|2005-09-05|                370.33|
|2005-09-06|                 361.5|
|2005-09-07|                352.79|
|2005-09-08|                 354.3|
|2005-09-09|                 353.0|
|2005-09-12|                349.35|
|2005-09-13|                348.82|
|2005-09-14|                360.24|
|2005-09-15|                357.61|
|2005-09-16|                347.14|
|2005-09-19|                 370.0|
|2005-09-20|                362.82|
|2005-09-21|                366.11|
+----------+----------------------+
And this is where I have no idea what I'm doing, and everything starts to go wrong:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
This gives me this error:
Py4JJavaError: An error occurred while calling o59.select.
: org.apache.spark.sql.AnalysisException: Failed to resolve the lead window function. Note that when using window functions, a HiveContext is currently required;
So it looks like I need a HiveContext, right? Do I need to create my DataFrame using a HiveContext? Let me try creating the DataFrame explicitly with a HiveContext:
def ts_to_hive_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date': dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                     ts['name'].replace('&', '').replace(' ', '_'): line['value']})
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
    return hiveContext.createDataFrame(temp_rdd)

test_df = ts_to_hive_df(test_ts)
test_df.show()
But this gives me this error:
TypeError: 'JavaPackage' object is not callable
So how do I use window functions? Do I need to create my DataFrames using a HiveContext? If so, how do I do that? Can someone tell me what I'm doing wrong?
* I need to know whether there are gaps in my data. I have a 'Date' column, and for each row, ordered by date, I want to know what's in the next row; if I have missing days or bad data, I want to carry the last day's data forward onto that row. If you know a better way to do this, let me know, but I would still like to know how to get these window functions working. A sketch of what I'm ultimately trying to compute is below.
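For concreteness, here is roughly what I expect the working version to look like once window functions are available. This is an untested sketch; the column names Next_Date and Days_To_Next are just labels I made up for the illustration:

from pyspark.sql.functions import col, lead, datediff
from pyspark.sql.window import Window

# order the whole series by date (no partitioning, since it's a single time series)
w = Window.orderBy(col('Date'))

gaps = (test_df
        .withColumn('Next_Date', lead(col('Date'), 1).over(w))            # date on the following row
        .withColumn('Days_To_Next', datediff(col('Next_Date'), col('Date'))))

# anything with more than one day to the next row is a gap (weekends and holidays included)
gaps.filter(col('Days_To_Next') > 1).show()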