DataSpark Aggregation Functions for PySpark with SciPy

I tried several different scripts to try using Spark 1.3 DataFrames to handle things like sciPy kurtosis or numpy std. Here is a sample code, but it just hangs on a 10x10 dataset (10 rows with 10 columns). I tried:

print df.groupBy().agg(kurtosis(df.offer_id)).collect() print df.agg(kurtosis(df.offer_ID)).collect() 

But it's not a problem:

 print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect() 

My assumption is that F: from pyspark.sql import functions as F is a programmed sql function. How can I use dataframes to do things like kurtosis in a dataset?

It also just hangs:

 print df.map(kurtosis(df.offer_id)).collect() 
+5
source share
1 answer

Awful Spark SQL Current UDF support for Python UDF is a bit absent. I looked at trying to add some UDFs to Scala and call them from Python for the project I'm working on, so I did a quick proof of concept using an excess like UDAF for implementation. The branch currently lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support

The first step is to define our UDAF in Scala - this is probably less than ideal, but here is the implementation:

 object functions { def kurtosis(e: Column): Column = new Column(Kurtosis(EvilSqlTools.getExpr(e))) } case class Kurtosis(child: Expression) extends AggregateExpression { def this() = this(null) override def children = child :: Nil override def nullable: Boolean = true override def dataType: DataType = DoubleType override def toString: String = s"Kurtosis($child)" override def newInstance() = new KurtosisFunction(child, this) } case class KurtosisFunction(child: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) var data = scala.collection.mutable.ArrayBuffer.empty[Any] override def update(input: Row): Unit = { data += child.eval(input) } // This function seems shaaady // TODO: Do something more reasonable private def toDouble(x: Any): Double = { x match { case x: NumericType => EvilSqlTools.toDouble(x.asInstanceOf[NumericType]) case x: Long => x.toDouble case x: Int => x.toDouble case x: Double => x } } override def eval(input: Row): Any = { if (data.isEmpty) { println("No data???") null } else { val inputAsDoubles = data.toList.map(toDouble) println("computing on input "+inputAsDoubles) val inputArray = inputAsDoubles.toArray val apacheKurtosis = new ApacheKurtosis() val result = apacheKurtosis.evaluate(inputArray, 0, inputArray.size) println("result "+result) Cast(Literal(result), DoubleType).eval(null) } } } 

Then we can use the same logic used in the implementation of Spark SQL functions.py:

 """Our magic extend functions. Here lies dragons and a sleepy holden.""" from py4j.java_collections import ListConverter from pyspark import SparkContext from pyspark.sql.dataframe import Column, _to_java_column __all__ = [] def _create_function(name, doc=""): """ Create a function for aggregator by name""" def _(col): sc = SparkContext._active_spark_context jc = getattr(sc._jvm.com.sparklingpandas.functions, name)(col._jc if isinstance(col, Column) else col) return Column(jc) _.__name__ = name _.__doc__ = doc return _ _functions = { 'kurtosis': 'Calculate the kurtosis, maybe!', } for _name, _doc in _functions.items(): globals()[_name] = _create_function(_name, _doc) del _name, _doc __all__ += _functions.keys() __all__.sort() 

And then we can continue and name it as UDAF as follows:

 from sparklingpandas.custom_functions import * import random input = range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6) df1 = sqlContext.createDataFrame(sc.parallelize(input)\ .map(lambda i: Row(single=i, rand= random.randint(0,100000)))) df1.collect() import pyspark.sql.functions as F x = df1.groupBy(df1.single).agg(F.min(df1.rand)) x.collect() j = df1.groupBy(df1.single).agg(kurtosis(df1.rand)) j.collect() 
+2
source

All Articles