How to change column types in Spark SQL DataFrame?

Suppose I do something like:

 val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))

 df.printSchema()
 root
  |-- year: string (nullable = true)
  |-- make: string (nullable = true)
  |-- model: string (nullable = true)
  |-- comment: string (nullable = true)
  |-- blank: string (nullable = true)

 df.show()
 year make  model comment              blank
 2012 Tesla S     No comment
 1997 Ford  E350  Go get one now th...

but I really wanted year as Int (and possibly convert some other columns).

The best I could come up with is

 df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string] 

which is a bit confusing.

I come from R, and I'm used to being able to write, for example:

 df2 <- df %>% mutate(year = year %>% as.integer, make = make %>% toupper) 

Most likely I'm missing something, since there should be a better way to do this in Spark/Scala.

+137
scala apache-spark apache-spark-sql
Apr 01 '15 at 4:55
18 answers

Edit: newest version

Starting with Spark 2.x you can use .withColumn . Check out the docs here:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame
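For example, a minimal sketch (assuming a Spark 2.x session and the cars df from the question):

 import org.apache.spark.sql.functions.col

 // Overwrites the existing "year" column with its value cast to int
 val df2 = df.withColumn("year", col("year").cast("int"))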

Older answer

Since Spark version 1.4 you can apply the cast method with a DataType on the column:

 import org.apache.spark.sql.types.IntegerType

 val df2 = df.withColumn("yearTmp", df("year").cast(IntegerType))
   .drop("year")
   .withColumnRenamed("yearTmp", "year")

If you use SQL expressions, you can also do:

 val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank") 

For more information check the docs: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

+132
Oct 29 '15 at 20:27

[EDIT: March 2016: thanks for the votes! Although this is actually not the best answer, I think the solutions based on withColumn , withColumnRenamed and cast put forward by msemelman, Martin Senne and others are simpler and cleaner.]

I think your approach is fine; recall that a Spark DataFrame is an (immutable) RDD of Rows, so we never really replace a column, we just create a new DataFrame each time with a new schema.

Assuming you have the original df with the following schema:

 scala> df.printSchema
 root
  |-- Year: string (nullable = true)
  |-- Month: string (nullable = true)
  |-- DayofMonth: string (nullable = true)
  |-- DayOfWeek: string (nullable = true)
  |-- DepDelay: string (nullable = true)
  |-- Distance: string (nullable = true)
  |-- CRSDepTime: string (nullable = true)

And some UDFs defined on one or more columns:

 import org.apache.spark.sql.functions._

 val toInt    = udf[Int, String]( _.toInt)
 val toDouble = udf[Double, String]( _.toDouble)
 val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
 val days_since_nearest_holidays = udf(
   (year: String, month: String, dayOfMonth: String) => year.toInt + 27 + month.toInt - 12
 )

Changing column types, or even building a new DataFrame from another one, can then be written like this:

 val featureDf = df
   .withColumn("departureDelay", toDouble(df("DepDelay")))
   .withColumn("departureHour",  toHour(df("CRSDepTime")))
   .withColumn("dayOfWeek",      toInt(df("DayOfWeek")))
   .withColumn("dayOfMonth",     toInt(df("DayofMonth")))
   .withColumn("month",          toInt(df("Month")))
   .withColumn("distance",       toDouble(df("Distance")))
   .withColumn("nearestHoliday", days_since_nearest_holidays(
                 df("Year"), df("Month"), df("DayofMonth"))
               )
   .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
           "month", "distance", "nearestHoliday")

which gives:

 scala> featureDf.printSchema
 root
  |-- departureDelay: double (nullable = true)
  |-- departureHour: integer (nullable = true)
  |-- dayOfWeek: integer (nullable = true)
  |-- dayOfMonth: integer (nullable = true)
  |-- month: integer (nullable = true)
  |-- distance: double (nullable = true)
  |-- nearestHoliday: integer (nullable = true)

This is pretty close to your own solution. Keeping the type changes and other transformations as separate udf vals simply makes the code more readable and reusable.

+87
Apr 10 '15 at 9:39

Since the cast operation is available on a Spark Column (and since I personally do not favour udf s as proposed by @Svend at this point), how about:

 df.select( df("year").cast(IntegerType).as("year"), ... ) 

to cast to the requested type? As a neat side effect, values that are not castable / "convertible" in that sense will become null .

If you need this as a helper method, use:

 object DFHelper {
   def castColumnTo( df: DataFrame, cn: String, tpe: DataType ): DataFrame = {
     df.withColumn( cn, df(cn).cast(tpe) )
   }
 }

which is used as:

 import DFHelper._
 val df2 = castColumnTo( df, "year", IntegerType )
+60
Sep 17 '15 at 15:54

First, if you want to cast a type, then this:

 import org.apache.spark.sql
 df.withColumn("year", $"year".cast(sql.types.IntegerType))

With the same column name, the column will be replaced with the new one. You don't need separate add and drop steps.

Second, about Scala vs. R.
This is the Scala code that looks most like the R one I can come up with:

 val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
 )

Although the code is a little longer than R's, that has nothing to do with the verbosity of the language. In R, mutate is a special function for R data frames, while in Scala you can easily write such a function ad hoc thanks to its expressive power. In short, the language avoids doing too much for you, because the foundation is good enough that you can quickly build your own domain-specific functions.
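For illustration, here is a minimal sketch of such a hypothetical mutate-style helper; the name mutate and the helper itself are not part of Spark, and it assumes the df from the question:

 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.functions
 import org.apache.spark.sql.types.IntegerType

 // Hypothetical helper: apply the given column expressions by name,
 // keeping every other column unchanged and in its original position.
 def mutate(df: DataFrame, changes: Map[String, Column]): DataFrame =
   df.select(df.columns.map(c => changes.getOrElse(c, df(c)).as(c)): _*)

 // Usage, roughly matching the R example from the question:
 // val df2 = mutate(df, Map(
 //   "year" -> df("year").cast(IntegerType),
 //   "make" -> functions.upper(df("make"))
 // ))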




Note: df.columns is, somewhat unexpectedly, an Array[String] instead of an Array[Column] ; maybe they wanted it to look like a pandas DataFrame in Python.

+44
Aug 21 '15 at 13:12

You can use selectExpr to make it a little cleaner:

 df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank") 
+16
Aug 14 '15 at 19:24

Java code to change DataFrame data type from String to Integer

 df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType)) 

It simply casts the existing column (of String data type) to Integer.

+10
May 19 '16 at 10:42

To get the year as an int from a string, you can add the following option to the csv reader: "inferSchema" -> "true" ; see the Databricks documentation.
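A rough sketch, assuming the spark-csv package from the question is available:

 // With inferSchema enabled, spark-csv samples the file and infers numeric
 // types, so the year column should come back as an integer.
 val df = sqlContext.read
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .option("inferSchema", "true")
   .load("cars.csv")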

+8
Aug 16 '15 at 2:49

This only really applies if you are having issues saving to a JDBC driver such as SQL Server, but it is very helpful for the errors you will run into with syntax and types.

 import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
 import org.apache.spark.sql.types._

 val SQLServerDialect = new JdbcDialect {
   override def canHandle(url: String): Boolean =
     url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case StringType    => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
     case BooleanType   => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
     case IntegerType   => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
     case LongType      => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
     case DoubleType    => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
     case FloatType     => Some(JdbcType("REAL", java.sql.Types.REAL))
     case ShortType     => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
     case ByteType      => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
     case BinaryType    => Some(JdbcType("BINARY", java.sql.Types.BINARY))
     case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
     case DateType      => Some(JdbcType("DATE", java.sql.Types.DATE))
     // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
     case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
     case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
   }
 }

 JdbcDialects.registerDialect(SQLServerDialect)
+6
Apr 19 '16 at 7:32

Generate a simple dataset containing five values and convert the int to string type:

 val df = spark.range(5).select( col("id").cast("string") ) 
+6
Jul 12 '18 at 22:02

So much for the answers suggesting to use cast: FYI, the cast method was broken in Spark 1.4.1.

For example, a DataFrame with a string column holding the value "8182175552014127960", when cast to bigint, ends up with the value "8182175552014128100":

 df.show
 +-------------------+
 |                  a|
 +-------------------+
 |8182175552014127960|
 +-------------------+

 df.selectExpr("cast(a as bigint) a").show
 +-------------------+
 |                  a|
 +-------------------+
 |8182175552014128100|
 +-------------------+

We had to face a lot of issues before finding this bug, because we had bigint columns in production.

+5
Aug 05 '16 at 12:47
 df.select($"long_col".cast(IntegerType).as("int_col")) 
+5
Aug 31 '16 at 6:31

Using Spark Sql 2.4.0, you can do this:

 spark.sql("SELECT STRING(NULLIF(column,'')) as column_string") 
+4
Apr 08 '19 at 15:26

This method will drop the old column and create new columns with the same values and a new data type. My original data types when the DataFrame was created were:

 root
  |-- id: integer (nullable = true)
  |-- flag1: string (nullable = true)
  |-- flag2: string (nullable = true)
  |-- name: string (nullable = true)
  |-- flag3: string (nullable = true)

After that, I ran the following code to change the data type:

 df = df.withColumnRenamed(<old column name>, <dummy column>)  // This was done for both flag1 and flag3
 df = df.withColumn(<old column name>, df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

After that, my result was:

 root
  |-- id: integer (nullable = true)
  |-- flag2: string (nullable = true)
  |-- name: string (nullable = true)
  |-- flag1: boolean (nullable = true)
  |-- flag3: boolean (nullable = true)
+2
PirateJack May 18 '17 at 5:43

You can use the code below.

 df.withColumn("year", df("year").cast(IntegerType)) 

This will convert the year column to an IntegerType column.

+2
Jan 16 '18 at 12:32

If you need to cast dozens of columns specified by their names, the following example takes the approach of @dnlbrky and applies it to several columns at once:

 df.selectExpr(df.columns.map(cn => {
   if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
   else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
   else cn
 }): _*)

Columns that are not cast remain unchanged, and all columns stay in their original order.

+1
May 18 '19 at 10:16

You can change the data type of a column by using cast in Spark SQL. Say the table name is table and it has only two columns, column1 and column2, and the data type of column1 is to be changed:

 spark.sql("select cast(column1 as Double) column1NewName, column2 from table")

In place of Double, write your data type.

0
Oct 06 '16 at 8:54
 val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",
                         $"data"(21).cast(FloatType).as("Data_Value_Std_Err")).rdd

 // Schema to be applied to the table
 val fact_schema = (new StructType)
   .add("TopicTypeId", StringType)
   .add("TopicId", StringType)
   .add("Data_Value_Std_Err", FloatType)

 val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
-1
Oct 03 '16 at 2:56

Another way:

 // Generate a simple dataset containing five values and convert int to string type
 val df = spark.range(5).select( col("id").cast("string") ).withColumnRenamed("id", "value")
-1
Jul 12 '18 at 22:16


