"""Functions for imputing missing data.""" import numpy as np import pandas as pd from pandas.api.types import is_numeric_dtype def replace_nan_with_mode(df, col, col_mode, random_state): """Replaces nan in categorical columns with the mode. Function only used for categorical columns. Replaces nan in categorical columns with the mode. If there are multiple modes, one mode is randomly chosen. Parameters ---------- df : dataframe dataframe containing the categorical columns to impute and the columns with the mode. col : str name of column to impute. col_mode : str name of column containing the calculated modes. random_state : int to seed the random generator. Returns ------- df : dataframe input dataframe with missing values in categorical column imputed. """ np.random.seed(seed=random_state) for index, row in df.iterrows(): if row[col] == "nan": # Deals with cases where there are multiple modes. If there are multiple modes, # one mode is chosen at random if (not isinstance(row[col_mode], str)) and len(row[col_mode]) > 1: df.at[index, col] = np.random.choice(list(row[col_mode])) else: df.at[index, col] = row[col_mode] return df def get_imputer(*, train_data, cols_to_impute, average_type, cols_to_groupby=None): """Retrieve imputer of input data for later use (performs no imputing). The complete train data set is used to impute the holdout test data set. This function is used to obtain imputations for storage and later use on separate data. The average specified (e.g. median) can be calculated on the data provided or the data can be grouped by specified features (e.g. binned age and sex) prior to average calculation. For categorical columns, the mode is used. The format of nan for categorical columns required for the function is 'nan'. Use apply_imputer to perform imputation. Parameters ---------- train_data : dataframe data to be used to for imputation at a later stage. This is likely the full train data set. cols_to_impute : list of strings names of columns to perform imputation on. If cols_to_groupby is not None, these columns cannot appear in cols_to_impute. average_type : str type of average to calculate. Must be either 'median' or 'mean'. For categorical columns, the 'mode' will automatically be calculated. cols_to_groupby : list of strings, optional option to group data before calculating average. Returns ------- imputer : dataframe contains average values calculated, to be used in imputation. Raises ------- ValueError raises an error if elements in cols_to_groupby appear in cols_to_impute ValueError error raised if nans are not in the correct format: 'nan'. """ if average_type not in ["mean", "median"]: raise ValueError("average_type must be either 'mean or 'median'.") if not cols_to_groupby is None: if any(column in cols_to_groupby for column in cols_to_impute): raise ValueError( "Elements in cols_to_groupby should not appear in cols_to_impute" ) imputer = [] for col in cols_to_impute: is_numeric = is_numeric_dtype(train_data[col]) # For numeric columns, calculate specified average_type if is_numeric: imputer_for_col = train_data.groupby(cols_to_groupby)[col].agg( [average_type] ) imputer_for_col = imputer_for_col.rename(columns={average_type: col}) # For categorical columns, calculate mode else: # Raise an error if nans are in the incorrect format all_nan_types = [None, "None", np.NaN, "nan", "NAN", "N/A"] cat_present = train_data[col].unique().tolist() if any(element in all_nan_types for element in cat_present): if not "nan" in cat_present: raise ValueError( "Missing values in categorical columns are not recorded as 'nan'." ) # Drop any categories with 'nan' so that when getting the mode, nan isn't # treated as a category cols_to_groupby_plus_impute = cols_to_groupby + [col] train_data_col = train_data[cols_to_groupby_plus_impute] train_data_col = train_data_col[train_data_col[col] != "nan"] imputer_for_col = train_data_col.groupby(cols_to_groupby).agg( pd.Series.mode ) imputer.append(imputer_for_col) imputer = pd.concat(imputer, axis=1) # If there are any nans after grouping, fill nans with values from similar groups imputer = imputer.sort_values(cols_to_groupby) imputer = imputer.ffill().bfill() else: imputer = train_data[cols_to_impute].agg(average_type) return imputer def apply_imputer(*, data, cols_to_impute, imputer, cols_to_groupby=None): """Impute input data with supplied imputer. Parameters ---------- data : dataframe data with columns to be imputed. cols_to_impute : list of strings names of columns to be imputed. If cols_to_groupby is not None, these columns cannot appear in cols_to_impute. imputer : dataframe contains average values calculated, to be used in imputation. cols_to_groupby : list of strings, optional option to group data before calculating average. Must be the same as in get_imputer. Returns ------- data : dataframe imputed dataframe. Raises ------- ValueError raises an error if cols_to_groupby in apply_imputer do not match cols_to_groupby in get_imputer. """ if (cols_to_groupby) != (imputer.index.names): raise ValueError( "Groups used to generate the imputer and apply imputer must be the same. \ Groups used to generate the imputer are: {}, groups used to apply the \ imputer are: {}".format( imputer.index.names, cols_to_groupby ) ) if not cols_to_groupby is None: imputer = imputer.add_suffix("_avg") imputer = imputer.reset_index() data_imputed = data.merge(imputer, on=cols_to_groupby, how="left") for col in cols_to_impute: is_numeric = is_numeric_dtype(data[col]) if is_numeric: data_imputed[col] = np.where( data_imputed[col].isna(), data_imputed[col + "_avg"], data_imputed[col], ) else: data_imputed = replace_nan_with_mode( data_imputed, col, col + "_avg", random_state=0 ) # Drop columns containing the average values data_imputed = data_imputed.loc[:, ~data_imputed.columns.str.endswith("_avg")] else: data_imputed = data_imputed.fillna(imputer) return data_imputed def kfold_impute( *, df, fold_ids, cols_to_impute, average_type, cols_to_groupby, id_col, ): """Perform K-fold imputation. Fold by fold imputation of train data is used to prevent data leakage in cross- validation (the same folds are used for imputation and CV). For example, in 5-fold imputation, each fold is imputed using the other 4 folds and that fold is then used as the validation fold in CV. Parameters ---------- df : dataframe data with columns to be imputed. Will generally be the train data. fold_ids : list of arrays each array contains the validation patient IDs for each fold. cols_to_impute : list of strings columns to impute. average_type : str type of average to calculate (e.g. median, mean). cols_to_groupby : list of strings, optional option to group data before calculating average. id_col : str name of patient ID column. Returns ------- imputed_df_cv : dataframe k-fold imputed version of the input data. """ # Loop over CV folds and perform K-fold imputation imputed_data_cv = [] for fold in fold_ids: # Divide data into train folds and validation fold validation_fold = df[df[id_col].isin(fold)] train_folds = df[~df[id_col].isin(fold)] # Obtain imputers from train folds fold_imputer = get_imputer( train_data=train_folds, cols_to_impute=cols_to_impute, average_type=average_type, cols_to_groupby=cols_to_groupby, ) # Apply to validation fold imputed_data_fold = apply_imputer( data=validation_fold, cols_to_impute=cols_to_impute, imputer=fold_imputer, cols_to_groupby=cols_to_groupby, ) imputed_data_cv.append(imputed_data_fold) # Place the imputed validation fold data into a single df imputed_df_cv = pd.concat(imputed_data_cv) return imputed_df_cv