""" Derive features PRO responses for 2 models: Parallel model 1: uses both hospital and community exacerbation events Parallel model 2: uses only hospital exacerbation events """ import numpy as np import pandas as pd import sys import os import re from collections import defaultdict import yaml def calc_total_pro_engagement(pro_df, pro_name): """ Calculates PRO engagement per patient across their entire time within the service. Args: pro_df (pd.DataFrame): dataframe containing the onboarding date and the latest prediction date. pro_name (str): name of the PRO. Returns: pd.DataFrame: the input dateframe with an additional column stating the total engagement for each patient across the service. """ # Calculate time in service according to type of PRO if pro_name == "EQ5D": date_unit = "M" if pro_name == "MRC": date_unit = "W" if (pro_name == "CAT") | (pro_name == "SymptomDiary"): date_unit = "D" pro_df["TimeInService"] = np.floor( ( (pro_df.LatestPredictionDate - pro_df.FirstSubmissionDate) / np.timedelta64(1, date_unit) ) ) # PRO engagement for the total time in service pro_response_count = pro_df.groupby("StudyId").count()[["PatientId"]].reset_index() pro_response_count = pro_response_count.rename( columns={"PatientId": "Response" + pro_name} ) pro_df = pro_df.merge(pro_response_count, on="StudyId", how="left") pro_df["TotalEngagement" + pro_name] = round( pro_df["Response" + pro_name] / pro_df["TimeInService"], 2 ) return pro_df def calc_pro_engagement_in_time_window(pro_df, pro_name, time_window, data): """ Calculates PRO engagement per patient across a specified time window. The time window is in format 'months', and consists of the specified time period prior to IndexDate. Args: pro_df (pd.DataFrame): dataframe containing the index dates and PRO response submission dates. pro_name (str): name of the PRO. time_window (int): number of months in which to calculate PRO engagement. data (pd.DataFrame): main dataframe. 
    Returns:
        pd.DataFrame: a dataframe containing the calculated PRO engagement.
    """
    # Number of expected responses per month according to type of PRO
    if pro_name == "EQ5D":
        unit_val = 1
    elif pro_name == "MRC":
        unit_val = 4
    elif pro_name in ("CAT", "SymptomDiary"):
        unit_val = 30

    pro_df["SubmissionTime"] = pd.to_datetime(pro_df["SubmissionTime"], utc=True)
    pro_engagement_tw = pro_df.copy()
    pro_engagement_tw["TimeSinceSubmission"] = (
        pro_engagement_tw["IndexDate"] - pro_engagement_tw["SubmissionTime"]
    ).dt.days

    # Only include PRO responses within the specified time window
    pro_engagement_tw = pro_engagement_tw[
        pro_engagement_tw["TimeSinceSubmission"].between(
            0, (time_window * 30), inclusive="both"
        )
    ]

    # Calculate number of PRO responses within specified time window
    pro_engagement_tw = (
        pro_engagement_tw.groupby(["StudyId", "IndexDate"])
        .count()[["PatientId"]]
        .reset_index()
    )
    pro_engagement_tw = pro_engagement_tw.rename(
        columns={"PatientId": "ResponseCountTW" + str(time_window)}
    )
    pro_engagement_tw["Engagement" + pro_name + "TW" + str(time_window)] = round(
        pro_engagement_tw["ResponseCountTW" + str(time_window)]
        / (time_window * unit_val),
        2,
    )
    pro_engagement_tw = data[["StudyId", "IndexDate"]].merge(
        pro_engagement_tw, on=["StudyId", "IndexDate"], how="left"
    )
    # Fill N/As with 0 as no engagement was observed for those patients
    pro_engagement_tw = pro_engagement_tw.fillna(0)
    return pro_engagement_tw


def calc_pro_engagement_at_specific_month(pro_df, pro_name, month_num, data):
    """
    Calculates PRO engagement per patient for a single month-long window ending
    month_num months before IndexDate (month_num=1 is the most recent month).

    Args:
        pro_df (pd.DataFrame): dataframe containing the index dates and PRO
            response submission dates.
        pro_name (str): name of the PRO.
        month_num (int): which month prior to IndexDate to calculate engagement
            for.
        data (pd.DataFrame): main dataframe.

    Returns:
        pd.DataFrame: a dataframe containing the calculated PRO engagement.
    """
    # Number of expected responses per month according to type of PRO
if pro_name == "EQ5D": unit_val = 1 if pro_name == "MRC": unit_val = 4 if (pro_name == "CAT") | (pro_name == "SymptomDiary"): unit_val = 30 pro_df["SubmissionTime"] = pd.to_datetime(pro_df["SubmissionTime"], utc=True) pro_engagement = pro_df.copy() pro_engagement["TimeSinceSubmission"] = ( pro_engagement["IndexDate"] - pro_engagement["SubmissionTime"] ).dt.days # Only include PRO responses for the month specified # Calculate the number of months between index date and specified month months_between_index_and_specified = month_num - 1 pro_engagement = pro_engagement[ pro_engagement["TimeSinceSubmission"].between( (months_between_index_and_specified * 30), (month_num * 30), inclusive="both", ) ] # Calculate number of PRO responses within specified time window pro_engagement = ( pro_engagement.groupby(["StudyId", "IndexDate"]) .count()[["PatientId"]] .reset_index() ) pro_engagement = pro_engagement.rename( columns={"PatientId": "ResponseCountMonth" + str(month_num)} ) pro_engagement["Engagement" + pro_name + "Month" + str(month_num)] = round( pro_engagement["ResponseCountMonth" + str(month_num)] / (1 * unit_val), 2, ) pro_engagement = data[["StudyId", "IndexDate"]].merge( pro_engagement, on=["StudyId", "IndexDate"], how="left" ) # Fill N/As with 0 as no engagement was observed for those patients pro_engagement = pro_engagement.fillna(0) return pro_engagement def calc_last_pro_score(pro_df, pro_name): """ Calculates the most recent PRO response. The latest PRO score is set to be within 2 months of the index date to allow recency of data without having many missing values. Args: pro_df (pd.DataFrame): dataframe containing the index dates and PRO response submission dates. pro_name (str): name of the PRO. Returns: pd.DataFrame: the input dateframe with additional columns stating the latest PRO score for each PRO question. 
""" # Calculate last PRO score pro_df["TimeSinceSubmission"] = ( pro_df["IndexDate"] - pro_df["SubmissionTime"] ).dt.days pro_df = pro_df[pro_df["TimeSinceSubmission"] > 0] pro_df = pro_df.sort_values( by=["StudyId", "IndexDate", "TimeSinceSubmission"], ascending=True ) latest_pro = pro_df.drop_duplicates(subset=["StudyId", "IndexDate"], keep="first") # Ensure that the latest PRO Score is within 2 months of the index date latest_pro = latest_pro[latest_pro["TimeSinceSubmission"] <= 365] # Select specific columns question_cols = latest_pro.columns[ latest_pro.columns.str.startswith(pro_name) ].tolist() question_cols.extend( ["StudyId", "IndexDate", "Score", "SubmissionTime", "TimeSinceSubmission"] ) latest_pro = latest_pro[question_cols] # if pro_name == "EQ5D": # median_val_q1 = latest_pro["EQ5DScoreWithoutQ6"].median() # print(median_val_q1) # latest_pro = weigh_features_by_recency( # df=latest_pro, # feature="EQ5DScoreWithoutQ6", # feature_recency_days="TimeSinceSubmission", # median_value=median_val_q1, # decay_rate=0.001, # ) # print(latest_pro.columns) # # # Add prefix to question columns # cols_to_rename = latest_pro.columns[ # ~latest_pro.columns.isin( # ["StudyId", "IndexDate", "Score", "SubmissionTime"] # ) # ] # latest_pro = latest_pro.rename( # columns=dict(zip(cols_to_rename, "Latest" + cols_to_rename)) # ) # # # Rename columns where prefix not added # latest_pro = latest_pro.rename( # columns={ # "Score": "Latest" + pro_name + "Score", # "SubmissionTime": "LatestPRODate", # } # ) # # elif pro_name == "MRC": # median_val_q1 = latest_pro["Score"].median() # print(median_val_q1) # latest_pro = weigh_features_by_recency( # df=latest_pro, # feature="Score", # feature_recency_days="TimeSinceSubmission", # median_value=median_val_q1, # decay_rate=0.001, # ) # print(latest_pro.columns) # # Add prefix to question columns # cols_to_rename = latest_pro.columns[ # ~latest_pro.columns.isin( # ["StudyId", "IndexDate", "Score", "SubmissionTime", "ScoreWeighted"] # ) 
# ] # latest_pro = latest_pro.rename( # columns=dict(zip(cols_to_rename, "Latest" + cols_to_rename)) # ) # # Rename columns where prefix not added # latest_pro = latest_pro.rename( # columns={ # "Score": "Latest" + pro_name + "Score", # "ScoreWeighted": "Latest" + pro_name + "ScoreWeighted", # "SubmissionTime": "LatestPRODate", # } # ) # else: # Add prefix to question columns cols_to_rename = latest_pro.columns[ ~latest_pro.columns.isin(["StudyId", "IndexDate", "Score", "SubmissionTime"]) ] latest_pro = latest_pro.rename( columns=dict(zip(cols_to_rename, "Latest" + cols_to_rename)) ) # Rename columns where prefix not added latest_pro = latest_pro.rename( columns={ "Score": "Latest" + pro_name + "Score", "SubmissionTime": "LatestPRODate", } ) pro_df = pro_df.merge(latest_pro, on=["StudyId", "IndexDate"], how="left") return pro_df def calc_pro_score_prior_to_latest(pro_df, pro_name, time_prior_to_latest=60): """ Finds the PRO score prior to the latest PRO score before index date. Args: pro_df (pd.DataFrame): dataframe containing the latest PRO score and PRO response submission dates. pro_name (str): name of the PRO. time_prior_to_latest (int, optional): time period before latest PRO score in days. Default time frame set to 60 days (two months). Returns: pd.DataFrame: the input dateframe with additional columns stating the previous score closest to the latest PRO score for each PRO question. 
""" pro_previous = pro_df.copy() pro_previous = pro_previous[ pro_previous["SubmissionTime"] < pro_previous["LatestPRODate"] ] pro_previous = pro_previous.sort_values( by=["StudyId", "IndexDate", "SubmissionTime"], ascending=[True, True, False] ) pro_previous = pro_previous.drop_duplicates( subset=["StudyId", "IndexDate"], keep="first" ) # Make sure that previous score is within two months of the LatestPRODate pro_previous["TimeSinceLatestPRODate"] = ( pro_previous["LatestPRODate"] - pro_previous["SubmissionTime"] ).dt.days pro_previous = pro_previous[ pro_previous["TimeSinceLatestPRODate"] <= time_prior_to_latest ] # Add prefix to question columns cols_to_rename = [col for col in pro_previous if col.startswith(pro_name)] cols_to_rename = pro_previous[cols_to_rename].columns pro_previous = pro_previous.rename( columns=dict(zip(cols_to_rename, "Prev" + cols_to_rename)) ) pro_previous = pro_previous[["StudyId", "IndexDate", "Score"]].join( pro_previous.filter(regex="^Prev") ) pro_previous = pro_previous.rename(columns={"Score": "Prev" + pro_name + "Score"}) pro_df = pro_df.merge(pro_previous, on=["StudyId", "IndexDate"], how="left") return pro_df def define_mapping_for_calcs(pro_name, questions, prefixes): """ Defines the mapping for calculations between PRO responses. Args: pro_name (str): name of the PRO. questions (list): question names of PRO. prefixes (list): prefixes to identify which columns to use in calculations. The possible prefixes are: 'Avg', 'Prev', 'LongerAvg', 'WeekPrevAvg'. Returns: dict: mapping that maps columns for performing calculations. 
""" # Create empty dictionary to append questions mapping = defaultdict(list) # Iterate through questions and create mapping for calculations for question in questions: if (pro_name == "EQ5D") | (pro_name == "MRC"): map_key = "Latest" + pro_name + question if (pro_name == "CAT") | (pro_name == "SymptomDiary"): map_key = "WeekAvg" + pro_name + question for prefix in prefixes: mapping[map_key].append(prefix + pro_name + question) return mapping def calc_pro_average(pro_df, pro_name, time_window=None, avg_period=None): """ Calculate the PRO average before the latest PRO score and within a specified time window. Args: pro_df (pd.DataFrame): dataframe containing index dates and PRO submission dates. pro_name (str): name of the PRO. time_window (int, optional): time window (in months) used for calculating the average of PRO responses. Defaults to None. avg_period (str, optional): identifies which prefix to add to output columns. Defaults to None. Returns: pd.Dataframe: the input dateframe with additional columns with the calculated averages. 
""" # Calculate average in PRO responses for the time window specified prior to the # index date pro_df = pro_df.loc[ :, ~( pro_df.columns.str.startswith("Avg") | pro_df.columns.str.startswith("Longer") ), ] if avg_period is None: prefix = "Avg" pro_df["AvgStartDate"] = pro_df["IndexDate"] - pd.DateOffset(months=time_window) avg_pro = pro_df[ (pro_df["SubmissionTime"] >= pro_df["AvgStartDate"]) & (pro_df["SubmissionTime"] < pro_df["LatestPRODate"]) ] else: pro_df["WeekStartDate"] = pro_df["IndexDate"] - pd.DateOffset(weeks=1) pro_df["WeekPrevStartDate"] = pro_df["WeekStartDate"] - pd.DateOffset(weeks=1) # When looking at daily PROs, three averages are calculated: # The weekly average is the average of PRO scores in the week prior to IndexDate if avg_period == "WeeklyAvg": prefix = "WeekAvg" avg_pro = pro_df[ (pro_df["SubmissionTime"] >= pro_df["WeekStartDate"]) & (pro_df["SubmissionTime"] <= pro_df["IndexDate"]) ] # The weekly previous average is the average of PRO scores in the week prior to the # WeeklyAvg. 
This is needed to calculate the difference of scores between the most # recent week and the week before that elif avg_period == "WeekPrevAvg": prefix = "WeekPrevAvg" avg_pro = pro_df[ (pro_df["SubmissionTime"] >= pro_df["WeekPrevStartDate"]) & (pro_df["SubmissionTime"] < pro_df["WeekStartDate"]) ] # Longer average calculated is the time window specified prior to the WeekStartDate elif avg_period == "LongerAvg": prefix = "LongerAvg" pro_df["AvgStartDate"] = pro_df["IndexDate"] - pd.DateOffset(months=time_window) avg_pro = pro_df[ (pro_df["SubmissionTime"] >= pro_df["AvgStartDate"]) & (pro_df["SubmissionTime"] < pro_df["WeekStartDate"]) ] # Select specific columns cols_required = avg_pro.columns[avg_pro.columns.str.startswith(pro_name)].tolist() cols_required.extend(["StudyId", "IndexDate", "Score"]) avg_pro = avg_pro[cols_required] # Calculate average pro scores avg_pro = avg_pro.groupby(["StudyId", "IndexDate"]).mean().reset_index() # Add prefix to question columns cols_to_rename = avg_pro.columns[ ~avg_pro.columns.isin(["StudyId", "IndexDate", "Score"]) ] avg_pro = avg_pro.rename(columns=dict(zip(cols_to_rename, prefix + cols_to_rename))) # Rename columns where prefix not added avg_pro = avg_pro.rename(columns={"Score": prefix + pro_name + "Score"}) # Merge average PRO with rest of the df pro_df = pro_df.merge(avg_pro, on=["StudyId", "IndexDate"], how="left") return pro_df def calc_diff_pro_scores(pro_df, pro_name, latest_pro, other_pro, time_window=None): """ Calculate the difference between PRO scores. Args: pro_df (pd.DataFrame): dataframe containing columns required for calculations. pro_name (str): name of the PRO. latest_pro (str): column name containing the latest PRO response for PROs EQ5D and MRC, and the latest week average for PROs CAT and SymptomDiary. other_pro (str): column name containing the other variable for calculating difference. 
        time_window (int, optional): time window (in months) used to specify
            which column to use when calculating differences.

    Returns:
        pd.DataFrame: the input dataframe with additional columns containing
            the calculated differences. For example, latest_pro="LatestEQ5DScore",
            other_pro="AvgEQ5DScore" and time_window=1 produce the column
            "DiffLatestAvgEQ5DScoreTW1".
    """
    # Remove prefix of score by splitting the column name at capital letters
    split_feat_name = re.findall(r"[A-Z][^A-Z]*", latest_pro)
    # Remove first element of list to get the base name of the feature
    split_feat_name.pop(0)
    # Remove the second element in the list if the PRO is CAT or SymptomDiary
    if pro_name in ["CAT", "SymptomDiary"]:
        split_feat_name.pop(0)
    # Combine remaining elements of the list
    stripped_feat_name = "".join(split_feat_name)

    if time_window is None:
        pro_df["DiffLatestPrev" + stripped_feat_name] = (
            pro_df[latest_pro] - pro_df[other_pro]
        )
    else:
        pro_df["DiffLatestAvg" + stripped_feat_name + "TW" + str(time_window)] = (
            pro_df[latest_pro] - pro_df[other_pro]
        )
    return pro_df


def calc_variation(pro_df, pro_name):
    """
    Calculate the variation (standard deviation) of PRO responses for a time
    window of 1 month.

    Args:
        pro_df (pd.DataFrame): dataframe containing index dates and PRO
            submission dates.
        pro_name (str): name of the PRO.

    Returns:
        pd.DataFrame: the input dataframe with additional columns containing
            the calculated variation.
""" # Only calculate variation in the scores within 1 month before the IndexDate if "TimeSinceSubmission" not in pro_df: pro_df["TimeSinceSubmission"] = ( pro_df["IndexDate"] - pro_df["SubmissionTime"] ).dt.days pro_var = pro_df[ (pro_df["TimeSinceSubmission"] > 0) & (pro_df["TimeSinceSubmission"] <= 30) ] # Select specific columns cols_required = pro_var.columns[pro_var.columns.str.startswith(pro_name)].tolist() cols_required.extend(["StudyId", "IndexDate", "Score"]) pro_var = pro_var[cols_required] # Calculate variation pro_var = pro_var.groupby(["StudyId", "IndexDate"]).std().reset_index() # Add prefix to question columns cols_to_rename = pro_var.columns[ ~pro_var.columns.isin(["StudyId", "IndexDate", "Score"]) ] pro_var = pro_var.rename(columns=dict(zip(cols_to_rename, "Var" + cols_to_rename))) # Rename columns where prefix not added pro_var = pro_var.rename(columns={"Score": "Var" + pro_name + "Score"}) # Merge back to main df pro_df = pro_df.merge(pro_var, on=["StudyId", "IndexDate"], how="left") return pro_df def calc_sum_binary_vals(pro_df, binary_cols, time_window=1): """ For SymptomDiary questions that contain binary values, calculate the sum of the binary values for a specified time window. Args: pro_df (pd.DataFrame): dataframe containing index dates and PRO submission dates. binary_cols (list): column names for which sum of binary values is to be calculated for. time_window (int, optional): time window (in months) for which the sum of the binary values is calculated for. Defaults to 1. Returns: pd.Dataframe: a dataframe containing the sum of the binary values. 
""" # Make sure only entries before the index date and after the time window start date # are used pro_df["TimeWindowStartDate"] = pro_df["IndexDate"] - pd.DateOffset( months=time_window ) pro_df = pro_df[ (pro_df["SubmissionTime"] >= pro_df["TimeWindowStartDate"]) & (pro_df["SubmissionTime"] <= pro_df["IndexDate"]) ] sum_df = pro_df.groupby(["StudyId", "IndexDate"])[binary_cols].sum() # Rename columns sum_df = sum_df.add_prefix("Sum") sum_df = sum_df.add_suffix("TW" + str(time_window)) sum_df = sum_df.reset_index() return sum_df def scale_sum_to_response_rate(pro_df, sum, engagement_rate): """ Scale the sum calculated using copd.calc_sum_binary_vals() to the response rate to obtain a feature that is comparable between patients. Args: pro_df (pd.DataFrame): dataframe containing the columns for scaling the sum by the engagement rate. sum (str): column name that contains the data for the sum of the binary values. engagement_rate (str): column name that contains the data for the response rate. Returns: pd.Dataframe: the input dateframe with additional columns with the scaled sum. 
""" pro_df["Scaled" + sum] = pro_df[sum] / pro_df[engagement_rate] return pro_df with open("./training/config.yaml", "r") as config: config = yaml.safe_load(config) # Specify which model to generate features for model_type = config["model_settings"]["model_type"] # Setup log file log = open("./training/logging/process_pros_" + model_type + ".log", "w") sys.stdout = log # Dataset to process - set through config file data_to_process = config["model_settings"]["data_to_process"] # Load cohort data if data_to_process == "forward_val": data = pd.read_pickle("./data/patient_labels_forward_val_hosp_comm.pkl") patient_details = pd.read_pickle("./data/patient_details_forward_val.pkl") else: data = pd.read_pickle("./data/patient_labels_" + model_type + ".pkl") patient_details = pd.read_pickle("./data/patient_details.pkl") data = data[["StudyId", "IndexDate"]] patient_details = data.merge( patient_details[["StudyId", "FirstSubmissionDate", "LatestPredictionDate"]], on="StudyId", how="left", ) # Calculate the lookback start date. Will need this to aggreggate data for model # features data["LookbackStartDate"] = data["IndexDate"] - pd.DateOffset( days=config["model_settings"]["lookback_period"] ) ############################################ # Monthly PROs - EQ5D ############################################ pro_eq5d = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_eq5d"], delimiter="|") pro_eq5d = pro_eq5d.merge( patient_details, on="StudyId", how="inner", ) # EQ5DQ6 is a much less structured question compared to the other questions in EQ5D. 
# A new score will be calculated using only EQ5DQ1-Q5 to prevent Q6 affecting
# the score
pro_eq5d["EQ5DScoreWithoutQ6"] = pro_eq5d.loc[:, "EQ5DQ1":"EQ5DQ5"].sum(axis=1)

# Calculate engagement over service
pro_eq5d = calc_total_pro_engagement(pro_eq5d, "EQ5D")

# Calculate engagement for a time window of 1 month (time window chosen based
# on signal output observed from results of feature_eng_multiple_testing)
pro_eq5d_engagement = calc_pro_engagement_in_time_window(
    pro_eq5d, "EQ5D", time_window=1, data=data
)
pro_eq5d = pro_eq5d.merge(pro_eq5d_engagement, on=["StudyId", "IndexDate"], how="left")

# Calculate last PRO score
pro_eq5d = calc_last_pro_score(pro_eq5d, "EQ5D")

# Mapping to calculate the difference between the latest PRO scores and the
# average PRO score
question_names_eq5d = ["Q1", "Q2", "Q3", "Q4", "Q5", "Q6", "Score", "ScoreWithoutQ6"]
mapping_eq5d = define_mapping_for_calcs("EQ5D", question_names_eq5d, prefixes=["Avg"])

# Calculate average PRO score for a time window of 1 month prior to IndexDate,
# ignoring the latest PRO score
pro_eq5d = calc_pro_average(pro_eq5d, "EQ5D", time_window=1)
for key in mapping_eq5d:
    calc_diff_pro_scores(pro_eq5d, "EQ5D", key, mapping_eq5d[key][0], time_window=1)

# Calculate variation of scores across 1 month
pro_eq5d = calc_variation(pro_eq5d, "EQ5D")

# Remove unwanted columns and duplicates
pro_eq5d = pro_eq5d.loc[
    :,
    ~(
        pro_eq5d.columns.str.startswith("Avg")
        | pro_eq5d.columns.str.startswith("EQ5D")
        | pro_eq5d.columns.str.startswith("Response")
    ),
]
pro_eq5d = pro_eq5d.drop(
    columns=[
        "Score",
        "SubmissionTime",
        "FirstSubmissionDate",
        "TimeInService",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "LatestPRODate",
    ]
)
pro_eq5d = pro_eq5d.drop_duplicates()

############################################
# Weekly PROs - MRC
############################################
pro_mrc = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_mrc"], delimiter="|")
pro_mrc = pro_mrc.merge(
    patient_details,
    on="StudyId",
    how="inner",
)

# Calculate engagement over service
pro_mrc = calc_total_pro_engagement(pro_mrc, "MRC")

# Calculate engagement for a time window of 1 month
pro_mrc_engagement = calc_pro_engagement_in_time_window(
    pro_mrc, "MRC", time_window=1, data=data
)
pro_mrc = pro_mrc.merge(pro_mrc_engagement, on=["StudyId", "IndexDate"], how="left")

# Calculate last PRO score
pro_mrc = calc_last_pro_score(pro_mrc, "MRC")

# Mapping to calculate the difference between the latest PRO scores and the
# average PRO score
question_names_mrc = ["Q1"]
mapping_mrc = define_mapping_for_calcs("MRC", question_names_mrc, prefixes=["Avg"])

# Calculate average PRO score for a time window of 1 month prior to IndexDate,
# ignoring the latest PRO score
pro_mrc = calc_pro_average(pro_mrc, "MRC", time_window=1)
for key in mapping_mrc:
    calc_diff_pro_scores(pro_mrc, "MRC", key, mapping_mrc[key][0], time_window=1)

# Calculate variation of scores across 1 month
pro_mrc = calc_variation(pro_mrc, "MRC")

# Remove unwanted columns and duplicates
pro_mrc = pro_mrc.loc[
    :,
    ~(
        pro_mrc.columns.str.startswith("Avg")
        | pro_mrc.columns.str.startswith("MRC")
        | pro_mrc.columns.str.startswith("Response")
    ),
]
pro_mrc = pro_mrc.drop(
    columns=[
        "SubmissionTime",
        "Score",
        "FirstSubmissionDate",
        "TimeInService",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "LatestPRODate",
    ]
)
pro_mrc = pro_mrc.drop_duplicates()

############################################
# Daily PROs - CAT
############################################
pro_cat_full = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_cat"], delimiter="|")
pro_cat = pro_cat_full.merge(
    patient_details,
    on="StudyId",
    how="inner",
)

# Calculate engagement over service
pro_cat = calc_total_pro_engagement(pro_cat, "CAT")

# Calculate engagement for a time window of 1 month
pro_cat_engagement = calc_pro_engagement_in_time_window(
    pro_cat, "CAT", time_window=1, data=data
)
pro_cat = pro_cat.merge(pro_cat_engagement, on=["StudyId", "IndexDate"], how="left")

# Calculate engagement for each of the three months prior to the index date
pro_cat_month1 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=1, data=data
)
pro_cat_month2 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=2, data=data
)
pro_cat_month3 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=3, data=data
)
pro_cat = pro_cat.merge(pro_cat_month1, on=["StudyId", "IndexDate"], how="left")
pro_cat = pro_cat.merge(pro_cat_month2, on=["StudyId", "IndexDate"], how="left")
pro_cat = pro_cat.merge(pro_cat_month3, on=["StudyId", "IndexDate"], how="left")
pro_cat["EngagementDiffMonth1and2"] = (
    pro_cat["EngagementCATMonth1"] - pro_cat["EngagementCATMonth2"]
)
pro_cat["EngagementDiffMonth1and3"] = (
    pro_cat["EngagementCATMonth1"] - pro_cat["EngagementCATMonth3"]
)

# Calculate PRO average for the week before the index date
pro_cat = calc_pro_average(pro_cat, "CAT", avg_period="WeeklyAvg")

# Calculate variation of scores across 1 month
pro_cat = calc_variation(pro_cat, "CAT")

# Remove unwanted columns and duplicates
pro_cat = pro_cat.loc[
    :,
    ~(
        pro_cat.columns.str.startswith("CAT")
        | pro_cat.columns.str.startswith("Response")
    ),
]
pro_cat = pro_cat.drop(
    columns=[
        "Score",
        "SubmissionTime",
        "FirstSubmissionDate",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "TimeInService",
        "WeekStartDate",
        "WeekPrevStartDate",
    ]
)
pro_cat = pro_cat.drop_duplicates()

############################################
# Daily PROs - Symptom Diary
############################################
# The Symptom Diary has some questions that are numeric and some that are
# categorical
pro_sd_full = pd.read_csv(
    config["inputs"]["raw_data_paths"]["pro_symptom_diary"], delimiter="|"
)
pro_sd = pro_sd_full.merge(
    patient_details,
    on="StudyId",
    how="inner",
)

# Calculate engagement over service
pro_sd = calc_total_pro_engagement(pro_sd, "SymptomDiary")
pro_sd_engagement = pro_sd[
    ["StudyId", "PatientId", "IndexDate", "TotalEngagementSymptomDiary"]
]

# Calculate engagement for 1 month prior to IndexDate
pro_sd_engagement_tw = calc_pro_engagement_in_time_window(
    pro_sd, "SymptomDiary", time_window=1, data=data
)
pro_sd_engagement = pro_sd_engagement.merge(
    pro_sd_engagement_tw, on=["StudyId", "IndexDate"], how="left"
)
pro_sd_engagement = pro_sd_engagement.drop_duplicates()

###############################
# Categorical questions
# (Q8, Q9, Q10)
###############################
# Copies are taken so that the Date column below is not written to a slice of
# the full dataframes
pro_cat_q5 = pro_cat_full[["StudyId", "SubmissionTime", "CATQ5"]].copy()
pro_sd_categ = pro_sd_full[
    [
        "StudyId",
        "SubmissionTime",
        "SymptomDiaryQ8",
        "SymptomDiaryQ9",
        "SymptomDiaryQ10",
        "Score",
    ]
].copy()

# Extract the date from the timestamp column, as same-day entries in CAT and
# SymptomDiary have different timestamps
for df in [pro_cat_q5, pro_sd_categ]:
    df["Date"] = pd.to_datetime(df["SubmissionTime"], utc=True).dt.date

pro_sd_cat = pro_sd_categ.merge(pro_cat_q5, on=["StudyId", "Date"], how="outer")

# If CATQ5 is a 0, then Symptom Diary questions 8, 9 and 10 don't get asked.
# Add this as an option to the columns. There are some cases where patients
# have a 0 in CATQ5 but have also answered Symptom Diary questions 8, 9, and 10
# - keep these answers as is.
for col in ["SymptomDiaryQ8", "SymptomDiaryQ9", "SymptomDiaryQ10"]: pro_sd_cat[col] = np.where( (pro_sd_cat["CATQ5"] == 0) & (pro_sd_cat[col].isna()), "Question Not Asked", pro_sd_cat[col], ) # Calculate the most recent score for SymptomDiary categorical questions pro_sd_cat = pro_sd_cat.merge(data[["StudyId", "IndexDate"]], on="StudyId", how="inner") pro_sd_cat = pro_sd_cat.rename(columns={"SubmissionTime_x": "SubmissionTime"}) pro_sd_cat["SubmissionTime"] = pd.to_datetime(pro_sd_cat["SubmissionTime"], utc=True) pro_sd_cat = calc_last_pro_score(pro_sd_cat, "SymptomDiary") pro_sd_cat = pro_sd_cat.drop( columns=[ "SubmissionTime", "SubmissionTime_y", "CATQ5", "SymptomDiaryQ8", "SymptomDiaryQ9", "Date", "SymptomDiaryQ10", "Score", "LatestSymptomDiaryScore", "LatestPRODate", "TimeSinceSubmission", ] ) pro_sd_cat = pro_sd_cat.drop_duplicates() ############################### # Numeric questions # (Q1, Q2) # Q3 included for comparison ############################### # Calculate PRO average for the week before the index date pro_sd_numeric = pro_sd[ [ "StudyId", "PatientId", "IndexDate", "SubmissionTime", "Score", "SymptomDiaryQ1", "SymptomDiaryQ2", "SymptomDiaryQ3", ] ] pro_sd_numeric = calc_pro_average( pro_sd_numeric, "SymptomDiary", avg_period="WeeklyAvg" ) # Calculate variation of scores across 1 month pro_sd_numeric = calc_variation(pro_sd_numeric, "SymptomDiary") ############################### # Binary questions # (Q3) ############################### # Calculate sum of binary values for a time window of 1 months sd_sum_all = pro_sd_numeric[["StudyId", "IndexDate"]] sd_sum_all = sd_sum_all.drop_duplicates() sd_sum = calc_sum_binary_vals( pro_sd_numeric, binary_cols=["SymptomDiaryQ3"], time_window=1 ) sd_sum_all = sd_sum_all.merge(sd_sum, on=["StudyId", "IndexDate"], how="left") # Scale sums by how often patients responded sd_sum_all = sd_sum_all.merge( pro_sd_engagement, on=["StudyId", "IndexDate"], how="left" ) mapping_scaling = {"SumSymptomDiaryQ3TW1": 
"EngagementSymptomDiaryTW1"} for key in mapping_scaling: scale_sum_to_response_rate(sd_sum_all, key, mapping_scaling[key]) # Combine numeric, categorical and binary dfs pro_sd_all = pro_sd_numeric.merge( sd_sum_all, on=["StudyId", "PatientId", "IndexDate"], how="left" ) pro_sd_all = pro_sd_all.merge(pro_sd_cat, on=["StudyId", "IndexDate"], how="left") # Remove unwanted columns from numeric df pro_sd_all = pro_sd_all.loc[ :, ~( pro_sd_all.columns.str.startswith("Symptom") | pro_sd_all.columns.str.startswith("Sum") | pro_sd_all.columns.str.startswith("Response") ), ] pro_sd_all = pro_sd_all.drop( columns=[ "Score", "SubmissionTime", "TimeWindowStartDate", "WeekStartDate", "WeekPrevStartDate", "TimeSinceSubmission", ] ) pro_sd_all = pro_sd_all.drop_duplicates() # Combine pros into one df pro_df = pro_eq5d.merge(pro_mrc, on=["StudyId", "PatientId", "IndexDate"], how="left") pro_df = pro_df.merge(pro_cat, on=["StudyId", "PatientId", "IndexDate"], how="left") pro_df = pro_df.merge(pro_sd_all, on=["StudyId", "PatientId", "IndexDate"], how="left") ############################### # Map some categorical features ############################### # Replace SDQ8 with strings for phlegm difficulty q8_dict = { "1.0": "Not difficult", "2.0": "A little difficult", "3.0": "Quite difficult", "4.0": "Very difficult", } for key in q8_dict: pro_df["LatestSymptomDiaryQ8"] = pro_df["LatestSymptomDiaryQ8"].str.replace( key, q8_dict[key] ) # Replace SDQ9 with strings for phlegm consistency q9_dict = { "1.0": "Watery", "2.0": "Sticky liquid", "3.0": "Semi-solid", "4.0": "Solid", } for key in q9_dict: pro_df["LatestSymptomDiaryQ9"] = pro_df["LatestSymptomDiaryQ9"].str.replace( key, q9_dict[key] ) # Replace SDQ10 with strings for phlegm colour q10_dict = { "1.0": "White", "2.0": "Yellow", "3.0": "Green", "4.0": "Dark green", } for key in q10_dict: pro_df["LatestSymptomDiaryQ10"] = pro_df["LatestSymptomDiaryQ10"].str.replace( key, q10_dict[key] ) pro_df = pro_df.drop( columns=[ "PatientId", 
"LatestTimeSinceSubmission", "LatestTimeSinceSubmission_x", "LatestTimeSinceSubmission_y", ] ) # Save data os.makedirs(config["outputs"]["processed_data_dir"], exist_ok=True) if data_to_process == "forward_val": pro_df.to_pickle( os.path.join( config["outputs"]["processed_data_dir"], "pros_forward_val_" + model_type + ".pkl", ) ) else: pro_df.to_pickle( os.path.join( config["outputs"]["processed_data_dir"], "pros_" + model_type + ".pkl", ) )