Awful Spark SQL

Current UDAF support for Python users is a bit absent. I took a look at adding some UDAFs in Scala and calling them from Python for the project I'm working on, so I did a quick proof of concept using a kurtosis UDAF as the implementation. The branch currently lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support
The first step is to define our UDAF in Scala - this is probably less than ideal, but here is the implementation:
object functions {
  // EvilSqlTools is a small helper (not shown here) that pulls the underlying
  // Catalyst expression back out of a Column.
  def kurtosis(e: Column): Column = new Column(Kurtosis(EvilSqlTools.getExpr(e)))
}

case class Kurtosis(child: Expression) extends AggregateExpression {
  def this() = this(null)

  override def children = child :: Nil
  override def nullable: Boolean = true
  override def dataType: DataType = DoubleType
  override def toString: String = s"Kurtosis($child)"
  override def newInstance() = new KurtosisFunction(child, this)
}

case class KurtosisFunction(child: Expression, base: AggregateExpression) extends AggregateFunction {
  def this() = this(null, null)

  // Buffer every value we see; fine for a proof of concept, less fine for huge groups.
  var data = scala.collection.mutable.ArrayBuffer.empty[Any]

  override def update(input: Row): Unit = {
    data += child.eval(input)
  }

  // Rough sketch of the final step (the exact formula on the branch may differ):
  // turn the buffered values into a naive excess kurtosis.
  override def eval(input: Row): Any = {
    val xs = data.map {
      case d: Double => d
      case i: Int => i.toDouble
      case l: Long => l.toDouble
    }
    val n = xs.size.toDouble
    val mean = xs.sum / n
    val m2 = xs.map(x => math.pow(x - mean, 2)).sum / n
    val m4 = xs.map(x => math.pow(x - mean, 4)).sum / n
    m4 / (m2 * m2) - 3.0
  }
}
Then we can borrow the same logic that Spark SQL's own functions.py uses to expose JVM functions to Python:
"""Our magic extend functions. Here lies dragons and a sleepy holden.""" from py4j.java_collections import ListConverter from pyspark import SparkContext from pyspark.sql.dataframe import Column, _to_java_column __all__ = [] def _create_function(name, doc=""): """ Create a function for aggregator by name""" def _(col): sc = SparkContext._active_spark_context jc = getattr(sc._jvm.com.sparklingpandas.functions, name)(col._jc if isinstance(col, Column) else col) return Column(jc) _.__name__ = name _.__doc__ = doc return _ _functions = { 'kurtosis': 'Calculate the kurtosis, maybe!', } for _name, _doc in _functions.items(): globals()[_name] = _create_function(_name, _doc) del _name, _doc __all__ += _functions.keys() __all__.sort()
And then we can go ahead and use our new UDAF as follows:
from sparklingpandas.custom_functions import *
from pyspark.sql import Row
import random

input = range(1, 6) + range(1, 6) + range(1, 6) + range(1, 6) + range(1, 6) + range(1, 6)
df1 = sqlContext.createDataFrame(
    sc.parallelize(input).map(lambda i: Row(single=i, rand=random.randint(0, 100000))))
df1.collect()

import pyspark.sql.functions as F
x = df1.groupBy(df1.single).agg(F.min(df1.rand))
x.collect()

j = df1.groupBy(df1.single).agg(kurtosis(df1.rand))
j.collect()
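Since kurtosis is easy to get wrong, a cheap sanity check is to compute the same thing in plain Python over the collected rows and eyeball it against what the UDAF gives back. The excess_kurtosis helper below is just a throwaway for this post, and if the Scala side ends up returning plain rather than excess kurtosis you'll want to drop the - 3.0:

from collections import defaultdict


def excess_kurtosis(xs):
    """Naive excess kurtosis: fourth central moment over squared variance, minus 3."""
    n = float(len(xs))
    mean = sum(xs) / n
    m2 = sum((x - mean) ** 2 for x in xs) / n
    m4 = sum((x - mean) ** 4 for x in xs) / n
    return m4 / (m2 ** 2) - 3.0

# Group the original rows by `single` locally and compare against j.collect().
groups = defaultdict(list)
for row in df1.collect():
    groups[row.single].append(row.rand)

for key in sorted(groups):
    print("%s %f" % (key, excess_kurtosis(groups[key])))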