Scala and Spark UDF function

I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I register the function, but when I call it using SQL, it throws a NullPointerException. Below is my function and the steps I run. I am using Zeppelin. Strangely, it worked yesterday, but this morning it stopped working.

Function

    def convert(time: String): String = {
      val sdf = new java.text.SimpleDateFormat("HH:mm")
      val time1 = sdf.parse(time)
      sdf.format(time1)
    }

Register function

 sqlContext.udf.register("convert",convert _) 
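The trailing underscore in convert _ is Scala's eta-expansion: it turns the method convert into a function value of type String => String, which is the kind of value register accepts. A minimal, Spark-free sketch of that conversion (the name convertFn is made up for illustration and does not appear in the question):

```scala
import java.text.SimpleDateFormat

// Same logic as the convert method from the question
def convert(time: String): String = {
  val sdf = new SimpleDateFormat("HH:mm")
  sdf.format(sdf.parse(time))
}

// Eta-expansion: the method becomes a plain function value.
// convertFn is a hypothetical name used only in this sketch.
val convertFn: String => String = convert _

// The function value behaves exactly like the method
println(convertFn("12:12:12"))
```

This only shows what is being handed to register; it does not touch Spark or the SQL function lookup itself.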

Check function without SQL - it works

    convert("12:12:12")  // returns "12:12"

Test the function with SQL in Zeppelin - this FAILS.

 %sql select convert(time) from temptable limit 10 

temptable structure

    root
     |-- date: string (nullable = true)
     |-- time: string (nullable = true)
     |-- serverip: string (nullable = true)
     |-- request: string (nullable = true)
     |-- resource: string (nullable = true)
     |-- protocol: integer (nullable = true)
     |-- sourceip: string (nullable = true)
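Since the schema marks time as nullable, the function may receive null for some rows, and sdf.parse(null) would itself throw a NullPointerException. The sketch below is a hypothetical null-safe variant (the name convertSafe is an assumption, not part of the question). It is not claimed to fix the error in the stack trace shown in this question, which points at the Hive function lookup rather than at the function body; it only guards against null or malformed values.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

// Hypothetical null-safe variant of convert; convertSafe is an illustrative name.
def convertSafe(time: String): String = {
  // Option(null) is None, so a null input skips parsing entirely
  val result: Option[String] = Option(time).flatMap { t =>
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call
    val sdf = new SimpleDateFormat("HH:mm")
    // parse throws ParseException on malformed input; Try turns that into None
    Try(sdf.format(sdf.parse(t))).toOption
  }
  // Return null (SQL NULL once registered as a UDF) when there is no usable value
  result.orNull
}

println(convertSafe("12:12:12"))    // prints 12:12
println(convertSafe(null))          // prints null
println(convertSafe("not a time"))  // prints null
```

Returning null rather than throwing keeps one bad row from failing the whole query; whether that is the desired behavior depends on the data.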

Part of the stack trace I get:

    java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
        at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
        at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
1 answer

Use udf instead of directly defining a function

    import org.apache.spark.sql.functions._

    val convert = udf[String, String](time => {
      val sdf = new java.text.SimpleDateFormat("HH:mm")
      val time1 = sdf.parse(time)
      sdf.format(time1)
    })

A udf takes a column (or several columns) as input and returns a column, as the definition of UserDefinedFunction shows:

    case class UserDefinedFunction protected[sql] (
        f: AnyRef,
        dataType: DataType,
        inputTypes: Option[Seq[DataType]]) {

      def apply(exprs: Column*): Column = {
        Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
      }
    }