Given a Spark DataFrame, I would like to calculate the mean of a column using only the values that are neither missing nor unknown for that column. I would then like to take this mean and use it to replace the column's missing and unknown values.
For example, if I work with:
- A DataFrame named df, where each record represents one individual and all columns are integer or numeric
- A column named age (the age for each record)
- A column named missing_age (which is 1 if this person has no age, 0 otherwise)
- A column named unknown_age (which is 1 if this person is of unknown age, 0 otherwise)
Then I can calculate this value as shown below.
calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0)).agg(avg(col("age")))
Or, through SQL and window functions:
mean_compute = hiveContext.sql("select avg(age) over() as mean from df where missing_age = 0 and unknown_age = 0")
I would prefer not to use SQL / window functions if I can help it. My difficulty is taking this value and replacing the unknown / missing values with it using non-SQL methods.
I have tried when(), where(), replace(), withColumn(), UDFs, and combinations of these. No matter what I do, I either get errors or the results are not what I expect. Here is an example of one of the many things I tried that didn't work.
imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1), calc_mean).otherwise("age")
I searched the web but did not find any similar imputation questions, so any help is greatly appreciated. It may be something very simple that I missed.
Side note: I am trying to apply this code to all columns in the Spark DataFrame that do not have unknown_ or missing_ in their names. Can I just wrap the Spark-related code in a Python for loop and iterate over all the relevant columns?
UPDATE:
I also figured out how to iterate over the columns. Here's an example:
for x in df.columns:
    if 'unknown_' not in x and 'missing_' not in x:
        avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
        df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1), avg_compute).otherwise(df[x]))