Geofilter with Spark DataFrame

I am new to Spark DataFrames and this sometimes feels strange. Say I have a DataFrame containing logs with latitude and longitude coordinates.

 LogsDataFrame.printSchema

 root
  |-- lat: double (nullable = false)
  |-- lon: double (nullable = false)
  |-- imp: string (nullable = false)
  |-- log_date: string (nullable = true)
  |-- pubuid: string (nullable = true)

On the other hand, I have a simple method:

 within(lat: Double, long: Double, radius: Double): Boolean

which checks whether lat and lon fall within a certain radius of a predefined location.
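For concreteness, such a method might look roughly like the sketch below; the haversine implementation and the hardcoded center point are just placeholders, since only the signature matters for the question:

 import scala.math.{sin, cos, atan2, sqrt, pow, toRadians}

 // Hypothetical predefined location; any fixed point works here.
 val centerLat = 52.52
 val centerLon = 13.405

 // Haversine distance check: true if (lat, long) lies within
 // `radius` kilometers of the predefined center.
 def within(lat: Double, long: Double, radius: Double): Boolean = {
   val dLat = toRadians(lat - centerLat)
   val dLon = toRadians(long - centerLon)
   val a = pow(sin(dLat / 2), 2) +
     cos(toRadians(centerLat)) * cos(toRadians(lat)) * pow(sin(dLon / 2), 2)
   val distanceKm = 6371.0 * 2 * atan2(sqrt(a), sqrt(1 - a))
   distanceKm <= radius
 }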

Now, how do I filter out the log entries that do not satisfy within? I tried

 logsDataFrame.filter(within(logsDF("lat"), logsDF("lon"), RADIUS))

but logsDF("lat") does not produce a Double; its type is Column. How can I make this work? The Spark documentation is a little thin on this point, so I'm sure I'm missing something.

Thank you for your help.

1 answer

Generally speaking, you need at least two things to make this work. First, you have to wrap within in a UDF:

 import org.apache.spark.sql.functions.{udf, lit}

 val withinUDF = udf(within _)

Then, when the UDF is called, the radius has to be passed as a literal column:

 df.where(withinUDF($"lat", $"lon", lit(RADIUS)))

Since not every type can be passed this way, and wrapping values in lit is quite tedious, you may prefer currying instead:

 def within(radius: Double) = udf((lat: Double, long: Double) => ???)

 df.where(within(RADIUS)($"lat", $"lon"))
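Putting it all together, here is a minimal end-to-end sketch of the curried variant; the haversine body, the center coordinates, and the sample rows are placeholders standing in for whatever within actually does:

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.udf
 import scala.math.{sin, cos, atan2, sqrt, pow, toRadians}

 val spark = SparkSession.builder.appName("geofilter").getOrCreate()
 import spark.implicits._

 // Placeholder center point; the question only gives the signature.
 val (centerLat, centerLon) = (52.52, 13.405)

 // The radius is captured in the closure, so no lit(...) is required.
 def within(radius: Double) = udf((lat: Double, long: Double) => {
   val dLat = toRadians(lat - centerLat)
   val dLon = toRadians(long - centerLon)
   val a = pow(sin(dLat / 2), 2) +
     cos(toRadians(centerLat)) * cos(toRadians(lat)) * pow(sin(dLon / 2), 2)
   6371.0 * 2 * atan2(sqrt(a), sqrt(1 - a)) <= radius  // distance in km
 })

 val logsDF = Seq(
   (52.53, 13.40, "imp1", "2015-10-01", "pub1"),
   (48.85, 2.35, "imp2", "2015-10-01", "pub2")
 ).toDF("lat", "lon", "imp", "log_date", "pubuid")

 // Keeps only rows within 10 km of the center.
 logsDF.where(within(10.0)($"lat", $"lon")).show()

Capturing the radius in the closure sidesteps lit entirely, at the cost of building a new UDF for each radius value.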