# copd-model-h / training / process_pros.py
# (removed non-Python upload metadata: "IamGrooooot's picture / Inital Upload / 000de75")
"""
Derive features from PRO responses for 2 models:
Parallel model 1: uses both hospital and community exacerbation events
Parallel model 2: uses only hospital exacerbation events
"""
import numpy as np
import pandas as pd
import sys
import os
import re
from collections import defaultdict
import yaml
def calc_total_pro_engagement(pro_df, pro_name):
    """
    Calculates PRO engagement per patient across their entire time within the service.

    Engagement is the number of submitted responses divided by the time in service,
    where the time unit matches the PRO's expected submission cadence
    (monthly EQ5D, weekly MRC, daily CAT/SymptomDiary).

    Args:
        pro_df (pd.DataFrame): dataframe containing the onboarding date and the latest
            prediction date.
        pro_name (str): name of the PRO. One of 'EQ5D', 'MRC', 'CAT', 'SymptomDiary'.

    Returns:
        pd.DataFrame: the input dataframe with an additional column stating the total
            engagement for each patient across the service.

    Raises:
        ValueError: if pro_name is not a recognised PRO.
    """
    # Map each PRO to the numpy timedelta unit of its submission cadence.
    # The original chained ifs left date_unit undefined for an unknown PRO,
    # raising an obscure NameError; fail fast with a clear error instead.
    # NOTE(review): month units ('M') in np.timedelta64 arithmetic against
    # pandas timedeltas are deprecated/removed in newer versions — confirm the
    # pinned pandas/numpy versions support the EQ5D path.
    date_units = {"EQ5D": "M", "MRC": "W", "CAT": "D", "SymptomDiary": "D"}
    try:
        date_unit = date_units[pro_name]
    except KeyError:
        raise ValueError(f"Unknown PRO name: {pro_name!r}") from None
    pro_df["TimeInService"] = np.floor(
        (pro_df.LatestPredictionDate - pro_df.FirstSubmissionDate)
        / np.timedelta64(1, date_unit)
    )
    # One row per submission, so counting rows per patient gives the response count
    pro_response_count = pro_df.groupby("StudyId").count()[["PatientId"]].reset_index()
    pro_response_count = pro_response_count.rename(
        columns={"PatientId": "Response" + pro_name}
    )
    pro_df = pro_df.merge(pro_response_count, on="StudyId", how="left")
    # Engagement = responses / time in service, rounded to 2 d.p.
    pro_df["TotalEngagement" + pro_name] = round(
        pro_df["Response" + pro_name] / pro_df["TimeInService"], 2
    )
    return pro_df
def calc_pro_engagement_in_time_window(pro_df, pro_name, time_window, data):
    """
    Calculates PRO engagement per patient across a specified time window. The time
    window is in format 'months', and consists of the specified time period prior to
    IndexDate.

    Args:
        pro_df (pd.DataFrame): dataframe containing the index dates and PRO response
            submission dates.
        pro_name (str): name of the PRO. One of 'EQ5D', 'MRC', 'CAT', 'SymptomDiary'.
        time_window (int): number of months in which to calculate PRO engagement.
        data (pd.DataFrame): main dataframe.

    Returns:
        pd.DataFrame: a dataframe containing the calculated PRO engagement.

    Raises:
        ValueError: if pro_name is not a recognised PRO.
    """
    # Expected submissions per month for each PRO (monthly EQ5D = 1, weekly
    # MRC = 4, daily CAT/SymptomDiary = 30). The original chained ifs left
    # unit_val undefined for an unknown PRO; fail fast instead.
    responses_per_month = {"EQ5D": 1, "MRC": 4, "CAT": 30, "SymptomDiary": 30}
    try:
        unit_val = responses_per_month[pro_name]
    except KeyError:
        raise ValueError(f"Unknown PRO name: {pro_name!r}") from None
    pro_df["SubmissionTime"] = pd.to_datetime(pro_df["SubmissionTime"], utc=True)
    # Local was previously named 'pro_engagement_6mo', which was misleading:
    # the window length is set by time_window, not fixed at 6 months.
    pro_engagement = pro_df.copy()
    pro_engagement["TimeSinceSubmission"] = (
        pro_engagement["IndexDate"] - pro_engagement["SubmissionTime"]
    ).dt.days
    # Only include PRO responses within the specified time window
    # (months approximated as 30 days)
    pro_engagement = pro_engagement[
        pro_engagement["TimeSinceSubmission"].between(
            0, (time_window * 30), inclusive="both"
        )
    ]
    # Calculate number of PRO responses within specified time window
    pro_engagement = (
        pro_engagement.groupby(["StudyId", "IndexDate"])
        .count()[["PatientId"]]
        .reset_index()
    )
    pro_engagement = pro_engagement.rename(
        columns={"PatientId": "ResponseCountTW" + str(time_window)}
    )
    # Engagement = observed responses / expected responses in the window
    pro_engagement["Engagement" + pro_name + "TW" + str(time_window)] = round(
        pro_engagement["ResponseCountTW" + str(time_window)]
        / (time_window * unit_val),
        2,
    )
    pro_engagement = data[["StudyId", "IndexDate"]].merge(
        pro_engagement, on=["StudyId", "IndexDate"], how="left"
    )
    # Fill N/As with 0 as no engagement was observed for those patients
    pro_engagement = pro_engagement.fillna(0)
    return pro_engagement
def calc_pro_engagement_at_specific_month(pro_df, pro_name, month_num, data):
    """
    Calculates PRO engagement per patient for one 30-day "month" before the
    index date (e.g. month_num=2 covers days 30-60 before IndexDate).

    Args:
        pro_df (pd.DataFrame): dataframe containing the index dates and PRO response
            submission dates.
        pro_name (str): name of the PRO. One of 'EQ5D', 'MRC', 'CAT', 'SymptomDiary'.
        month_num (int): which month before the index date to compute
            engagement for (1 = the most recent month).
        data (pd.DataFrame): main dataframe.

    Returns:
        pd.DataFrame: a dataframe containing the calculated PRO engagement for
            that month.

    Raises:
        ValueError: if pro_name is not a recognised PRO.
    """
    # Expected submissions per month for each PRO. The original chained ifs
    # left unit_val undefined for an unknown PRO; fail fast instead.
    responses_per_month = {"EQ5D": 1, "MRC": 4, "CAT": 30, "SymptomDiary": 30}
    try:
        unit_val = responses_per_month[pro_name]
    except KeyError:
        raise ValueError(f"Unknown PRO name: {pro_name!r}") from None
    pro_df["SubmissionTime"] = pd.to_datetime(pro_df["SubmissionTime"], utc=True)
    pro_engagement = pro_df.copy()
    pro_engagement["TimeSinceSubmission"] = (
        pro_engagement["IndexDate"] - pro_engagement["SubmissionTime"]
    ).dt.days
    # Only include PRO responses for the month specified.
    # Calculate the number of months between index date and specified month
    months_between_index_and_specified = month_num - 1
    pro_engagement = pro_engagement[
        pro_engagement["TimeSinceSubmission"].between(
            (months_between_index_and_specified * 30),
            (month_num * 30),
            inclusive="both",
        )
    ]
    # Calculate number of PRO responses within specified time window
    pro_engagement = (
        pro_engagement.groupby(["StudyId", "IndexDate"])
        .count()[["PatientId"]]
        .reset_index()
    )
    pro_engagement = pro_engagement.rename(
        columns={"PatientId": "ResponseCountMonth" + str(month_num)}
    )
    # Engagement = observed responses / expected responses in one month
    pro_engagement["Engagement" + pro_name + "Month" + str(month_num)] = round(
        pro_engagement["ResponseCountMonth" + str(month_num)] / (1 * unit_val),
        2,
    )
    pro_engagement = data[["StudyId", "IndexDate"]].merge(
        pro_engagement, on=["StudyId", "IndexDate"], how="left"
    )
    # Fill N/As with 0 as no engagement was observed for those patients
    pro_engagement = pro_engagement.fillna(0)
    return pro_engagement
def calc_last_pro_score(pro_df, pro_name):
    """
    Calculates the most recent PRO response before the index date.

    Only responses submitted strictly before the index date are considered, and
    the latest response must fall within 365 days of the index date.
    NOTE(review): an earlier comment claimed a 2-month cutoff while the code
    enforces 365 days — confirm which is intended.

    Args:
        pro_df (pd.DataFrame): dataframe containing the index dates and PRO response
            submission dates.
        pro_name (str): name of the PRO.

    Returns:
        pd.DataFrame: the input dataframe (restricted to pre-index submissions)
            with additional columns stating the latest PRO score for each PRO
            question.
    """
    # Days between submission and index date; positive = submitted before index
    pro_df["TimeSinceSubmission"] = (
        pro_df["IndexDate"] - pro_df["SubmissionTime"]
    ).dt.days
    pro_df = pro_df[pro_df["TimeSinceSubmission"] > 0]
    # Sort so the most recent pre-index submission comes first per patient/date
    pro_df = pro_df.sort_values(
        by=["StudyId", "IndexDate", "TimeSinceSubmission"], ascending=True
    )
    latest_pro = pro_df.drop_duplicates(subset=["StudyId", "IndexDate"], keep="first")
    # Ensure that the latest PRO score is within 365 days of the index date
    latest_pro = latest_pro[latest_pro["TimeSinceSubmission"] <= 365]
    # Keep the question columns plus identifying/score columns
    question_cols = latest_pro.columns[
        latest_pro.columns.str.startswith(pro_name)
    ].tolist()
    question_cols.extend(
        ["StudyId", "IndexDate", "Score", "SubmissionTime", "TimeSinceSubmission"]
    )
    latest_pro = latest_pro[question_cols]
    # Add 'Latest' prefix to question columns
    cols_to_rename = latest_pro.columns[
        ~latest_pro.columns.isin(["StudyId", "IndexDate", "Score", "SubmissionTime"])
    ]
    latest_pro = latest_pro.rename(
        columns=dict(zip(cols_to_rename, "Latest" + cols_to_rename))
    )
    # Rename columns where prefix not added
    latest_pro = latest_pro.rename(
        columns={
            "Score": "Latest" + pro_name + "Score",
            "SubmissionTime": "LatestPRODate",
        }
    )
    pro_df = pro_df.merge(latest_pro, on=["StudyId", "IndexDate"], how="left")
    return pro_df
def calc_pro_score_prior_to_latest(pro_df, pro_name, time_prior_to_latest=60):
    """
    Finds the PRO score submitted just before the latest PRO score.

    Only submissions strictly earlier than LatestPRODate are considered, and
    the previous score must fall within ``time_prior_to_latest`` days of the
    latest one.

    Args:
        pro_df (pd.DataFrame): dataframe containing the latest PRO score and PRO
            response submission dates.
        pro_name (str): name of the PRO.
        time_prior_to_latest (int, optional): maximum gap (in days) allowed
            between the previous and the latest score. Defaults to 60 days
            (two months).

    Returns:
        pd.DataFrame: the input dataframe with additional 'Prev'-prefixed
            columns holding the previous score for each PRO question.
    """
    # Submissions strictly before the latest score, most recent first
    earlier = pro_df[pro_df["SubmissionTime"] < pro_df["LatestPRODate"]].copy()
    earlier = earlier.sort_values(
        by=["StudyId", "IndexDate", "SubmissionTime"], ascending=[True, True, False]
    ).drop_duplicates(subset=["StudyId", "IndexDate"], keep="first")
    # Enforce the recency window relative to the latest score
    earlier["TimeSinceLatestPRODate"] = (
        earlier["LatestPRODate"] - earlier["SubmissionTime"]
    ).dt.days
    earlier = earlier[earlier["TimeSinceLatestPRODate"] <= time_prior_to_latest]
    # Prefix the question columns with 'Prev'
    rename_map = {c: "Prev" + c for c in earlier.columns if c.startswith(pro_name)}
    earlier = earlier.rename(columns=rename_map)
    earlier = earlier[["StudyId", "IndexDate", "Score"]].join(
        earlier.filter(regex="^Prev")
    )
    earlier = earlier.rename(columns={"Score": "Prev" + pro_name + "Score"})
    return pro_df.merge(earlier, on=["StudyId", "IndexDate"], how="left")
def define_mapping_for_calcs(pro_name, questions, prefixes):
    """
    Defines the mapping for calculations between PRO responses.

    Args:
        pro_name (str): name of the PRO. One of 'EQ5D', 'MRC', 'CAT',
            'SymptomDiary'.
        questions (list): question names of PRO.
        prefixes (list): prefixes to identify which columns to use in calculations.
            The possible prefixes are: 'Avg', 'Prev', 'LongerAvg', 'WeekPrevAvg'.

    Returns:
        dict: mapping from a 'Latest'/'WeekAvg' column name to the list of
            prefixed column names to compare it against.

    Raises:
        ValueError: if pro_name is not a recognised PRO.
    """
    # The monthly/weekly PROs compare against the latest single response; the
    # daily PROs compare against the weekly average. The original chained ifs
    # left map_key undefined for an unknown PRO (NameError); fail fast instead.
    key_prefixes = {
        "EQ5D": "Latest",
        "MRC": "Latest",
        "CAT": "WeekAvg",
        "SymptomDiary": "WeekAvg",
    }
    try:
        key_prefix = key_prefixes[pro_name]
    except KeyError:
        raise ValueError(f"Unknown PRO name: {pro_name!r}") from None
    # Create empty dictionary to append questions
    mapping = defaultdict(list)
    # Iterate through questions and create mapping for calculations
    for question in questions:
        map_key = key_prefix + pro_name + question
        for prefix in prefixes:
            mapping[map_key].append(prefix + pro_name + question)
    return mapping
def calc_pro_average(pro_df, pro_name, time_window=None, avg_period=None):
    """
    Calculate the PRO average before the latest PRO score and within a specified time
    window.

    Args:
        pro_df (pd.DataFrame): dataframe containing index dates and PRO submission
            dates.
        pro_name (str): name of the PRO.
        time_window (int, optional): time window (in months) used for calculating the
            average of PRO responses. Required when avg_period is None or
            'LongerAvg'. Defaults to None.
        avg_period (str, optional): one of None, 'WeeklyAvg', 'WeekPrevAvg',
            'LongerAvg'; selects the averaging window and output-column prefix.
            Defaults to None.

    Returns:
        pd.DataFrame: the input dataframe with additional columns with the calculated
            averages.

    Raises:
        ValueError: if avg_period is not one of the recognised values.
    """
    # Validate up front: an unrecognised avg_period previously fell through the
    # if/elif chain and crashed later with UnboundLocalError on prefix/avg_pro.
    if avg_period not in (None, "WeeklyAvg", "WeekPrevAvg", "LongerAvg"):
        raise ValueError(f"Unknown avg_period: {avg_period!r}")
    # Drop any previously calculated averages so reruns don't duplicate columns
    pro_df = pro_df.loc[
        :,
        ~(
            pro_df.columns.str.startswith("Avg")
            | pro_df.columns.str.startswith("Longer")
        ),
    ]
    if avg_period is None:
        prefix = "Avg"
        # Average over time_window months before IndexDate, excluding the
        # latest submission itself
        pro_df["AvgStartDate"] = pro_df["IndexDate"] - pd.DateOffset(months=time_window)
        avg_pro = pro_df[
            (pro_df["SubmissionTime"] >= pro_df["AvgStartDate"])
            & (pro_df["SubmissionTime"] < pro_df["LatestPRODate"])
        ]
    else:
        pro_df["WeekStartDate"] = pro_df["IndexDate"] - pd.DateOffset(weeks=1)
        pro_df["WeekPrevStartDate"] = pro_df["WeekStartDate"] - pd.DateOffset(weeks=1)
        # When looking at daily PROs, three averages are calculated:
        if avg_period == "WeeklyAvg":
            # The weekly average: PRO scores in the week prior to IndexDate
            prefix = "WeekAvg"
            avg_pro = pro_df[
                (pro_df["SubmissionTime"] >= pro_df["WeekStartDate"])
                & (pro_df["SubmissionTime"] <= pro_df["IndexDate"])
            ]
        elif avg_period == "WeekPrevAvg":
            # The weekly previous average: the week before the WeeklyAvg week.
            # Needed to calculate the week-on-week difference of scores.
            prefix = "WeekPrevAvg"
            avg_pro = pro_df[
                (pro_df["SubmissionTime"] >= pro_df["WeekPrevStartDate"])
                & (pro_df["SubmissionTime"] < pro_df["WeekStartDate"])
            ]
        else:  # avg_period == "LongerAvg"
            # Longer average: time_window months prior to the WeekStartDate
            prefix = "LongerAvg"
            pro_df["AvgStartDate"] = pro_df["IndexDate"] - pd.DateOffset(
                months=time_window
            )
            avg_pro = pro_df[
                (pro_df["SubmissionTime"] >= pro_df["AvgStartDate"])
                & (pro_df["SubmissionTime"] < pro_df["WeekStartDate"])
            ]
    # Keep the question columns plus identifying/score columns
    cols_required = avg_pro.columns[avg_pro.columns.str.startswith(pro_name)].tolist()
    cols_required.extend(["StudyId", "IndexDate", "Score"])
    avg_pro = avg_pro[cols_required]
    # Calculate average pro scores per patient per index date
    avg_pro = avg_pro.groupby(["StudyId", "IndexDate"]).mean().reset_index()
    # Add prefix to question columns
    cols_to_rename = avg_pro.columns[
        ~avg_pro.columns.isin(["StudyId", "IndexDate", "Score"])
    ]
    avg_pro = avg_pro.rename(columns=dict(zip(cols_to_rename, prefix + cols_to_rename)))
    # Rename columns where prefix not added
    avg_pro = avg_pro.rename(columns={"Score": prefix + pro_name + "Score"})
    # Merge average PRO with rest of the df
    pro_df = pro_df.merge(avg_pro, on=["StudyId", "IndexDate"], how="left")
    return pro_df
def calc_diff_pro_scores(pro_df, pro_name, latest_pro, other_pro, time_window=None):
    """
    Calculate the difference between PRO scores, adding the result as a new
    column on the input dataframe (in place).

    Args:
        pro_df (pd.DataFrame): dataframe containing columns required for calculations.
        pro_name (str): name of the PRO.
        latest_pro (str): column name containing the latest PRO response for PROs EQ5D
            and MRC, and the latest week average for PROs CAT and SymptomDiary.
        other_pro (str): column name containing the other variable for calculating
            difference.
        time_window (int, optional): time window (in months) used to name the
            output column. Defaults to None.

    Returns:
        pd.DataFrame: the input dataframe with an additional column with the
            calculated difference.
    """
    # Split the column name on capital letters, e.g.
    # 'LatestEQ5DScore' -> ['Latest', 'E', 'Q5', 'D', 'Score']
    name_parts = re.findall(r"[A-Z][^A-Z]*", latest_pro)
    # Drop the leading prefix: one element ('Latest') for monthly/weekly PROs,
    # two elements ('Week' + 'Avg') for the daily PROs
    drop_count = 2 if pro_name in ["CAT", "SymptomDiary"] else 1
    base_name = "".join(name_parts[drop_count:])
    # Name the output column according to whether a time window applies
    if time_window is None:
        diff_col = "DiffLatestPrev" + base_name
    else:
        diff_col = "DiffLatestAvg" + base_name + "TW" + str(time_window)
    pro_df[diff_col] = pro_df[latest_pro] - pro_df[other_pro]
    return pro_df
def calc_variation(pro_df, pro_name):
    """
    Calculate the variation (standard deviation) of PRO responses over the
    month (30 days) before the index date.

    Args:
        pro_df (pd.DataFrame): dataframe containing index dates and PRO submission
            dates.
        pro_name (str): name of the PRO.

    Returns:
        pd.DataFrame: the input dataframe with additional 'Var'-prefixed
            columns holding the calculated standard deviations.
    """
    # Compute days since submission unless an earlier step already has
    if "TimeSinceSubmission" not in pro_df:
        pro_df["TimeSinceSubmission"] = (
            pro_df["IndexDate"] - pro_df["SubmissionTime"]
        ).dt.days
    # Restrict to submissions within the month before the index date
    in_month = (pro_df["TimeSinceSubmission"] > 0) & (
        pro_df["TimeSinceSubmission"] <= 30
    )
    pro_var = pro_df[in_month]
    # Keep the question columns plus identifiers and the overall score
    keep_cols = [c for c in pro_var.columns if c.startswith(pro_name)]
    keep_cols += ["StudyId", "IndexDate", "Score"]
    pro_var = pro_var[keep_cols]
    # Sample standard deviation per patient per index date
    pro_var = pro_var.groupby(["StudyId", "IndexDate"]).std().reset_index()
    # Prefix the question columns (and the overall score) with 'Var'
    id_cols = {"StudyId", "IndexDate", "Score"}
    rename_map = {c: "Var" + c for c in pro_var.columns if c not in id_cols}
    rename_map["Score"] = "Var" + pro_name + "Score"
    pro_var = pro_var.rename(columns=rename_map)
    # Merge back to the main dataframe
    return pro_df.merge(pro_var, on=["StudyId", "IndexDate"], how="left")
def calc_sum_binary_vals(pro_df, binary_cols, time_window=1):
    """
    For SymptomDiary questions that contain binary values, calculate the sum of the
    binary values over a specified time window before the index date.

    Side effect: adds a 'TimeWindowStartDate' column to the input dataframe
    in place (downstream code drops it later).

    Args:
        pro_df (pd.DataFrame): dataframe containing index dates and PRO submission
            dates.
        binary_cols (list): column names for which the sum of binary values is
            calculated.
        time_window (int, optional): time window (in months) over which the sum of the
            binary values is calculated. Defaults to 1.

    Returns:
        pd.DataFrame: one row per StudyId/IndexDate with the summed binary values.
    """
    # Window runs from time_window months before the index date up to it
    pro_df["TimeWindowStartDate"] = pro_df["IndexDate"] - pd.DateOffset(
        months=time_window
    )
    in_window = (pro_df["SubmissionTime"] >= pro_df["TimeWindowStartDate"]) & (
        pro_df["SubmissionTime"] <= pro_df["IndexDate"]
    )
    windowed = pro_df[in_window]
    # Sum the binary answers per patient per index date
    sums = windowed.groupby(["StudyId", "IndexDate"])[binary_cols].sum()
    # Tag columns with what was computed and the window length
    sums = sums.add_prefix("Sum").add_suffix("TW" + str(time_window))
    return sums.reset_index()
def scale_sum_to_response_rate(pro_df, sum, engagement_rate):
    """
    Scale the sum calculated by calc_sum_binary_vals() by the response rate so
    the feature is comparable between patients with different engagement.

    Note: the parameter named ``sum`` shadows the builtin of the same name
    within this function; kept as-is for backward compatibility with callers.
    The new column is added to the input dataframe in place.

    Args:
        pro_df (pd.DataFrame): dataframe containing the columns for scaling the sum by
            the engagement rate.
        sum (str): column name that contains the data for the sum of the binary values.
        engagement_rate (str): column name that contains the data for the response rate.

    Returns:
        pd.DataFrame: the input dataframe with an additional 'Scaled' column.
    """
    scaled_col = "Scaled" + sum
    pro_df[scaled_col] = pro_df[sum].div(pro_df[engagement_rate])
    return pro_df
# Load run configuration (model type, data paths, lookback settings)
with open("./training/config.yaml", "r") as config:
    config = yaml.safe_load(config)
# Specify which model to generate features for
model_type = config["model_settings"]["model_type"]
# Setup log file
# NOTE(review): this redirects all stdout for the rest of the script to the
# log file; the handle is intentionally left open until the process exits.
log = open("./training/logging/process_pros_" + model_type + ".log", "w")
sys.stdout = log
# Dataset to process - set through config file
data_to_process = config["model_settings"]["data_to_process"]
# Load cohort data (forward-validation cohorts use separate pickles)
if data_to_process == "forward_val":
    data = pd.read_pickle("./data/patient_labels_forward_val_hosp_comm.pkl")
    patient_details = pd.read_pickle("./data/patient_details_forward_val.pkl")
else:
    data = pd.read_pickle("./data/patient_labels_" + model_type + ".pkl")
    patient_details = pd.read_pickle("./data/patient_details.pkl")
data = data[["StudyId", "IndexDate"]]
# One row per StudyId/IndexDate with service onboarding and latest prediction dates
patient_details = data.merge(
    patient_details[["StudyId", "FirstSubmissionDate", "LatestPredictionDate"]],
    on="StudyId",
    how="left",
)
# Calculate the lookback start date. Will need this to aggregate data for model
# features
data["LookbackStartDate"] = data["IndexDate"] - pd.DateOffset(
    days=config["model_settings"]["lookback_period"]
)
############################################
# Monthly PROs - EQ5D
############################################
pro_eq5d = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_eq5d"], delimiter="|")
# Keep only patients present in the cohort
pro_eq5d = pro_eq5d.merge(
    patient_details,
    on="StudyId",
    how="inner",
)
# EQ5DQ6 is a much less structured question compared to the other questions in EQ5D.
# A new score will be calculated using only EQ5DQ1-Q5 to prevent Q6 affecting the score
pro_eq5d["EQ5DScoreWithoutQ6"] = pro_eq5d.loc[:, "EQ5DQ1":"EQ5DQ5"].sum(axis=1)
# Calculate engagement over service
pro_eq5d = calc_total_pro_engagement(pro_eq5d, "EQ5D")
# Calculate engagement for a time window of 1 month (time window chosen based on signal
# output observed from results of feature_eng_multiple_testing)
pro_eq5d_engagement = calc_pro_engagement_in_time_window(
    pro_eq5d, "EQ5D", time_window=1, data=data
)
pro_eq5d = pro_eq5d.merge(pro_eq5d_engagement, on=["StudyId", "IndexDate"], how="left")
# Calculate last PRO score
pro_eq5d = calc_last_pro_score(pro_eq5d, "EQ5D")
# Mapping to calculate the difference between the latest PRO scores and the average
# PRO score
question_names_eq5d = ["Q1", "Q2", "Q3", "Q4", "Q5", "Q6", "Score", "ScoreWithoutQ6"]
mapping_eq5d = define_mapping_for_calcs("EQ5D", question_names_eq5d, prefixes=["Avg"])
# Calculate average PRO score for a time window of 1 month prior to IndexDate,
# ignoring the latest PRO score
pro_eq5d = calc_pro_average(pro_eq5d, "EQ5D", time_window=1)
# calc_diff_pro_scores adds columns to pro_eq5d in place; no reassignment needed
for key in mapping_eq5d:
    calc_diff_pro_scores(pro_eq5d, "EQ5D", key, mapping_eq5d[key][0], time_window=1)
# Calculate variation of scores across 1 month
pro_eq5d = calc_variation(pro_eq5d, "EQ5D")
# Remove unwanted columns and duplicates: raw question columns, intermediate
# averages and raw response counts are not model features
pro_eq5d = pro_eq5d.loc[
    :,
    ~(
        pro_eq5d.columns.str.startswith("Avg")
        | pro_eq5d.columns.str.startswith("EQ5D")
        | pro_eq5d.columns.str.startswith("Response")
    ),
]
pro_eq5d = pro_eq5d.drop(
    columns=[
        "Score",
        "SubmissionTime",
        "FirstSubmissionDate",
        "TimeInService",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "LatestPRODate",
    ]
)
pro_eq5d = pro_eq5d.drop_duplicates()
############################################
# Weekly PROs - MRC
############################################
pro_mrc = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_mrc"], delimiter="|")
# Keep only patients present in the cohort
pro_mrc = pro_mrc.merge(
    patient_details,
    on="StudyId",
    how="inner",
)
# Calculate engagement over service
pro_mrc = calc_total_pro_engagement(pro_mrc, "MRC")
# Calculate engagement for a time window of 1 month
pro_mrc_engagement = calc_pro_engagement_in_time_window(
    pro_mrc, "MRC", time_window=1, data=data
)
pro_mrc = pro_mrc.merge(pro_mrc_engagement, on=["StudyId", "IndexDate"], how="left")
# Calculate last PRO score
pro_mrc = calc_last_pro_score(pro_mrc, "MRC")
# Mapping to calculate the difference between the latest PRO scores and the average
# PRO score (MRC has a single question)
question_names_mrc = ["Q1"]
mapping_mrc = define_mapping_for_calcs("MRC", question_names_mrc, prefixes=["Avg"])
# Calculate average PRO score for a time window of 1 month prior to IndexDate,
# ignoring the latest PRO score
pro_mrc = calc_pro_average(pro_mrc, "MRC", time_window=1)
# calc_diff_pro_scores adds columns to pro_mrc in place; no reassignment needed
for key in mapping_mrc:
    calc_diff_pro_scores(pro_mrc, "MRC", key, mapping_mrc[key][0], time_window=1)
# Calculate variation of scores across 1 month
pro_mrc = calc_variation(pro_mrc, "MRC")
# Remove unwanted columns and duplicates
pro_mrc = pro_mrc.loc[
    :,
    ~(
        pro_mrc.columns.str.startswith("Avg")
        | pro_mrc.columns.str.startswith("MRC")
        | pro_mrc.columns.str.startswith("Response")
    ),
]
pro_mrc = pro_mrc.drop(
    columns=[
        "SubmissionTime",
        "Score",
        "FirstSubmissionDate",
        "TimeInService",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "LatestPRODate",
    ]
)
pro_mrc = pro_mrc.drop_duplicates()
############################################
# Daily PROs - CAT
############################################
# Keep the un-merged full CAT data too; CATQ5 is reused for the SymptomDiary
# categorical questions further down
pro_cat_full = pd.read_csv(config["inputs"]["raw_data_paths"]["pro_cat"], delimiter="|")
pro_cat = pro_cat_full.merge(
    patient_details,
    on="StudyId",
    how="inner",
)
# Calculate engagement over service
pro_cat = calc_total_pro_engagement(pro_cat, "CAT")
# Calculate engagement for a time window of 1 month
pro_cat_engagement = calc_pro_engagement_in_time_window(
    pro_cat, "CAT", time_window=1, data=data
)
pro_cat = pro_cat.merge(pro_cat_engagement, on=["StudyId", "IndexDate"], how="left")
# Calculate engagement for each of the three months prior to the index date
pro_cat_month1 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=1, data=data
)
pro_cat_month2 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=2, data=data
)
pro_cat_month3 = calc_pro_engagement_at_specific_month(
    pro_cat, "CAT", month_num=3, data=data
)
pro_cat = pro_cat.merge(pro_cat_month1, on=["StudyId", "IndexDate"], how="left")
pro_cat = pro_cat.merge(pro_cat_month2, on=["StudyId", "IndexDate"], how="left")
pro_cat = pro_cat.merge(pro_cat_month3, on=["StudyId", "IndexDate"], how="left")
# Month-on-month engagement trend features
pro_cat["EngagementDiffMonth1and2"] = (
    pro_cat["EngagementCATMonth1"] - pro_cat["EngagementCATMonth2"]
)
pro_cat["EngagementDiffMonth1and3"] = (
    pro_cat["EngagementCATMonth1"] - pro_cat["EngagementCATMonth3"]
)
# Calculate PRO average for the week before the index date
pro_cat = calc_pro_average(pro_cat, "CAT", avg_period="WeeklyAvg")
# Calculate variation of scores across 1 month
pro_cat = calc_variation(pro_cat, "CAT")
# Remove unwanted columns and duplicates
pro_cat = pro_cat.loc[
    :,
    ~(
        pro_cat.columns.str.startswith("CAT")
        | pro_cat.columns.str.startswith("Response")
    ),
]
pro_cat = pro_cat.drop(
    columns=[
        "Score",
        "SubmissionTime",
        "FirstSubmissionDate",
        "TimeSinceSubmission",
        "LatestPredictionDate",
        "TimeInService",
        "WeekStartDate",
        "WeekPrevStartDate",
    ]
)
pro_cat = pro_cat.drop_duplicates()
############################################
# Daily PROs - Symptom Diary
############################################
# Symptom diary have some questions that are numeric and some that are categorical
pro_sd_full = pd.read_csv(
    config["inputs"]["raw_data_paths"]["pro_symptom_diary"], delimiter="|"
)
pro_sd = pro_sd_full.merge(
    patient_details,
    on="StudyId",
    how="inner",
)
# Calculate engagement over service
pro_sd = calc_total_pro_engagement(pro_sd, "SymptomDiary")
pro_sd_engagement = pro_sd[
    ["StudyId", "PatientId", "IndexDate", "TotalEngagementSymptomDiary"]
]
# Calculate engagement for 1 month prior to IndexDate
pro_sd_engagement_tw = calc_pro_engagement_in_time_window(
    pro_sd, "SymptomDiary", time_window=1, data=data
)
pro_sd_engagement = pro_sd_engagement.merge(
    pro_sd_engagement_tw, on=["StudyId", "IndexDate"], how="left"
)
# One engagement row per patient per index date
pro_sd_engagement = pro_sd_engagement.drop_duplicates()
###############################
# Categorical questions
# (Q8, Q9, Q10)
###############################
pro_cat_q5 = pro_cat_full[["StudyId", "SubmissionTime", "CATQ5"]]
pro_sd_categ = pro_sd_full[
    [
        "StudyId",
        "SubmissionTime",
        "SymptomDiaryQ8",
        "SymptomDiaryQ9",
        "SymptomDiaryQ10",
        "Score",
    ]
]
# Split timestamp column into separate date and time columns as same day entries in CAT
# and SymptomDiary have different timestamps
# NOTE(review): pro_cat_q5/pro_sd_categ are slices of the full dataframes, so
# this assignment may emit a SettingWithCopyWarning — consider adding .copy()
# to the slices above.
for df in [pro_cat_q5, pro_sd_categ]:
    df["Date"] = (pd.to_datetime(df["SubmissionTime"], utc=True)).dt.date
pro_sd_cat = pro_sd_categ.merge(pro_cat_q5, on=["StudyId", "Date"], how="outer")
# If CATQ5 is a 0, then Symptom Diary questions 8, 9 and 10 don't get asked. Add this as
# an option to the columns. There are some cases where patients have a 0 in CATQ5 but
# have also answered Symptom Diary questions 8, 9, and 10 - keep these answers as is.
for col in ["SymptomDiaryQ8", "SymptomDiaryQ9", "SymptomDiaryQ10"]:
    pro_sd_cat[col] = np.where(
        (pro_sd_cat["CATQ5"] == 0) & (pro_sd_cat[col].isna()),
        "Question Not Asked",
        pro_sd_cat[col],
    )
# Calculate the most recent score for SymptomDiary categorical questions
pro_sd_cat = pro_sd_cat.merge(data[["StudyId", "IndexDate"]], on="StudyId", how="inner")
# The outer merge above suffixed the duplicated SubmissionTime columns (_x/_y)
pro_sd_cat = pro_sd_cat.rename(columns={"SubmissionTime_x": "SubmissionTime"})
pro_sd_cat["SubmissionTime"] = pd.to_datetime(pro_sd_cat["SubmissionTime"], utc=True)
pro_sd_cat = calc_last_pro_score(pro_sd_cat, "SymptomDiary")
# Keep only the 'Latest' categorical answers; drop raw and intermediate columns
pro_sd_cat = pro_sd_cat.drop(
    columns=[
        "SubmissionTime",
        "SubmissionTime_y",
        "CATQ5",
        "SymptomDiaryQ8",
        "SymptomDiaryQ9",
        "Date",
        "SymptomDiaryQ10",
        "Score",
        "LatestSymptomDiaryScore",
        "LatestPRODate",
        "TimeSinceSubmission",
    ]
)
pro_sd_cat = pro_sd_cat.drop_duplicates()
###############################
# Numeric questions
# (Q1, Q2)
# Q3 included for comparison
###############################
# Calculate PRO average for the week before the index date
pro_sd_numeric = pro_sd[
    [
        "StudyId",
        "PatientId",
        "IndexDate",
        "SubmissionTime",
        "Score",
        "SymptomDiaryQ1",
        "SymptomDiaryQ2",
        "SymptomDiaryQ3",
    ]
]
pro_sd_numeric = calc_pro_average(
    pro_sd_numeric, "SymptomDiary", avg_period="WeeklyAvg"
)
# Calculate variation of scores across 1 month
pro_sd_numeric = calc_variation(pro_sd_numeric, "SymptomDiary")
###############################
# Binary questions
# (Q3)
###############################
# Calculate sum of binary values for a time window of 1 months
sd_sum_all = pro_sd_numeric[["StudyId", "IndexDate"]]
sd_sum_all = sd_sum_all.drop_duplicates()
# NOTE(review): this call also adds a TimeWindowStartDate column to
# pro_sd_numeric in place; that column is dropped later from pro_sd_all.
sd_sum = calc_sum_binary_vals(
    pro_sd_numeric, binary_cols=["SymptomDiaryQ3"], time_window=1
)
sd_sum_all = sd_sum_all.merge(sd_sum, on=["StudyId", "IndexDate"], how="left")
# Scale sums by how often patients responded
sd_sum_all = sd_sum_all.merge(
    pro_sd_engagement, on=["StudyId", "IndexDate"], how="left"
)
mapping_scaling = {"SumSymptomDiaryQ3TW1": "EngagementSymptomDiaryTW1"}
# scale_sum_to_response_rate adds the scaled column to sd_sum_all in place
for key in mapping_scaling:
    scale_sum_to_response_rate(sd_sum_all, key, mapping_scaling[key])
# Combine numeric, categorical and binary dfs
pro_sd_all = pro_sd_numeric.merge(
    sd_sum_all, on=["StudyId", "PatientId", "IndexDate"], how="left"
)
pro_sd_all = pro_sd_all.merge(pro_sd_cat, on=["StudyId", "IndexDate"], how="left")
# Remove unwanted columns from numeric df (raw question columns, unscaled sums
# and raw response counts are not model features)
pro_sd_all = pro_sd_all.loc[
    :,
    ~(
        pro_sd_all.columns.str.startswith("Symptom")
        | pro_sd_all.columns.str.startswith("Sum")
        | pro_sd_all.columns.str.startswith("Response")
    ),
]
pro_sd_all = pro_sd_all.drop(
    columns=[
        "Score",
        "SubmissionTime",
        "TimeWindowStartDate",
        "WeekStartDate",
        "WeekPrevStartDate",
        "TimeSinceSubmission",
    ]
)
pro_sd_all = pro_sd_all.drop_duplicates()
# Combine pros into one df
pro_df = pro_eq5d.merge(pro_mrc, on=["StudyId", "PatientId", "IndexDate"], how="left")
pro_df = pro_df.merge(pro_cat, on=["StudyId", "PatientId", "IndexDate"], how="left")
pro_df = pro_df.merge(pro_sd_all, on=["StudyId", "PatientId", "IndexDate"], how="left")
###############################
# Map some categorical features
###############################
# Map numeric-coded categorical SymptomDiary answers to human-readable labels.
# regex=False makes the replacement literal: with the (pandas-version-dependent)
# regex default, the '.' in a pattern like '1.0' would act as a wildcard.
# The three previously duplicated replacement loops are driven by one table.
categorical_label_maps = {
    # Phlegm difficulty (SDQ8)
    "LatestSymptomDiaryQ8": {
        "1.0": "Not difficult",
        "2.0": "A little difficult",
        "3.0": "Quite difficult",
        "4.0": "Very difficult",
    },
    # Phlegm consistency (SDQ9)
    "LatestSymptomDiaryQ9": {
        "1.0": "Watery",
        "2.0": "Sticky liquid",
        "3.0": "Semi-solid",
        "4.0": "Solid",
    },
    # Phlegm colour (SDQ10)
    "LatestSymptomDiaryQ10": {
        "1.0": "White",
        "2.0": "Yellow",
        "3.0": "Green",
        "4.0": "Dark green",
    },
}
for col_name, label_map in categorical_label_maps.items():
    for code, label in label_map.items():
        pro_df[col_name] = pro_df[col_name].str.replace(code, label, regex=False)
# Drop identifiers and intermediate columns not needed as model features
pro_df = pro_df.drop(
    columns=[
        "PatientId",
        "LatestTimeSinceSubmission",
        "LatestTimeSinceSubmission_x",
        "LatestTimeSinceSubmission_y",
    ]
)
# Save data (ensure the output directory exists first)
os.makedirs(config["outputs"]["processed_data_dir"], exist_ok=True)
if data_to_process == "forward_val":
    pro_df.to_pickle(
        os.path.join(
            config["outputs"]["processed_data_dir"],
            "pros_forward_val_" + model_type + ".pkl",
        )
    )
else:
    pro_df.to_pickle(
        os.path.join(
            config["outputs"]["processed_data_dir"],
            "pros_" + model_type + ".pkl",
        )
    )