copd-model-h / training /setup_labels_forward_val.py
IamGrooooot's picture
Inital Upload
000de75
"""
Script uses both hospital and community exacerbation events.
Collate all hospital, patient reported events and apply PRO LOGIC to determine the number
of exacerbation events. Use exacerbation events to determine the number of rows required per
patient in the data and generate random index dates and setup labels. Data starts at July
2022 and runs until Dec 2023 and will be used for forward validation of the model.
"""
import model_h
import numpy as np
import os
import sys
import pandas as pd
import matplotlib.pyplot as plt
from datetime import timedelta
import random
import yaml
with open("./training/config.yaml", "r") as config:
config = yaml.safe_load(config)
# Setup log file
log = open(os.path.join(config['outputs']['logging_dir'], "setup_labels_hosp_comm.log"), "w")
sys.stdout = log
############################################################################
# Define model cohort and training data windows
############################################################################
# Read relevant info from patient details
patient_details = pd.read_csv(
config['inputs']['raw_data_paths']['patient_details'],
usecols=[
"PatientId",
"FirstSubmissionDate",
"MostRecentSubmissionDate",
"DateOfBirth",
"Sex",
"StudyId",
],
delimiter="|",
)
# Select patients for inclusion (those with up to date events in service)
# Original RECEIVER cohort study id list
receiver_patients = ["RC{:02d}".format(i) for i in range(1, 85)]
# This patient needs removing
receiver_patients.remove("RC34")
# Scale up patients (subset)
scaleup_patients = ["SU{:02d}".format(i) for i in range(1, 219)]
# scaleup_patients.append('SU287') #Removed as study ID contains 2 patients
# List of all valid patients for modelling
valid_patients = receiver_patients + scaleup_patients
# Filter for valid patients accounting for white spaces in StudyId (e.g. RC 26 and RC 52)
patient_details = patient_details[
patient_details.StudyId.str.replace(" ", "").isin(valid_patients)
]
# Select only non null entries in patient data start/end dates
patient_details = patient_details[
(patient_details.FirstSubmissionDate.notna())
& (patient_details.MostRecentSubmissionDate.notna())
]
# Create a column stating the earliest permitted date for forward validation
patient_details["EarliestIndexDate"] = config['model_settings']['forward_validation_earliest_date']
# Create a column stating the latest date permitted based on events added to service data
patient_details["LatestPredictionDate"] = config['model_settings']['forward_validation_latest_date']
date_cols = ["FirstSubmissionDate", "MostRecentSubmissionDate", "LatestPredictionDate", "EarliestIndexDate"]
patient_details[date_cols] = patient_details[date_cols].apply(
lambda x: pd.to_datetime(x, utc=True, format="mixed").dt.normalize(), axis=1
)
# Choose the earlier date out of the patient's last submission and the latest COPD data
# events
patient_details["LatestPredictionDate"] = patient_details[
["MostRecentSubmissionDate", "LatestPredictionDate"]
].min(axis=1)
# Calculate the latest date that the index date can be for each patient
patient_details["LatestIndexDate"] = patient_details[
"LatestPredictionDate"
] - pd.DateOffset(days=config['model_settings']['prediction_window'])
# Add 6 months to start of data window to allow enough of a lookback period
patient_details["EarliestDataDate"] = patient_details[
"EarliestIndexDate"
] - pd.DateOffset(days=config['model_settings']['lookback_period'])
# Remove any patients for whom the index start date overlaps the last index
# date, i.e. they have too short a window of data
print("Number of total patients", len(patient_details))
print(
"Number of patients with too short of a window of data:",
len(
patient_details[
patient_details["EarliestIndexDate"] > patient_details["LatestIndexDate"]
]
),
)
patient_details = patient_details[
patient_details["EarliestIndexDate"] < patient_details["LatestIndexDate"]
]
patient_details.to_pickle("./data/patient_details_forward_val.pkl")
# List of remaining patients
model_patients = list(patient_details.PatientId.unique())
model_study_ids = list(patient_details.StudyId.unique())
print(
"Model cohort: {} patients. {} RECEIVER and {} SU".format(
len(model_patients),
len(patient_details[patient_details["StudyId"].str.startswith("RC")]),
len(patient_details[patient_details["StudyId"].str.startswith("SU")]),
)
)
df = patient_details[
[
"PatientId",
"DateOfBirth",
"Sex",
"StudyId",
"EarliestDataDate",
"EarliestIndexDate",
"LatestIndexDate",
"LatestPredictionDate",
]
].copy()
# Create a row per day between the EarliestDataDate and the LatestPredictionDate
df["DateOfEvent"] = df.apply(
lambda x: pd.date_range(x.EarliestDataDate, x.LatestPredictionDate, freq="D"),
axis=1,
)
df = df.explode("DateOfEvent").reset_index(drop=True)
############################################################################
# Extract hospital exacerbations and admissions from COPD service data
############################################################################
# Contains exacerbations among other event types
patient_events = pd.read_csv(
config['inputs']['raw_data_paths']['patient_events'],
delimiter="|",
usecols=["PatientId", "DateOfEvent", "EventType"],
)
# Filter for only patients in model cohort
patient_events = patient_events[patient_events.PatientId.isin(model_patients)]
# Identify hospital exacerbation events
patient_events["IsHospExac"] = model_h.define_service_exac_event(
events=patient_events.EventType, include_community=False
)
# Identify hospital admissions (all causes)
patient_events["IsHospAdmission"] = model_h.define_hospital_admission(
patient_events.EventType
)
admissions = patient_events[patient_events.IsHospAdmission == 1][
["PatientId", "DateOfEvent", "IsHospAdmission"]
]
hosp_exacs = patient_events[patient_events.IsHospExac == 1][
["PatientId", "DateOfEvent", "IsHospExac"]
]
admissions["DateOfEvent"] = pd.to_datetime(
admissions.DateOfEvent, utc=True
).dt.normalize()
hosp_exacs["DateOfEvent"] = pd.to_datetime(
hosp_exacs.DateOfEvent, utc=True
).dt.normalize()
hosp_exacs = hosp_exacs.drop_duplicates()
admissions = admissions.drop_duplicates()
# Save hospital exacerbations and admissions data
hosp_exacs.to_pickle("./data/hospital_exacerbations.pkl")
admissions.to_pickle("./data/hospital_admissions.pkl")
############################################################################
# Extract patient reported exacerbation events
############################################################################
########################
# Data post Q5 change
#######################
# Read file containing patient reported events (not patient_events because it contains
# the dates when patients answered PROs and not which date they reported as having taken
# their rescue meds)
symptom_diary = pd.read_csv(
config['inputs']['raw_data_paths']['pro_symptom_diary'],
usecols=[
"PatientId",
"StudyId",
"Score",
"SubmissionTime",
"SymptomDiaryQ5",
"SymptomDiaryQ11a",
"SymptomDiaryQ11b",
],
delimiter="|",
)
Q5ChangeDate = pd.to_datetime(config['model_settings']['pro_q5_change_date'], utc=True)
symptom_diary = model_h.filter_symptom_diary(
df=symptom_diary, date_cutoff=Q5ChangeDate, patients=model_patients
)
weekly_pros = model_h.get_rescue_med_pro_responses(symptom_diary)
weekly_pros = model_h.set_pro_exac_dates(weekly_pros)
weekly_pros = weekly_pros[
[
"PatientId",
"Q5Answered",
"NegativeQ5",
"IsCommExac",
"DateOfEvent",
"ExacDateUnknown",
]
]
####################################################################################
# Merge hospital and patient reported events with daily patient records
#
# Exacerbations occurring in Lenus service period include verified clinician events
# pre-April 2021 (after onboarding) and community exacerbations recorded in weekly
# PROs post-April 2021. Hospital exacerbations include exacerbations occuring during
# service period.
#####################################################################################
# Patient reported, clinician verified
#df = df.merge(verified_exacs, on=["StudyId", "DateOfEvent"], how="left")
# Patient reported, new rescue med PRO (April 2021 onwards)
df = df.merge(weekly_pros, on=["PatientId", "DateOfEvent"], how="left")
# Hospital exacerbations
df = df.merge(hosp_exacs, on=["PatientId", "DateOfEvent"], how="left")
df = model_h.fill_column_by_patient(df=df, id_col="PatientId", col="StudyId")
# Hospital admissions
df = df.merge(admissions, on=["PatientId", "DateOfEvent"], how="left")
df = model_h.fill_column_by_patient(df=df, id_col="PatientId", col="StudyId")
# Column for whether an exacerbation of any kind occurred on each date. To be filtered
# using (PRO) LOGIC
df["IsExac"] = np.where((df.IsCommExac == 1) | (df.IsHospExac == 1), 1, 0)
# Resample the df to one day per patient starting from the earliest record
df = (
df.set_index("DateOfEvent")
.groupby("StudyId")
.resample("D")
.asfreq()
.drop("StudyId", axis=1)
.reset_index()
)
# Infill binary cols with zero where applicable
df[
[
"Q5Answered",
"NegativeQ5",
"IsHospExac",
"IsCommExac",
"ExacDateUnknown",
"IsExac",
"IsHospAdmission",
]
] = df[
[
"Q5Answered",
"NegativeQ5",
"IsHospExac",
"IsCommExac",
"ExacDateUnknown",
"IsExac",
"IsHospAdmission",
]
].fillna(
0
)
# Infill some columns by StudyId to populate entire df
#df = copd.fill_column_by_patient(df=df, id_col="StudyId", col="FirstSubmissionDate")
df = model_h.fill_column_by_patient(df=df, id_col="StudyId", col="LatestPredictionDate")
df = model_h.fill_column_by_patient(df=df, id_col="StudyId", col="PatientId")
# Retain only dates before the end of each patient's data window
df = df[df.DateOfEvent <= df.LatestPredictionDate]
print("Starting number of exacerbations: {}".format(df.IsExac.sum()))
print(
"Number of exacerbations during COPD service: {}".format(
len(df[(df.IsExac == 1) & (df.DateOfEvent >= df.EarliestDataDate)])
)
)
print(
"Number of unique exacerbation patients: {}".format(
len(df[df.IsExac == 1].PatientId.unique())
)
)
print(
"Exacerbation breakdown: {} hospital, {} patient reported and {} overlapping".format(
df.IsHospExac.sum(),
df.IsCommExac.sum(),
len(df.loc[(df.IsCommExac == 1) & (df.IsHospExac == 1)]),
)
)
print(
"Number of hospital exacerbations during COPD service: {} ({} unique patients)".format(
len(df[(df.IsHospExac == 1) & (df.DateOfEvent >= df.EarliestDataDate)]),
len(
df[
(df.IsHospExac == 1) & (df.DateOfEvent >= df.EarliestDataDate)
].StudyId.unique()
),
)
)
print(
"Community exacerbations from weekly PROs: {} ({} unique patients)".format(
len(df[df.IsCommExac == 1]), len(df[df.IsCommExac == 1].StudyId.unique())
)
)
print(
"Number of patient reported exacerbations with unknown dates: {} ({} overlapping\
with hospital events)".format(
df.ExacDateUnknown.sum(),
len(df[(df.IsHospExac == 1) & (df.ExacDateUnknown == 1)]),
)
)
# Check for any patient reported events with unknown dates that occurred on the same day
# as a hospital event. Hospital events are trusted so set the date to known
df.loc[(df.IsCommExac == 1) & (df.IsHospExac == 1), "ExacDateUnknown"] = 0
print("Remaining exacerbations with unknown dates: {}".format(df.ExacDateUnknown.sum()))
############################################################################
# Implement PRO LOGIC on hospital and patient reported exacerbation events
############################################################################
# Define min and max days for PRO LOGIC. No predictions made or data used within
# logic_min_days after an exacerbation. Events falling between logic_min_days and
# logic_max_days after an event are subject to the weekly rescue med LOGIC criterion
logic_min_days = config['model_settings']['pro_logic_min_days_after_exac']
logic_max_days = config['model_settings']['pro_logic_max_days_after_exac']
# Calculate the days since the previous exacerbation for all patient days.
df = (
df.groupby("StudyId")
.apply(
lambda x: model_h.calculate_days_since_last_event(
df=x, event_col="IsExac", output_col="DaysSinceLastExac"
)
)
.reset_index(drop=True)
)
# Apply exclusion period following all exacerbations
df["RemoveRow"] = model_h.minimum_period_between_exacerbations(
df, minimum_days=logic_min_days
)
# Do not remove hospital exacerbations even if they get flagged up by PRO logic
df["RemoveRow"] = np.where(df["IsHospExac"] == 1, 0, df["RemoveRow"])
print(
"Number of community exacerbations excluded by PRO LOGIC {} day criterion: {}".format(
logic_min_days, len(df[(df.IsExac == 1) & (df.RemoveRow == 1)])
)
)
# Apply criterion for negative weekly Q5 responses - doesn't capture anything post Q5
# change
consecutive_replies = config['model_settings']['neg_consecutive_q5_replies']
df = model_h.apply_logic_response_criterion(
df,
minimum_period=logic_min_days,
maximum_period=logic_max_days,
N=consecutive_replies,
)
# Do not remove hospital exacerbations even if they get flagged up by PRO logic
df["RemoveExac"] = np.where(df["IsHospExac"] == 1, 0, df["RemoveExac"])
print(
"Weekly rescue med (Q5) criterion applied to events occurring between {} and {} \
days after a previous event. {} consecutive negative replies required for the event to \
count as a new event".format(
logic_min_days, logic_max_days, consecutive_replies
)
)
print(
"Number of exacerbations excluded by PRO LOGIC Q5 response criterion: {}".format(
df.RemoveExac.sum()
)
)
print(
"Earliest and latest exacerbations excluded: {}, {}".format(
df[df.RemoveExac == 1].DateOfEvent.min(),
df[df.RemoveExac == 1].DateOfEvent.max(),
)
)
print(
"Remaining number of exacerbations: {}".format(
len(df[(df.IsExac == 1) & (df.RemoveRow != 1) & (df.RemoveExac != 1)])
)
)
print(
"Remaining exacerbations with unknown dates: {}".format(
len(df[(df.ExacDateUnknown == 1) & (df.RemoveRow != 1) & (df.RemoveExac != 1)])
)
)
# Remove data between segments of prolonged events, count only first occurrence
df = model_h.remove_data_between_exacerbations(df)
# Remove 7 days before each reported exacerbation within unknown date (meds in last week)
df = model_h.remove_unknown_date_exacerbations(df)
# Remove rows flagged as to remove
df = df[df["RemoveRow"] != 1]
# New df with unwanted rows removed for events breakdown.
print("---Final exacerbation counts---")
print("Final number of exacerbations: {}".format(df.IsExac.sum()))
exac_patients = pd.Series(df[df.IsExac == 1].StudyId.unique())
print(
"Number of unique exacerbation patients: {} ({} RC and {} SU)".format(
len(exac_patients),
exac_patients.str.startswith("RC").sum(),
exac_patients.str.startswith("SU").sum(),
)
)
print(
"Exacerbation breakdown: {} hospital, {} patient reported and {} overlapping".format(
df.IsHospExac.sum(),
df.IsCommExac.sum(),
len(df.loc[(df.IsCommExac == 1) & (df.IsHospExac == 1)]),
)
)
df.to_pickle("./data/hosp_comm_exacs.pkl")
############################################################################
# Calculate the number of rows to include per patient in the dataset. This
# is calculated based on the average number of exacerbations per patient and
# is then adjusted to the average time within the service
############################################################################
# Calculate the average time patients have data recorded in the COPD service
service_time = df[["StudyId", "LatestPredictionDate", "EarliestDataDate"]]
service_time = service_time.drop_duplicates(subset="StudyId", keep="first")
service_time["ServiceTime"] = (
service_time["LatestPredictionDate"] - service_time["EarliestDataDate"]
).dt.days
avg_service_time = sum(service_time["ServiceTime"]) / len(service_time["ServiceTime"])
avg_service_time_months = round(avg_service_time / 30)
print("Average time in service (days):", avg_service_time)
print("Average time in service (months):", avg_service_time_months)
# Calculate the average number of exacerberations per patient
avg_exac_per_patient = round(
len(df[df["IsExac"] == 1]) / df[df["IsExac"] == 1][["StudyId"]].nunique().item(), 2
)
print(
"Number of exac/patient/months: {} exacerbations/patient in {} months".format(
avg_exac_per_patient, avg_service_time_months
)
)
print(
"On average, 1 exacerbation occurs in a patient every: {} months".format(
round(avg_service_time_months / avg_exac_per_patient, 2)
)
)
#################################################################
# Calculate index dates. 1 row/patient for every 5 months in service.
#################################################################
# Obtain the number of rows required per patient.
service_time["NumRows"] = round(service_time["ServiceTime"] / config['model_settings']['one_row_per_days_in_service']).astype("int")
patient_details = pd.merge(
patient_details, service_time[["StudyId", "NumRows"]], on="StudyId", how="left"
)
# Calculate the number of days between earliest and latest index
patient_details["NumDaysPossibleIndex"] = (
patient_details["LatestIndexDate"] - patient_details["EarliestIndexDate"]
).dt.days
patient_details.to_csv("./data/pat_details_to_calc_index_dt.csv", index=False)
# Make sure the number of rows isn't larger than the number of possible index dates
patient_details["NumRows"] = np.where(patient_details["NumRows"] > patient_details["NumDaysPossibleIndex"], patient_details["NumDaysPossibleIndex"], patient_details["NumRows"])
# Generate random index dates
# Multiple seeds tested to identify the random index dates that give a good
# distribution across months. Seed chosen as 2188398760 from check_index_date_dist.py
random_seed_general = config['model_settings']['index_date_generation_master_seed']
random.seed(random_seed_general)
# Create different random seeds for each patient
patient_details["RandomSeed"] = random.sample(
range(0, 2**32), patient_details.shape[0]
)
# Create random index dates for each patient based on their random seed
rand_days_dict = {}
rand_date_dict = {}
for index, row in patient_details.iterrows():
np.random.seed(row["RandomSeed"])
rand_days_dict[row["StudyId"]] = np.random.choice(
row["NumDaysPossibleIndex"], size=row["NumRows"], replace=False
)
rand_date_dict[row["StudyId"]] = [
row["EarliestIndexDate"] + timedelta(days=int(day))
for day in rand_days_dict[row["StudyId"]]
]
# Create df from dictionaries containing random index dates
index_date_df = pd.DataFrame.from_dict(rand_date_dict, orient="index").reset_index()
index_date_df = index_date_df.rename(columns={"index": "StudyId"})
# Convert the multiple columns containing index dates to one column
index_date_df = (
pd.melt(index_date_df, id_vars=["StudyId"], value_name="IndexDate")
.drop(["variable"], axis=1)
.sort_values(by=["StudyId", "IndexDate"])
)
index_date_df = index_date_df.dropna()
index_date_df = index_date_df.reset_index(drop=True)
# Join index dates with exacerbation events
exac_events = pd.merge(index_date_df, df, on="StudyId", how="left")
exac_events["IndexDate"] = pd.to_datetime(exac_events["IndexDate"], utc=True)
# Calculate whether an exacerbation event occurred within the model time window (3 months)
# after the index date
exac_events["TimeToEvent"] = (
exac_events["DateOfEvent"] - exac_events["IndexDate"]
).dt.days
exac_events["ExacWithin3Months"] = np.where(
(exac_events["TimeToEvent"].between(1, config['model_settings']['prediction_window'], inclusive="both"))
& (exac_events["IsExac"] == 1),
1,
0,
)
exac_events["HospExacWithin3Months"] = np.where(
(exac_events["TimeToEvent"].between(1, config['model_settings']['prediction_window'], inclusive="both"))
& (exac_events["IsHospExac"] == 1),
1,
0,
)
exac_events["CommExacWithin3Months"] = np.where(
(exac_events["TimeToEvent"].between(1, config['model_settings']['prediction_window'], inclusive="both"))
& (exac_events["IsCommExac"] == 1),
1,
0,
)
exac_events = exac_events.sort_values(
by=["StudyId", "IndexDate", "ExacWithin3Months"], ascending=[True, True, False]
)
exac_events = exac_events.drop_duplicates(subset=["StudyId", "IndexDate"], keep="first")
exac_events = exac_events[
[
"StudyId",
"PatientId",
"IndexDate",
"DateOfBirth",
"Sex",
"ExacWithin3Months",
"HospExacWithin3Months",
"CommExacWithin3Months",
]
]
# Save exac_events
exac_events.to_pickle("./data/patient_labels_forward_val_hosp_comm.pkl")
# Summary info
class_distribution = (
exac_events.groupby("ExacWithin3Months").count()[["StudyId"]].reset_index()
)
class_distribution.plot.bar(x="ExacWithin3Months", y="StudyId")
plt.savefig(
"./plots/class_distributions/final_seed_"
+ str(random_seed_general)
+ "_class_distribution_hosp_comm.png",
bbox_inches="tight",
)
print("---Summary info after setting up labels---")
print("Number of unique patients:", exac_events["StudyId"].nunique())
print("Number of rows:", len(exac_events))
print(
"Number of exacerbations within 3 months of index date:",
len(exac_events[exac_events["ExacWithin3Months"] == 1]),
)
print(
"Percentage positive class (num exac/total rows): {} %".format(
round(
(len(exac_events[exac_events["ExacWithin3Months"] == 1]) / len(exac_events))
* 100,
2,
)
)
)
print(
"Percentage negative class: {} %".format(
round(
(len(exac_events[exac_events["ExacWithin3Months"] == 0]) / len(exac_events))
* 100,
2,
)
)
)
print(
"Percentage hospital exacs: {} %".format(
round(
(len(exac_events[exac_events["HospExacWithin3Months"] == 1]) / len(exac_events))
* 100,
2,
)
)
)
print(
"Percentage community exacs: {} %".format(
round(
(len(exac_events[exac_events["CommExacWithin3Months"] == 1]) / len(exac_events))
* 100,
2,
)
)
)
print("Class balance:")
print(class_distribution)