Replace missing values with the mean value - Spark DataFrame

I have a Spark DataFrame with some missing values. I would like to perform a simple imputation, replacing the missing values with the mean of each column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:

a) To do this for a single column (say, ColA), this line of code works:

    import org.apache.spark.sql.functions.{mean, when}

    df.withColumn("new_Col",
      when($"ColA".isNull, df.select(mean("ColA")).first()(0).asInstanceOf[Double])
        .otherwise($"ColA"))

b) However, I was not able to figure out how to do this for all the columns in my DataFrame. I tried the map function, but I believe it iterates over the rows of the DataFrame rather than over its columns.

c) There is a similar question on SO. And although I liked the solution there (using aggregated tables and coalesce), I really wanted to find out whether there is a way to do this by looping over each column (I come from R, so iterating over the columns with a higher-order function such as lapply feels more natural to me); a rough sketch of what I am imagining is below.
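
Purely to illustrate the shape I am after (the column names are made up, and I am assuming every column is numeric), something like this sketch is what I have in mind, though I do not know whether it is a reasonable way to do it in Spark:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{coalesce, col, lit, mean}

    // Loop over the columns, replacing nulls in each one with that column's mean.
    // Note: this runs one extra aggregation job per column, so it is not efficient.
    val imputed: DataFrame = df.columns.foldLeft(df) { (acc, c) =>
      val colMean = df.select(mean(col(c))).first().getDouble(0)
      acc.withColumn(c, coalesce(col(c), lit(colMean)))
    }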

Thanks!

scala dataframe apache-spark apache-spark-sql imputation
3 answers

Spark >= 2.2

You can use org.apache.spark.ml.feature.Imputer (which supports both mean and median strategies).

Scala:

    import org.apache.spark.ml.feature.Imputer

    val imputer = new Imputer()
      .setInputCols(df.columns)
      .setOutputCols(df.columns.map(c => s"${c}_imputed"))
      .setStrategy("mean")

    imputer.fit(df).transform(df)

Python:

    from pyspark.ml.feature import Imputer

    imputer = Imputer(
        inputCols=df.columns,
        outputCols=["{}_imputed".format(c) for c in df.columns]
    )
    imputer.fit(df).transform(df)

Spark < 2.2

Here you are:

    import org.apache.spark.sql.functions.mean

    df.na.fill(df.columns.zip(
      df.select(df.columns.map(mean(_)): _*).first.toSeq
    ).toMap)

Where

    df.columns.map(mean(_)): Array[Column]

calculates the average value for each column,

    df.select(_: _*).first.toSeq: Seq[Any]

collects the aggregated values and converts the row to a Seq[Any] (I know this is suboptimal, but this is the API we have to work with),

    df.columns.zip(_).toMap: Map[String, Any]

creates a Map[String, Any] which maps each column name to its mean, and finally

    df.na.fill(_): DataFrame

fills in the missing values using:

    fill: Map[String, Any] => DataFrame

from DataFrameNaFunctions.
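
As a quick usage example, here is a minimal end-to-end sketch of the above (the column names and values are made up, and a SparkSession named spark with spark.implicits._ in scope is assumed):

    import org.apache.spark.sql.functions.mean
    import spark.implicits._

    val df = Seq(
      (Some(1.0), Some(10.0)),
      (None, Some(20.0)),
      (Some(3.0), None)
    ).toDF("ColA", "ColB")

    val means = df.columns.zip(
      df.select(df.columns.map(mean(_)): _*).first.toSeq
    ).toMap

    // means == Map("ColA" -> 2.0, "ColB" -> 15.0)
    df.na.fill(means).show()  // nulls in ColA become 2.0, nulls in ColB become 15.0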

To handle NaN entries as well, you can replace:

    df.select(df.columns.map(mean(_)): _*).first.toSeq

with:

    import org.apache.spark.sql.functions.{col, isnan, when}

    df.select(df.columns.map(
      c => mean(when(!isnan(col(c)), col(c)))
    ): _*).first.toSeq

For PySpark, this is the code I used:

    # Map each column name to the 'mean' aggregate
    mean_dict = {col: 'mean' for col in df.columns}
    # Compute the means; the result keys come back as "avg(col_name)"
    col_avgs = df.agg(mean_dict).collect()[0].asDict()
    # Strip the "avg(" prefix and trailing ")" from each key
    col_avgs = {k[4:-1]: v for k, v in col_avgs.items()}
    # Fill missing values with the column means
    df.fillna(col_avgs).show()

Four steps:

  • Create a mean_dict dictionary mapping each column name to the aggregate operation ('mean')
  • Compute the mean of each column and store the results in the col_avgs dictionary
  • The keys in col_avgs start with avg( and end with ), e.g. avg(col1), so strip that wrapper to recover the plain column names
  • Fill the DataFrame's missing values with the averages in col_avgs

To impute the median (instead of the average) in PySpark < 2.2:

    ## filter numeric cols
    num_cols = [col_type[0] for col_type in
                filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]

    ### Compute a dict with <col_name, median_value>
    median_dict = dict()
    for c in num_cols:
        median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]

Then apply na.fill:

 df_imputed = df.na.fill(median_dict) 
