Preserving microseconds in timestamps in Spark (Scala)

I imported a PostgreSQL table into Spark as a DataFrame using Scala. The DataFrame looks like this:

    user_id | log_dt
    --------|---------------------------
    96      | 2004-10-19 10:23:54.0
    1020    | 2017-01-12 12:12:14.931652

I want to convert this DataFrame so that log_dt has the format yyyy-MM-dd hh:mm:ss.SSSSSS. To do this, I used the following code, which converts log_dt to a timestamp with the unix_timestamp function:

  val tablereader1=tablereader1Df.withColumn("log_dt",unix_timestamp(tablereader1Df("log_dt"),"yyyy-MM-dd hh:mm:ss.SSSSSS").cast("timestamp")) 

When I print the DataFrame tablereader1 with tablereader1.show(), I get the following result:

    user_id | log_dt
    --------|---------------------------
    96      | 2004-10-19 10:23:54.0
    1020    | 2017-01-12 12:12:14.0
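This output is consistent with what unix_timestamp returns: the number of whole seconds since the Unix epoch, as a Long. Casting that Long back to "timestamp" can therefore only produce a .0 fraction. The sketch below reproduces the same round trip in plain Scala with java.time, outside Spark, to show where the fraction disappears. The object name SecondsRoundTrip is hypothetical, and UTC is assumed to keep the example time-zone independent.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Hypothetical demo object, not part of Spark
object SecondsRoundTrip {
  // ISO form of the sample value (space replaced by 'T'); ISO parsing accepts 0 to 9 fraction digits
  val original: LocalDateTime = LocalDateTime.parse("2017-01-12T12:12:14.931652")

  // Analogous to unix_timestamp: keep only whole seconds since the epoch (UTC assumed)
  val epochSeconds: Long = original.toEpochSecond(ZoneOffset.UTC)

  // Analogous to .cast("timestamp") on that Long: rebuilt from whole seconds, so the fraction is 0
  val roundTripped: LocalDateTime = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC)
}
```

Here SecondsRoundTrip.original.toString is "2017-01-12T12:12:14.931652", while SecondsRoundTrip.roundTripped.toString is "2017-01-12T12:12:14".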

How can I preserve the microseconds as part of the timestamp? Any suggestions are welcome.

Answer

Milliseconds with date_format()

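You can use Spark SQL's date_format(), which accepts Java SimpleDateFormat patterns (this applies to Spark 2.x; Spark 3.0 and later use java.time-based datetime patterns instead). SimpleDateFormat can only handle milliseconds, via the pattern "S".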

    import org.apache.spark.sql.functions._
    import spark.implicits._ // to use $-notation on columns

    val df = tablereader1Df.withColumn("log_dt", date_format($"log_dt", "S"))
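To see the millisecond limit of SimpleDateFormat in isolation, here is a small plain-Scala check outside Spark. It uses java.sql.Timestamp, which stores nanoseconds separately from its millisecond-precision java.util.Date part, and formats it with SimpleDateFormat directly. The object name MillisVsMicros is hypothetical, and this only illustrates SimpleDateFormat's behavior; it is not a description of how Spark implements date_format internally.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// Hypothetical demo object, not part of Spark
object MillisVsMicros {
  // Timestamp.valueOf keeps the full ".931652" fraction in its nanos field
  val ts: Timestamp = Timestamp.valueOf("2017-01-12 12:12:14.931652")

  // SimpleDateFormat only sees milliseconds: "S" is the millisecond field
  val millisField: String = new SimpleDateFormat("S").format(ts)

  // The microseconds are still available from the Timestamp itself
  val microsFromTimestamp: Long = ts.getNanos / 1000L
}
```

MillisVsMicros.millisField is "931", while MillisVsMicros.microsFromTimestamp is 931652.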

Update: microseconds with LocalDateTime for Java 8

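    // Imports
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatterBuilder
    import java.time.temporal.ChronoField
    import org.apache.spark.sql.functions.udf

    /* Commented out, as per the comment about IntelliJ:
    spark.udf.register("date_microsec", (dt: String) => {
      val dtFormatter = new DateTimeFormatterBuilder()
        .appendPattern("yyyy-MM-dd HH:mm:ss")
        .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
        .toFormatter()
      LocalDateTime.parse(dt, dtFormatter).getLong(ChronoField.MICRO_OF_SECOND)
    })
    */

    // appendFraction reads ".931652" (or ".0") as a fraction of a second;
    // the pattern letter "n" would instead read "931652" as 931652 nanoseconds.
    // The formatter is built inside the lambda because DateTimeFormatter is not Serializable.
    val date_microsec = udf((dt: String) => {
      val dtFormatter = new DateTimeFormatterBuilder()
        .appendPattern("yyyy-MM-dd HH:mm:ss")
        .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
        .toFormatter()
      LocalDateTime.parse(dt, dtFormatter).getLong(ChronoField.MICRO_OF_SECOND)
    })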

See the DateTimeFormatter documentation for help with building the pattern.

Use ChronoField.NANO_OF_SECOND instead of ChronoField.MICRO_OF_SECOND to get nanoseconds from the UDF.

 val df = tablereader1Df.withColumn("log_date_microsec", date_microsec($"log_dt")) 
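The parsing logic inside the UDF can be checked without a Spark session. The sketch below is plain Scala on Java 8 or later; the object MicrosParser and its method names are hypothetical helpers, not part of Spark. It uses DateTimeFormatterBuilder.appendFraction so that a fraction of 0 to 9 digits is accepted (both sample rows parse), offers a NANO_OF_SECOND variant, and for contrast shows what a pattern ending in ".n" does: "n" reads the digits as a plain nanosecond count, so "931652" becomes 931652 ns, which is only 931 µs.

```scala
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoField

// Hypothetical helper object (not part of Spark) mirroring the body of the date_microsec UDF
object MicrosParser {
  // "yyyy-MM-dd HH:mm:ss" followed by an optional '.' and 0 to 9 fraction digits
  private val fractionFormatter: DateTimeFormatter = new DateTimeFormatterBuilder()
    .appendPattern("yyyy-MM-dd HH:mm:ss")
    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
    .toFormatter()

  // Pattern letter 'n' parses the digits after '.' as a nanosecond count, not as a fraction
  private val nPatternFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n")

  def microOfSecond(dt: String): Long =
    LocalDateTime.parse(dt, fractionFormatter).getLong(ChronoField.MICRO_OF_SECOND)

  def nanoOfSecond(dt: String): Long =
    LocalDateTime.parse(dt, fractionFormatter).getLong(ChronoField.NANO_OF_SECOND)

  def microOfSecondWithNPattern(dt: String): Long =
    LocalDateTime.parse(dt, nPatternFormatter).getLong(ChronoField.MICRO_OF_SECOND)
}
```

For the sample data, MicrosParser.microOfSecond("2017-01-12 12:12:14.931652") returns 931652 and MicrosParser.microOfSecond("2004-10-19 10:23:54.0") returns 0, while microOfSecondWithNPattern on the first string returns 931.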