# copd-model-h/training/imputation.py
"""Functions for imputing missing data."""
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
def replace_nan_with_mode(df, col, col_mode, random_state):
    """Replaces 'nan' in a categorical column with the mode.

    Function only used for categorical columns. Replaces 'nan' entries with the
    pre-computed mode stored in `col_mode`. If a row has multiple modes (stored as an
    array-like of candidate values), one mode is chosen at random.

    Parameters
    ----------
    df : dataframe
        dataframe containing the categorical column to impute and the column with the mode.
        Modified in place and also returned.
    col : str
        name of column to impute. Missing values must be recorded as the string 'nan'.
    col_mode : str
        name of column containing the calculated modes (a string for a single mode, or an
        array-like of strings when there are ties).
    random_state : int
        seed for the random generator used to break ties between multiple modes.

    Returns
    -------
    df : dataframe
        input dataframe with missing values in the categorical column imputed.
    """
    # Use a local Generator instead of np.random.seed so we don't mutate the
    # process-wide global RNG state as a side effect of imputation.
    rng = np.random.default_rng(random_state)
    for index, row in df.iterrows():
        if row[col] == "nan":
            mode_value = row[col_mode]
            # A non-string value with len > 1 means there were multiple tied modes;
            # pick one of them at random.
            if (not isinstance(mode_value, str)) and len(mode_value) > 1:
                df.at[index, col] = rng.choice(list(mode_value))
            else:
                df.at[index, col] = mode_value
    return df
def get_imputer(*, train_data, cols_to_impute, average_type, cols_to_groupby=None):
    """Retrieve imputer of input data for later use (performs no imputing).

    The complete train data set is used to impute the holdout test data set. This function
    is used to obtain imputations for storage and later use on separate data. The average
    specified (e.g. median) can be calculated on the data provided or the data can be
    grouped by specified features (e.g. binned age and sex) prior to average calculation.
    For categorical columns, the mode is used. Missing values in categorical columns must
    be recorded as the string 'nan'.
    Use apply_imputer to perform imputation.

    Parameters
    ----------
    train_data : dataframe
        data to be used for imputation at a later stage. This is likely the full train
        data set.
    cols_to_impute : list of strings
        names of columns to perform imputation on. Must not overlap with cols_to_groupby.
    average_type : str
        type of average to calculate. Must be either 'median' or 'mean'. For categorical
        columns, the 'mode' will automatically be calculated.
    cols_to_groupby : list of strings, optional
        option to group data before calculating average.

    Returns
    -------
    imputer : dataframe or series
        contains average values calculated, to be used in imputation. A dataframe indexed
        by cols_to_groupby when grouping, otherwise a series indexed by column name.

    Raises
    -------
    ValueError
        raises an error if average_type is not 'mean' or 'median'.
    ValueError
        raises an error if elements in cols_to_groupby appear in cols_to_impute.
    ValueError
        error raised if nans in categorical columns are not in the correct format: 'nan'.
    """
    if average_type not in ["mean", "median"]:
        raise ValueError("average_type must be either 'mean' or 'median'.")
    if cols_to_groupby is not None:
        if any(column in cols_to_groupby for column in cols_to_impute):
            raise ValueError(
                "Elements in cols_to_groupby should not appear in cols_to_impute"
            )
        imputer = []
        for col in cols_to_impute:
            is_numeric = is_numeric_dtype(train_data[col])
            # For numeric columns, calculate specified average_type
            if is_numeric:
                imputer_for_col = train_data.groupby(cols_to_groupby)[col].agg(
                    [average_type]
                )
                imputer_for_col = imputer_for_col.rename(columns={average_type: col})
            # For categorical columns, calculate mode
            else:
                # Raise an error if nans are in the incorrect format
                # (np.nan — np.NaN was removed in NumPy 2.0)
                all_nan_types = [None, "None", np.nan, "nan", "NAN", "N/A"]
                cat_present = train_data[col].unique().tolist()
                if any(element in all_nan_types for element in cat_present):
                    if "nan" not in cat_present:
                        raise ValueError(
                            "Missing values in categorical columns are not recorded as 'nan'."
                        )
                # Drop any categories with 'nan' so that when getting the mode, nan isn't
                # treated as a category
                cols_to_groupby_plus_impute = cols_to_groupby + [col]
                train_data_col = train_data[cols_to_groupby_plus_impute]
                train_data_col = train_data_col[train_data_col[col] != "nan"]
                imputer_for_col = train_data_col.groupby(cols_to_groupby).agg(
                    pd.Series.mode
                )
            imputer.append(imputer_for_col)
        imputer = pd.concat(imputer, axis=1)
        # If there are any nans after grouping, fill nans with values from similar groups
        # (sort by the group index levels so neighbouring rows are similar groups)
        imputer = imputer.sort_values(cols_to_groupby)
        imputer = imputer.ffill().bfill()
    else:
        imputer = train_data[cols_to_impute].agg(average_type)
    return imputer
def apply_imputer(*, data, cols_to_impute, imputer, cols_to_groupby=None):
    """Impute input data with supplied imputer.

    Parameters
    ----------
    data : dataframe
        data with columns to be imputed.
    cols_to_impute : list of strings
        names of columns to be imputed. Must not overlap with cols_to_groupby.
    imputer : dataframe or series
        contains average values calculated by get_imputer, to be used in imputation.
    cols_to_groupby : list of strings, optional
        option to group data before calculating average. Must be the same as in get_imputer.

    Returns
    -------
    data_imputed : dataframe
        imputed dataframe.

    Raises
    -------
    ValueError
        raises an error if cols_to_groupby in apply_imputer do not match cols_to_groupby in
        get_imputer.
    """
    # An ungrouped imputer (a Series from get_imputer) has index.names == [None];
    # comparing None to that FrozenList directly would always mismatch, so normalise
    # both sides to plain lists before comparing.
    expected_groups = [None] if cols_to_groupby is None else list(cols_to_groupby)
    if expected_groups != list(imputer.index.names):
        raise ValueError(
            "Groups used to generate the imputer and apply imputer must be the same. \
            Groups used to generate the imputer are: {}, groups used to apply the \
            imputer are: {}".format(
                imputer.index.names, cols_to_groupby
            )
        )
    if cols_to_groupby is not None:
        imputer = imputer.add_suffix("_avg")
        imputer = imputer.reset_index()
        data_imputed = data.merge(imputer, on=cols_to_groupby, how="left")
        for col in cols_to_impute:
            is_numeric = is_numeric_dtype(data[col])
            if is_numeric:
                data_imputed[col] = np.where(
                    data_imputed[col].isna(),
                    data_imputed[col + "_avg"],
                    data_imputed[col],
                )
            else:
                data_imputed = replace_nan_with_mode(
                    data_imputed, col, col + "_avg", random_state=0
                )
        # Drop columns containing the average values
        data_imputed = data_imputed.loc[:, ~data_imputed.columns.str.endswith("_avg")]
    else:
        # Fill from the ungrouped per-column averages. Previously this line read
        # `data_imputed.fillna(imputer)`, which raised NameError because
        # data_imputed was never assigned on this branch.
        data_imputed = data.fillna(imputer)
    return data_imputed
def kfold_impute(
    *,
    df,
    fold_ids,
    cols_to_impute,
    average_type,
    cols_to_groupby,
    id_col,
):
    """Perform K-fold imputation.

    Fold by fold imputation of train data is used to prevent data leakage in cross-
    validation (the same folds are used for imputation and CV). For example, in 5-fold
    imputation, each fold is imputed using the other 4 folds and that fold is then used as
    the validation fold in CV.

    Parameters
    ----------
    df : dataframe
        data with columns to be imputed. Will generally be the train data.
    fold_ids : list of arrays
        each array contains the validation patient IDs for each fold.
    cols_to_impute : list of strings
        columns to impute.
    average_type : str
        type of average to calculate (e.g. median, mean).
    cols_to_groupby : list of strings, optional
        option to group data before calculating average.
    id_col : str
        name of patient ID column.

    Returns
    -------
    imputed_df_cv : dataframe
        k-fold imputed version of the input data.
    """
    imputed_folds = []
    for validation_ids in fold_ids:
        # Split the data: rows whose ID is in this fold form the validation fold;
        # every other row forms the train folds used to fit the imputer.
        in_validation = df[id_col].isin(validation_ids)
        validation_fold = df[in_validation]
        train_folds = df[~in_validation]
        # Fit the imputer on the train folds only, then apply it to the held-out
        # validation fold so no information leaks from the fold being imputed.
        fold_imputer = get_imputer(
            train_data=train_folds,
            cols_to_impute=cols_to_impute,
            average_type=average_type,
            cols_to_groupby=cols_to_groupby,
        )
        imputed_folds.append(
            apply_imputer(
                data=validation_fold,
                cols_to_impute=cols_to_impute,
                imputer=fold_imputer,
                cols_to_groupby=cols_to_groupby,
            )
        )
    # Stack the imputed validation folds back into a single dataframe
    return pd.concat(imputed_folds)