Imputing a PySpark DataFrame - replacing unknown and missing values with the column average based on a specified condition

Given a Spark DataFrame, I would like to calculate the average value of a column using only the rows where that column is neither missing nor unknown. I would then like to take this average and use it to replace the column's missing and unknown values.

For example, if I work with:

  • A DataFrame named df, where each record represents one person, and all columns are integer or numeric
  • Column named age (age for each entry)
  • A column named missing_age (which is 1 if this person has no age, 0 otherwise)
  • A column named unknown_age (which is 1 if this person is of unknown age, 0 otherwise)

Then I can calculate this value as shown below.

    calc_mean = df.where(
        (col("unknown_age") == 0) & (col("missing_age") == 0)
    ).agg(avg(col("age")))
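
Note that .agg() returns a one-row DataFrame rather than a Python number, so to use the mean as a literal in a later expression the scalar has to be extracted first. A minimal sketch:

    # agg() produces a single-row DataFrame; first()[0] pulls out the
    # numeric value so it can be used as a literal in when()/otherwise().
    calc_mean = df.where(
        (col("unknown_age") == 0) & (col("missing_age") == 0)
    ).agg(avg(col("age"))).first()[0]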

Or through SQL and window functions:

    mean_compute = hiveContext.sql(
        "select avg(age) over() as mean from df where missing_age = 0 and unknown_age = 0"
    )

I would prefer not to use SQL or window functions if I can help it. My task is to take this value and use it to replace the unknown/missing values by some method other than SQL.

I have tried when(), where(), replace(), withColumn(), UDFs, and combinations of them... No matter what I do, I either get errors or the results are not what I expect. Here is an example of one of the many things I tried that didn't work:

    imputed = df.when(
        (col("unknown_age") == 1) | (col("missing_age") == 1), calc_mean
    ).otherwise("age")
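
For reference, a sketch of how that pattern is usually written, assuming calc_mean has already been reduced to a plain number: when() is a function from pyspark.sql.functions rather than a DataFrame method, so it goes inside withColumn(), and otherwise("age") would insert the literal string "age" instead of the column, so col("age") is needed:

    # when() comes from pyspark.sql.functions and belongs inside
    # withColumn(); a bare string in otherwise() is treated as a literal,
    # so the column must be referenced with col("age").
    imputed = df.withColumn(
        "age_imp",
        when((col("unknown_age") == 1) | (col("missing_age") == 1), calc_mean)
        .otherwise(col("age"))
    )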

I browsed the web but did not find any similar questions about imputation, so any help is greatly appreciated. It may be something very simple that I missed.

Side note: I am trying to apply this to all columns in the Spark DataFrame that do not have unknown_ or missing_ in their names. Can I just wrap the Spark code in a Python for loop and skip the indicator columns, processing each relevant column in turn?

UPDATE:

Also figured out how to iterate over columns ... Here's an example.

    for x in df.columns:
        if 'unknown_' not in x and 'missing_' not in x:
            avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
            df = df.withColumn(x + 'mean_miss_imp',
                               when((df['missing_' + x] == 1), avg_compute)
                               .otherwise(df[x]))
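
That loop only checks the missing_ indicator. A sketch that respects both flags, assuming every value column x has matching missing_x and unknown_x indicator columns (the _imp suffix is illustrative):

    for x in df.columns:
        if 'unknown_' not in x and 'missing_' not in x:
            # average over rows where neither indicator is set
            avg_compute = df.where(
                (df['missing_' + x] != 1) & (df['unknown_' + x] != 1)
            ).agg(avg(x)).first()[0]
            df = df.withColumn(
                x + '_imp',
                when((df['missing_' + x] == 1) | (df['unknown_' + x] == 1),
                     avg_compute).otherwise(df[x])
            )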
1 answer

If unknown or missing ages are encoded with a sentinel value (-1 in the example below), compute the average over the valid rows and substitute it with when()/otherwise():

    from pyspark.sql.functions import col, avg, when

    df = sc.parallelize([
        (10, 0, 0), (20, 0, 0), (-1, 1, 0), (-1, 0, 1)
    ]).toDF(["age", "missing_age", "unknown_age"])

    avg_age = df.where(
        (col("unknown_age") != 1) & (col("missing_age") != 1)
    ).agg(avg("age")).first()[0]

    df.withColumn("age_imp", when(
        (col("unknown_age") == 1) | (col("missing_age") == 1), avg_age
    ).otherwise(col("age")))
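
With this toy data avg_age works out to 15.0 (the mean of 10 and 20), so the two flagged rows get 15.0 in age_imp while the valid rows keep their original ages.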

If unknown or missing age is NULL instead, you can simplify this:

    df = sc.parallelize([
        (10, 0, 0), (20, 0, 0), (None, 1, 0), (None, 0, 1)
    ]).toDF(["age", "missing_age", "unknown_age"])

    df.na.fill(df.na.drop().agg(avg("age")).first()[0], ["age"])
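
One caveat: df.na.drop() with no arguments removes rows that have a null in any column, which can shrink the set of rows used for the average if other columns also contain nulls. Assuming that can happen in your data, restricting both the drop and the fill to age avoids it:

    # subset=["age"] limits the drop to null ages, so nulls elsewhere
    # do not exclude otherwise-valid rows from the mean.
    mean_age = df.na.drop(subset=["age"]).agg(avg("age")).first()[0]
    df.na.fill(mean_age, ["age"])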