Spark: Build a Custom Column Function (User Defined Function)

I am using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate over each element, and do a calculation.

To get started, I'm trying to implement my own getMax method. So if the column x has the values [3,8,2,5,9], the expected output of the method is 9.

Here's what it looks like in plain Scala:

    def getMax(inputArray: Array[Int]): Int = {
      var maxValue = inputArray(0)
      for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
        maxValue = inputArray(i)
      }
      maxValue
    }

This is what I have so far, and I get this error:

 "value length is not a member of org.apache.spark.sql.column", 

and I don't know how else to iterate over the column.

    def getMax(col: Column): Column = {
      var maxValue = col(0)
      for (i <- 1 until col.length if col(i) > maxValue) {
        maxValue = col(i)
      }
      maxValue
    }

Once I can implement my own method, I will create a column function:

    val value_max: org.apache.spark.sql.Column = getMax(df.col("value")).as("value_max")

And then I hope to be able to use it in a SQL statement, for example:

 val sample = sqlContext.sql("SELECT value_max(x) FROM table") 

and the expected result will be 9, given the input column [3,8,2,5,9]

I am following the answer from another Spark Scala thread, How to iterate over rows in a DataFrame and add calculated values as new columns of the DataFrame, where a private standard deviation method is created. The calculations I will do are more complicated than that (for example, I will compare each element in a column), so am I going in the right direction, or should I be looking more at user-defined functions?

+7
scala apache-spark apache-spark-sql
2 answers

In a Spark DataFrame you cannot iterate over the elements of a Column with the approaches you were thinking of, because a Column is not an iterable data structure.

However, to handle column values, you have some options, and the correct one depends on your task:

1) Using existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them can be found in the functions package (see the org.apache.spark.sql.functions API documentation). So if you can use them, this is usually the best option. Note: do not forget the Window Functions.
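For example, the asker's getMax can be expressed with built-ins alone. This is a minimal sketch assuming the values live in an array column named "x" (array_max requires Spark 2.4+); the DataFrame and column names are illustrative:

    import org.apache.spark.sql.functions.{array_max, col, max}

    // Case 1: "x" is an array column, e.g. one row containing [3, 8, 2, 5, 9] (Spark 2.4+).
    val withMax = df.withColumn("value_max", array_max(col("x")))

    // Case 2: the values are spread across rows of a plain numeric column "x",
    // so an ordinary aggregation is enough.
    val maxDF = df.agg(max(col("x")).as("value_max"))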

2) Creating a UDF

If you cannot accomplish your task with the built-in functions, you may consider defining a UDF (User Defined Function). UDFs are useful when you process each value of a column independently and you expect to produce a new column with the same number of rows as the original (i.e. not an aggregated column). This approach is quite simple: first you define a plain Scala function, then you register it as a UDF, and then you use it. Example:

    import org.apache.spark.sql.functions.udf

    def myFunc: (String => String) = { s => s.toLowerCase }

    val myUDF = udf(myFunc)
    val newDF = df.withColumn("newCol", myUDF(df("oldCol")))

For more information here is a good article.
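Applied to the question, a UDF over an array column could reuse the asker's logic almost as-is. This is only a sketch, assuming "x" is a non-empty array-of-int column; note that Spark hands array columns to Scala UDFs as a Seq rather than an Array:

    import org.apache.spark.sql.functions.udf

    // Spark passes array columns to Scala UDFs as Seq, so the signature uses Seq[Int].
    // Assumes the arrays are non-empty.
    val getMaxUDF = udf { (values: Seq[Int]) =>
      var maxValue = values(0)
      for (i <- 1 until values.length if values(i) > maxValue) {
        maxValue = values(i)
      }
      maxValue
    }

    val withMax = df.withColumn("value_max", getMaxUDF(df("x")))

    // Registering the function also makes it callable from SQL, e.g.:
    // sqlContext.udf.register("value_max", (values: Seq[Int]) => values.max)
    // sqlContext.sql("SELECT value_max(x) FROM table")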

3) Using UDAF

If your task is to produce aggregated data, you can define a UDAF (User Defined Aggregate Function). I don't have much experience with these, but I can point you to a good tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
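For reference, a max-style aggregation with Spark's UserDefinedAggregateFunction API could be sketched roughly like this; the class name, column name, and integer input type are assumptions for illustration:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class MaxUDAF extends UserDefinedAggregateFunction {
      // One Int input column.
      override def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
      // The buffer holds the running maximum.
      override def bufferSchema: StructType = StructType(StructField("max", IntegerType) :: Nil)
      override def dataType: DataType = IntegerType
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Int.MinValue
      }
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) buffer(0) = math.max(buffer.getInt(0), input.getInt(0))
      }
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = math.max(buffer1.getInt(0), buffer2.getInt(0))
      }
      override def evaluate(buffer: Row): Any = buffer.getInt(0)
    }

    // Usage sketch:
    // val maxOf = new MaxUDAF
    // df.agg(maxOf(df("value")).as("value_max"))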

4) Falling back to RDD processing

If you really cannot use any of the options above, or if your processing depends on other rows and it is not an aggregation, then I think you will have to select the column you need and process it as an RDD. Example:

    val myRDD = df.select("column").rdd
    // process myRDD
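As a hypothetical continuation, computing the asker's maximum through the RDD API could look like the snippet below, assuming an integer column named "value" and a non-empty DataFrame:

    // Each RDD element is a Row with one field; extract the Int and reduce to the maximum.
    val maxValue = df.select("value")
      .rdd
      .map(_.getInt(0))
      .reduce((a, b) => math.max(a, b))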

Those are the options I could think of. Hope this helps.

+15

A simple example is provided in the excellent documentation, where a whole section is devoted to UDFs:

    // Assumes a Spark shell (or spark.implicits._ already in scope) for toDF and the $ syntax.
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions.callUDF

    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
    val spark = df.sparkSession
    spark.udf.register("simpleUDF", (v: Int) => v * v)
    df.select($"id", callUDF("simpleUDF", $"value"))
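Once registered, the UDF can also be called from SQL, which is what the question was ultimately after; here the view name "table" is just an assumption:

    // Expose the DataFrame as a temporary view, then call the registered UDF from SQL.
    df.createOrReplaceTempView("table")
    spark.sql("SELECT id, simpleUDF(value) AS value_squared FROM table").show()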
+1
