Working with a data frame

I am using the DataFrame API from Spark 1.3.

I would like to get the day of the week from a date column in the DataFrame without losing the other columns.

Before moving to the DataFrame API, I computed it with Joda-Time in a simple map.

My current working solution is:

 sqlContext.createDataFrame(myDataFrame.map(l => operationOnTheField(l)), myDataFrame.schema)

Is it possible to apply an operation to a column without falling back to a map over the RDD[Row] and then rebuilding a DataFrame from that RDD?

2 answers

Try this:

 Table.select(Table("Otherkey"),MyUdf(Table("ColNeeded")).as("UdfTransformed")) 

Here MyUdf is a UDF that you define yourself.
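To tie this back to the day-of-week question, here is a minimal sketch of what such a UDF might look like. The names dayOfWeek, dayOfWeekUdf and events, the sample rows, and the "yyyy-MM-dd" date format are all assumptions made for illustration. It uses java.text.SimpleDateFormat rather than Joda-Time only to avoid an extra dependency.

```scala
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

object DayOfWeekSketch {
  // Hypothetical helper: parses a "yyyy-MM-dd" string (assumed format)
  // and returns the English day name, e.g. "Wednesday".
  // SimpleDateFormat is not thread-safe, so a new instance is built per call.
  def dayOfWeek(date: String): String = {
    val parsed = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH).parse(date)
    new SimpleDateFormat("EEEE", Locale.ENGLISH).format(parsed)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DayOfWeekSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Made-up sample data standing in for the asker's DataFrame.
      val events = sc.parallelize(Seq((1, "2015-04-01"), (2, "2015-04-05")))
        .toDF("id", "date")

      // Wrap the plain Scala function as a UDF usable on Columns.
      val dayOfWeekUdf = udf(dayOfWeek _)

      // Keep the existing columns and add the derived one alongside them.
      val withDay = events.select(
        events("id"),
        events("date"),
        dayOfWeekUdf(events("date")).as("dayOfWeek"))
      withDay.show()
    } finally {
      sc.stop()
    }
  }
}
```

Listing the other columns explicitly in select() is what keeps them in the result; nothing goes through RDD[Row].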


You can combine a call to select() on the DataFrame with a user-defined function (UDF) to transform the column in question.

 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.sql.functions._

Define a case class for the example DataFrame.

 private case class Cust(id: Integer, name: String, sales: Double, discount: Double, state: String) 

Then configure a SQLContext and create a DataFrame as follows:

 import sqlContext.implicits._

 val custs = Seq(
   Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
   Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
   Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
   Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
   Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
 )
 val customerDF = sc.parallelize(custs, 4).toDF()

Define a simple UDF that you will use to transform the discount column.

 val myFunc = udf {(x: Double) => x + 1} 

Get all the columns, and identify the discount column that the UDF will be applied to; the rest are left as they are.

 val colNames = customerDF.columns
 val cols = colNames.map(cName => customerDF.col(cName))
 val theColumn = customerDF("discount")

I would like to find the "best" way to map the column, but the following works. Use as() to give the column a new name just because we can!

 val mappedCols = cols.map(c => if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c) 

Use select() to create a new DataFrame:

 val newDF = customerDF.select(mappedCols:_*) 

This changes

 id name            sales    discount state
 1  Widget Co       120000.0 0.0      AZ
 2  Acme Widgets    410500.0 500.0    CA
 3  Widgetry        410500.0 200.0    CA
 4  Widgets R Us    410500.0 0.0      CA
 5  Ye Olde Widgete 500.0    0.0      MA

into

 id name            sales    transformed state
 1  Widget Co       120000.0 1.0         AZ
 2  Acme Widgets    410500.0 501.0       CA
 3  Widgetry        410500.0 201.0       CA
 4  Widgets R Us    410500.0 1.0         CA
 5  Ye Olde Widgete 500.0    1.0         MA
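One possible alternative to comparing Column.toString() values is to match on the column name instead. The sketch below is only an illustration of that idea: replaceColumn is a hypothetical helper, not part of the Spark API, and the two sample rows are a cut-down copy of the data above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.sql.functions.udf

object ReplaceColumnSketch {
  // Hypothetical helper: rebuilds the column list in its original order,
  // swapping the column named `target` for f(column) aliased as `newName`.
  def replaceColumn(df: DataFrame, target: String, newName: String)
                   (f: Column => Column): DataFrame = {
    val cols = df.columns.map { name =>
      if (name == target) f(df(name)).as(newName) else df(name)
    }
    df.select(cols: _*)
  }

  // Returns the resulting column names and the transformed values.
  def run(): (Seq[String], Seq[Double]) = {
    val conf = new SparkConf().setAppName("ReplaceColumnSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val customerDF = sc.parallelize(Seq(
        (1, "Widget Co", 120000.00, 0.00, "AZ"),
        (2, "Acme Widgets", 410500.00, 500.00, "CA")
      )).toDF("id", "name", "sales", "discount", "state")

      val myFunc = udf { (x: Double) => x + 1 }
      val newDF = replaceColumn(customerDF, "discount", "transformed")(c => myFunc(c))
      newDF.show()

      // "transformed" sits at index 3, where "discount" used to be.
      (newDF.columns.toSeq, newDF.collect().map(_.getDouble(3)).toSeq.sorted)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run()
}
```

Matching on names avoids relying on how a Column happens to render as a string, at the cost of assuming column names are unique in the DataFrame.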

Here you can find the complete source code for this example. You can simplify it if you are not fussy about the column being replaced in exactly the same position.
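The simpler variant is not spelled out here, so the following is just one guess at what it could look like: DataFrame.withColumn() appends the transformed column at the end and leaves the original discount column in place. The object name and the reduced sample data are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

object WithColumnSketch {
  // Returns the column names of the DataFrame after appending "transformed".
  def run(): Seq[String] = {
    val conf = new SparkConf().setAppName("WithColumnSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val customerDF = sc.parallelize(Seq(
        (1, "Widget Co", 120000.00, 0.00, "AZ"),
        (2, "Acme Widgets", 410500.00, 500.00, "CA")
      )).toDF("id", "name", "sales", "discount", "state")

      val myFunc = udf { (x: Double) => x + 1 }

      // withColumn keeps every existing column and adds the new one last.
      val appended = customerDF.withColumn("transformed", myFunc(customerDF("discount")))
      appended.show()
      appended.columns.toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run()
}
```

This is shorter than rebuilding the full column list, but the result has six columns rather than five, so it only fits when keeping the original column is acceptable.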
