Spark SQL: how to call a UDF from a DataFrame operation using Java

I would like to know how to call a UDF from a DataFrame domain-specific language (DSL) function in Spark SQL using Java.

I have a UDF, for example:

    UDF2<String, String, Boolean> equals = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String first, String second) throws Exception {
            return first.equals(second);
        }
    };

I registered it with the sqlContext:

 sqlContext.udf().register("equals", equals, DataTypes.BooleanType); 
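
One thing worth checking in the UDF registered above: `first.equals(second)` throws a `NullPointerException` when the first argument is null, which can happen if the column contains null values. A minimal sketch of a null-safe comparison in plain Java follows. The class and method names are hypothetical, and treating two nulls as equal is an assumption that differs from SQL's own NULL semantics, so adjust it if that matters for your data.

```java
import java.util.Objects;

public class NullSafeEquals {
    // Hypothetical helper: compares two possibly-null strings without throwing.
    // Objects.equals returns true when both are null and false when only one is.
    public static boolean nullSafeEquals(String first, String second) {
        return Objects.equals(first, second);
    }

    public static void main(String[] args) {
        System.out.println(nullSafeEquals("someString", "someString"));
        System.out.println(nullSafeEquals(null, "someString"));
    }
}
```

Inside the UDF, the body of `call(...)` would then become `return nullSafeEquals(first, second);`.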

When I run the following query, my UDF is called and I get the result.

 sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')"); 

I would like to express this query using the DataFrame DSL functions in Spark SQL instead, and I'm not sure how to do this.

 valuesDF.select("value").where(???); 

I found that there is a callUDF() function, but one of its parameters is a Function2 fnctn, not a UDF2. How can I use my UDF together with the DSL functions?

3 answers

I found a solution that I'm only half satisfied with. The UDF can be called inside a filter expression string, for example:

 valuesDF.filter("equals(columnName, 'someString')").select("columnName"); 

But I'm still wondering whether the UDF can be called directly.


Edit:

By the way, you can call the UDF directly, for example:

    df.where(callUdf("equals",
            scala.collection.JavaConversions.asScalaBuffer(
                Arrays.asList(col("columnName"), col("otherColumnName"))
            ).seq()))
      .select("columnName");

This requires a static import of org.apache.spark.sql.functions.
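
From Spark 1.5 on, `functions.callUDF(String, Column...)` accepts Java varargs, so the Scala collection conversion should not be needed. The following is a minimal sketch rather than a definitive implementation. It assumes Spark SQL 1.5.x or 1.6.x on the classpath (where `DataFrame` is still a class), a local master, and made-up sample rows. The class name `CallUdfFromDsl` is hypothetical. A constant argument is wrapped in `lit(...)` so that it becomes a `Column`.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

public class CallUdfFromDsl {
    public static void main(String[] args) {
        // Assumption: run locally with a single thread, for illustration only.
        SparkConf conf = new SparkConf().setAppName("call-udf-sketch").setMaster("local[1]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Same UDF as in the question, registered under the name "equals".
        sqlContext.udf().register("equals", new UDF2<String, String, Boolean>() {
            @Override
            public Boolean call(String first, String second) throws Exception {
                return first.equals(second);
            }
        }, DataTypes.BooleanType);

        // Made-up sample data with a single non-null string column named "value".
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("value", DataTypes.StringType, false)
        });
        JavaRDD<Row> rows = jsc.parallelize(Arrays.asList(
            RowFactory.create("someString"),
            RowFactory.create("otherString")));
        DataFrame valuesDF = sqlContext.createDataFrame(rows, schema);

        // Call the registered UDF by name from the DSL; lit() turns the constant into a Column.
        valuesDF.where(callUDF("equals", col("value"), lit("someString")))
                .select("value")
                .show();

        jsc.stop();
    }
}
```

The UDF is still looked up by its registered name, so it must be registered on the same `sqlContext` that the DataFrame belongs to.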


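When querying a DataFrame, you can apply the registered UDF by name using something like this (Spark 1.5 or later):

    sourceDf.filter(callUDF("equals", col("columnName"), lit("someString"))).select("columnName");

where col("columnName") is the column you want to compare, lit("someString") wraps the constant as a column, and callUDF, col and lit are static imports from org.apache.spark.sql.functions. In Java the UDF2 object itself cannot be invoked on columns like a function.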


Here is an example of working code. It works with Spark 1.5.x and 1.6.x. The trick to calling a UDF from a pipeline transformer is to use the DataFrame's sqlContext() to register your UDF:

    @Test
    public void test() {
        // https://issues.apache.org/jira/browse/SPARK-12484
        logger.info("BEGIN");

        DataFrame df = createData();
        final String tableName = "myTable";
        sqlContext.registerDataFrameAsTable(df, tableName);

        logger.info("print schema");
        df.printSchema();
        logger.info("original data before we applied UDF");
        df.show();

        MyUDF udf = new MyUDF();
        final String udfName = "myUDF";
        sqlContext.udf().register(udfName, udf, DataTypes.StringType);

        String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
        String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName);
        logger.info("AEDWIP stmt:{}", stmt);
        DataFrame udfDF = sqlContext.sql(stmt);

        Row[] results = udfDF.head(3);
        for (Row row : results) {
            logger.info("row returned by applying UDF {}", row);
        }

        logger.info("AEDWIP udfDF schema");
        udfDF.printSchema();
        logger.info("AEDWIP udfDF data");
        udfDF.show();

        logger.info("END");
    }

    DataFrame createData() {
        Features f1 = new Features(1, category1);
        Features f2 = new Features(2, category2);
        ArrayList<Features> data = new ArrayList<Features>(2);
        data.add(f1);
        data.add(f2);
        //JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
        JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
        DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
        return df;
    }

    class MyUDF implements UDF1<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(String s) throws Exception {
            logger.info("AEDWIP s:{}", s);
            String ret = s.equalsIgnoreCase(category1) ? category1 : category3;
            return ret;
        }
    }

    public class Features implements Serializable {
        private static final long serialVersionUID = 1L;
        int id;
        String labelStr;

        Features(int id, String l) {
            this.id = id;
            this.labelStr = l;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getLabelStr() {
            return labelStr;
        }

        public void setLabelStr(String labelStr) {
            this.labelStr = labelStr;
        }
    }

This is the output:

    +---+--------+
    | id|labelStr|
    +---+--------+
    |  1|   noise|
    |  2|     ack|
    +---+--------+

    root
     |-- id: integer (nullable = false)
     |-- labelStr: string (nullable = true)
     |-- transformedByUDF: string (nullable = true)

    +---+--------+----------------+
    | id|labelStr|transformedByUDF|
    +---+--------+----------------+
    |  1|   noise|           noise|
    |  2|     ack|          signal|
    +---+--------+----------------+
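
For reference, the SQL statement that the test above builds with `String.format` can be checked in isolation, without Spark. This is only a sketch: the class `UdfStatement` and the method `buildStatement` are hypothetical names, while the format string and arguments are taken from the test.

```java
public class UdfStatement {
    // Builds the same SELECT statement as the test: the UDF is applied to
    // tableName.columnName and the result is aliased as transformedByUDF.
    public static String buildStatement(String udfName, String tableName, String columnName) {
        String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
        return String.format(fmt, udfName, tableName + "." + columnName, tableName);
    }

    public static void main(String[] args) {
        // prints: SELECT *, myUDF(myTable.labelStr) as transformedByUDF FROM myTable
        System.out.println(buildStatement("myUDF", "myTable", "labelStr"));
    }
}
```

Logging the generated statement, as the test does, makes it easier to spot a misspelled UDF or column name before Spark reports an analysis error.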