""" Derive features from lab tests for 2 models: Parallel model 1: uses both hospital and community exacerbation events Parallel model 2: uses only hospital exacerbation events """ import numpy as np import pandas as pd import sys import os import model_h import ggc.preprocessing.labs as labs_preprocessing import yaml def calc_lab_metric(lab_df, data, lab_name, metric, weigh_data_by_recency=False): """ Calculate metrics on laboratory data. Args: lab_df (pd.DataFrame): dataframe containing labs to be used in calculations. data (pd.DataFrame): main dataframe to which columns containing the results from the lab calculations are merged onto. lab_name (list): name of labs required for metric calculations. metric (str): name of metric to be calculated. The possible metrics are: 'MaxLifetime': calculates the maximum value of lab for patient within entire dataset before their index date. 'MinLifetime': calculates the minimum value of lab for patient within entire dataset before their index date. 'Max1Year': calculates the maximum value of lab for patient within 1 year prior to index date. 'Min1Year': calculates the maximum value of lab for patient within 1 year prior to index date. 'Latest': finds the closest lab value prior to index date. weigh_data_by_recency (bool): option to weigh data based on how recent it is. Older observations are decreased or increased towards the median. Defaults to False. Returns: pd.DataFrame: the input dataframe with additional columns with calculated metrics. """ # Subset labs to only those specified in lab_names cols_to_keep = ["StudyId", "IndexDate", "TimeSinceLab"] cols_to_keep.append(lab_name) labs_calc = lab_df[cols_to_keep] # Subset labs to correct time frames and calculate metrics if (metric == "Max1Year") | (metric == "Min1Year"): labs_calc = labs_calc[labs_calc["TimeSinceLab"] <= 365] if (metric == "MaxLifetime") | (metric == "Max1Year"): labs_calc = labs_calc.groupby(["StudyId", "IndexDate"]).max() if (metric == "MinLifetime") | (metric == "Min1Year"): labs_calc = labs_calc.groupby(["StudyId", "IndexDate"]).min() labs_calc = labs_calc.drop(columns=["TimeSinceLab"]) if metric == "Latest": labs_calc = labs_calc[labs_calc["TimeSinceLab"] <= 365] labs_calc = labs_calc.sort_values( by=["StudyId", "IndexDate", "TimeSinceLab"], ascending=True ) labs_calc["TimeSinceLab"] = np.where( labs_calc[lab_name].isna(), np.NaN, labs_calc["TimeSinceLab"] ) labs_calc = labs_calc.bfill() labs_calc = labs_calc.drop_duplicates( subset=["StudyId", "IndexDate"], keep="first" ) if weigh_data_by_recency is True: median_val = labs_calc[lab_name].median() labs_calc = model_h.weigh_features_by_recency( df=labs_calc, feature=lab_name, feature_recency_days="TimeSinceLab", median_value=median_val, decay_rate=0.001, ) labs_calc = labs_calc.set_index(["StudyId", "IndexDate"]) # Add prefix to lab names and merge with main df labs_calc = labs_calc.add_prefix(metric) labs_calc = labs_calc.reset_index() data = data.merge(labs_calc, on=["StudyId", "IndexDate"], how="left") return data with open("./training/config.yaml", "r") as config: config = yaml.safe_load(config) # Specify which model to generate features for model_type = config["model_settings"]["model_type"] # Setup log file log = open("./training/logging/process_labs_" + model_type + ".log", "w") sys.stdout = log # Dataset to process - set through config file data_to_process = config["model_settings"]["data_to_process"] # Load cohort data if data_to_process == "forward_val": data = pd.read_pickle("./data/patient_labels_forward_val_hosp_comm.pkl") patient_details = pd.read_pickle("./data/patient_details_forward_val.pkl") else: data = pd.read_pickle("./data/patient_labels_" + model_type + ".pkl") patient_details = pd.read_pickle("./data/patient_details.pkl") data = data[["StudyId", "IndexDate"]] patient_details = data.merge( patient_details[["StudyId", "PatientId"]], on="StudyId", how="left", ) # Read mapping between StudyId and SafeHavenID id_mapping = pd.read_pickle("./data/sh_to_studyid_mapping.pkl") # Remove mapping for patient SU125 as the mapping for this patient is incorrect id_mapping["SafeHavenID"] = np.where( id_mapping["StudyId"] == "SU125", np.NaN, id_mapping["SafeHavenID"] ) id_mapping = id_mapping.merge( data[["StudyId"]], on="StudyId", how="inner" ).drop_duplicates() print( "Num patients with SafeHaven mapping: {} of {}".format( len(id_mapping), data.StudyId.nunique() ) ) # Add column with SafeHavenID to main df patient_details = patient_details.merge(id_mapping, on="StudyId", how="left") # Calculate the lookback start date. Will need this to aggreggate data for model # features patient_details["LookbackStartDate"] = patient_details["IndexDate"] - pd.DateOffset( days=config["model_settings"]["lookback_period"] ) ############################################################################ # Derive features from labs ############################################################################ # Convert column names into format required for labs processing using the ggc package cols_to_use = [ "SafeHavenID", "ClinicalCodeDescription", "QuantityUnit", "RangeHighValue", "RangeLowValue", "QuantityValue", "SampleDate", ] labs = pd.read_csv(config["inputs"]["raw_data_paths"]["labs"], usecols=cols_to_use) # Subset labs table to only patients of interest labs = labs[labs.SafeHavenID.isin(patient_details.SafeHavenID)] # Process labs lookup_table = pd.read_csv(config["inputs"]["raw_data_paths"]["labs_lookup_table"]) tests_of_interest = [ "Eosinophils", "Albumin", "Neutrophils", "White Blood Count", "Lymphocytes", ] labs_processed = labs_preprocessing.clean_labs_data( df=labs, tests_of_interest=tests_of_interest, units_lookup=lookup_table, print_log=True, ) labs_processed = patient_details[["StudyId", "IndexDate", "SafeHavenID"]].merge( labs_processed, on="SafeHavenID", how="left" ) labs_processed["SampleDate"] = pd.to_datetime(labs_processed["SampleDate"], utc=True) labs_processed["TimeSinceLab"] = ( labs_processed["IndexDate"] - labs_processed["SampleDate"] ).dt.days # Only keep labs performed before IndexDate labs_processed = labs_processed[labs_processed["TimeSinceLab"] >= 0] # Convert lab names to columns labs_processed = pd.pivot_table( labs_processed, values="QuantityValue", index=["StudyId", "IndexDate", "TimeSinceLab"], columns=["ClinicalCodeDescription"], ) labs_processed = labs_processed.reset_index() # Calculate neutrophil/lymphocyte ratio labs_processed["NeutLymphRatio"] = ( labs_processed["Neutrophils"] / labs_processed["Lymphocytes"] ) # Calculate lowest albumin in past year data = calc_lab_metric(labs_processed, data, lab_name="Albumin", metric="Min1Year") # Calculate the latest lab value lab_names = [ "NeutLymphRatio", "Albumin", "Eosinophils", "Neutrophils", "White Blood Count", ] for lab_name in lab_names: data = calc_lab_metric( labs_processed, data, lab_name, metric="Latest", weigh_data_by_recency=True ) # Save data os.makedirs(config["outputs"]["processed_data_dir"], exist_ok=True) if data_to_process == "forward_val": data.to_pickle( os.path.join( config["outputs"]["processed_data_dir"], "labs_forward_val_" + model_type + ".pkl", ) ) else: data.to_pickle( os.path.join( config["outputs"]["processed_data_dir"], "labs_" + model_type + ".pkl", ) )