"""Module containing code for model C (exacerbation prediction).""" import numpy as np import pandas as pd from lenusml import encoding def apply_logic_response_criterion(df, N=2, minimum_period=14, maximum_period=35): """ Apply PRO LOGIC criterion 2 (consecutive negative Q5 replies required between events). For events that occur after the minimum required period following a previous exac, e.g. longer than 14 days, but before they are automatically considered as a new exac event, e.g. 35 days, PRO LOGIC considers weekly PRO responses between the two events. For subsequent events to count as separate events, there must be at least N consecutive negative responses (no rescue meds taken) to weekly PROs between each postive reply. Note PRO LOGIC is applied to both hospital and patient reported events. Args: df (pd.DataFrame): must contain columns for PatientId, DateOfEvent, Q5Answered, NegativeQ5, IsExac and DaysSinceLastExac. minimum_period (int): minimum number of days since the previous exac (any exacs within this window will already be removed with PRO LOGIC criterion 1). Default value is 14 days. maximum_period (int): maximum number of days since the previous exac (any exacs occurring after this period will automatically count as a separate event). Default is 35 days. Returns: pd.DataFrame: input df with a new boolean column 'RemoveExac'. """ # Retrieve dataframe indices of exacs falling under PRO LOGIC criterion 2 (Q5 replies) indices = get_logic_exacerbation_indices(df, minimum_period=minimum_period, maximum_period=maximum_period) remove_exac = [] # Loop over each exac and evaluate PRO LOGIC criterion, returning 1 (remove) or 0 for exac_index in indices: remove_exac.append(logic_consecutive_negative_responses(df, exac_index, N)) # Create dataframe containing exac indices and a boolean column stating whether to # remove that exac due to failing Q5 response criterion and merge with original df remove_exac = pd.DataFrame({'ind': indices, 'RemoveExac': remove_exac}) df = df.merge(remove_exac.set_index('ind'), left_index=True, right_index=True, how='left') return df def bin_numeric_column(*, col, bins, labels): """ Use pd.cut to bin numeric data into categories. Args: col (pd.Series): dataframe column to be binned. bins (list): numeric values of bins. labels (list): corresponding labels for the bins. Returns: pd.Series: binned column. """ return pd.cut(col, bins=bins, labels=labels, right=False).astype('str') def calculate_days_since_last_event(*, df, event_col, output_col): """ Calculate the days since the last event, e.g. exacerbation or rescue med prescription. Restarts the count from one the day following an event. Any days without a previous event have the output column set to -1 Args: df (pd.DataFrame): dataframe with a column containing dates and a boolean column stating whether an event occurred on that date. event_col (str): name of the boolean column for whether an event occurred. Returns: df: the input dateframe with an additional column stating the number of days since the previous event occurred (or -1 if no previous event). """ # Get all events all_events = df[df[event_col].eq(1)].copy() all_events['PrevEvent'] = all_events.index # Merge the full df with the event df on their indices to the closest date in the past # i.e. the most recent exacerbation df = pd.merge_asof(df, all_events['PrevEvent'], left_index=True, right_index=True, direction='backward') # Calculate the days since the previous event, restarting the count from 1 the # day following an exacerbation (using shift) df[output_col] = df.index - df['PrevEvent'].shift(1) # Set to -1 for any rows without a prior exacerbation df[output_col] = df[output_col].fillna(-1).astype('int64') df = df.drop(columns=['PrevEvent']) return df def calculate_diff_from_rolling_mean(*, df, cols): for col in cols: df[col + '_diff'] = df[col] - df[col + '_ave'] return df def extract_clinician_verified_exacerbations(df): """ Extract verified events from clinician verification spreadsheets. Extract only clinician verified events from verification spreadsheets and set the date to the clinician supplied date if entered. Include a flag column for if the date was changed from the PRO question response date. Args: df (pd.DataFrame): event verification data supplied by clinicians. Returns: pd.DataFrame: contains StudyId, DateOfEvent (a mix of true event dates and PRO response dates if true dates unknown), IsCommExac (set to 1 here, used after merging later) and ExacDateUnknown (boolean, 1 if clinicians did not change the date). """ # Filter for only verified events df = df[df['Exacerbation confirmed'] == 1].copy() df['DateRecorded'] = pd.to_datetime(df.DateRecorded, utc=True).dt.normalize() df['New Date'] = pd.to_datetime(df['New Date'], utc=True).dt.normalize() # Change the event date to the clinician supplied date if entered. This is considered # the true event date. Set the event date to the PRO response date otherwise and flag # that the true date is unknown df['DateOfEvent'] = np.where(df['Date changed'] == 1, df['New Date'], df['DateRecorded']) df['ExacDateUnknown'] = np.int64(np.where(df['Date changed'] == 1, 0, 1)) # Flag all events as community events (this df will merge with hospital events later) df['IsCommExac'] = 1 df = df[['StudyId', 'DateOfEvent', 'IsCommExac', 'ExacDateUnknown']] return df def define_hospital_admission(events): """ Define whether a COPD service event was an admission and return 1 (yes) or 0 (no). Args: events (pd.DataFrame): events from COPD service previously merged with PatientEventTypes.txt to get a column containing EventTypeId event_name_col (str): name of column containing COPD service EventTypeId Returns: array: boolean stating whether an event was a hospital admission. """ hospital_event_names = ['Hospital admission - emergency, COPD related', 'Hospital admission - emergency, COPD unrelated'] return np.where(events.isin(hospital_event_names), 1, 0) def define_service_exac_event(*, events, event_name_col='EventName', include_community=False): """State if a COPD service event was an exacerbation and return 1 (yes) or 0 (no). Args: events (pd.DataFrame): events from COPD service previously merged with PatientEventTypes.txt to get a column containing EventTypeId event_name_col (str): name of column containing COPD service EventTypeId include_community (bool): whether to include event types corresponding to patient reported exacerbations (e.g. community managed with rescue meds). Defaults to False. Returns: array: boolean stating whether an event was an exacerbation. """ if include_community is True: exacerbation_event_names = ['Hospital admission - emergency, COPD related', 'Exacerbation - self-managed with rescue pack', 'GP review - emergency, COPD related', 'Emergency department attendance, COPD related', 'Exacerbation - started abs/steroid by clinical team'] else: exacerbation_event_names = ['Hospital admission - emergency, COPD related', 'GP review - emergency, COPD related', 'Emergency department attendance, COPD related', 'Exacerbation - started abs/steroid by clinical team'] return np.where(events.isin(exacerbation_event_names), 1, 0) def fill_column_by_patient(*, df, id_col, col): """ Forward and back fill data by patient to fill gaps, e.g. from merges. Args: df (pd.DataFrame): patient data. Must contain col and id_col columns. id_col (str): name of column containing unique patient identifiers. col (str): name of column to be filled. Returns: pd.DataFrame: input data with col infilled. """ df[col] = df.groupby(id_col)[col].apply(lambda x: x.ffill().bfill()) return df def filter_symptom_diary(*, df, patients, date_cutoff=None): """ Filter COPD symptom diary data for patients and dates of interest. Args: df (pd.DataFrame): symptom diary data. Must contain 'SubmissionTime' and 'PatientId' columns. patients (list): patient IDs of interest. Returns: pd.DataFrame: filtered symptom diary. """ df['SubmissionTime'] = pd.to_datetime(df.SubmissionTime, utc=True).dt.normalize() # Take only data from after the cutoff if provided (e.g. weekly Q5 change) if date_cutoff: df = df[df.SubmissionTime >= date_cutoff] # Filter for patients of interest df = df[df.PatientId.isin(patients)] return df def get_logic_exacerbation_indices(df, minimum_period=14, maximum_period=35): """ Return dataframe indices of exacs that need checking for PRO reponses since last exac. Get the indices of exacerbations that occur long enough after the previous event to not be removed by PRO LOGIC criterion 1 (e.g. within 14 days of previous exac) but not long enough after to be counted as a separate event without further analysis. Called by apply_logic_response_criterion. Args: df (pd.DataFrame): must contain IsExac and DaysSinceLastExac columns. minimum_period (int): minimum number of days since the previous exac (any exacs within this window will already be removed with PRO LOGIC criterion 1). Default value is 14 days. maximum_period (int): maximum number of days since the previous exac (any exacs occurring after this period will automatically count as a separate event). Default is 35 days. Returns: list: dataframe indices of relevant events. """ # Get the dataframe indices for all exacerbations occurring within period of interest indices = df[(df.IsExac.eq(1)) & (df.DaysSinceLastExac > minimum_period) & (df.DaysSinceLastExac <= maximum_period)].index.to_list() return indices def get_rescue_med_pro_responses(df): """Extract all responses to weekly PRO Q5 (rescue meds). Add new boolean columns stating if Q5 was answered, whether it was a negative response (no rescue meds taken in previous week) and whether the reply means a community exacerbation. The latter two columns will be opposites. Args: df (pd.DataFrame): PRO symptom diary responses. Returns: pd.DataFrame: filtered weekly PROs with additional boolean columns Q5Answered, NegativeQ5 and IsCommExac. """ # Extract responses to weekly PRO Q5 (rescue meds) df = df[df.SymptomDiaryQ5.notna()].copy() df['SymptomDiaryQ5'] = df['SymptomDiaryQ5'].astype('int64') # Columns for whether Q5 was answered and if the response was negative (no exac) df['Q5Answered'] = 1 df['NegativeQ5'] = np.int64(np.where(df.SymptomDiaryQ5 == 0, 1, 0)) # Define community exacerbation as a positive reply to Q5 df['IsCommExac'] = np.int64(np.where(df.SymptomDiaryQ5 == 1, 1, 0)) return df def logic_consecutive_negative_responses(df, i, N=2): """ Calculate number of consecutive -ve Q5 replies since previous exac (PRO LOGIC). Given the dataframe index of the current exac identified as falling under the Q5 criterion, calculate the number of negative replies to the weeky rescue med question and check if there are enough for the event to count as distinct from the previous. Called by apply_logic_response_criterion. Args: df (pd.DataFrame): must contain weekly PRO replies and output from get_rescue_med_pro_responses, set_pro_exac_dates and calculate_days_since_exacerbation. i (int): index of exac of interest. N (int): number fo consecutive negative rescue meds required for event to be counted as a separate event and retained in data. Default is 2. Returns: int: flag for whether the exac failed the criterion. Returns 1 for failed (exac to be removed) and 0 for passed (exac to be retained). """ # Select data since the previous exacerbation days = int(df.iloc[i].DaysSinceLastExac) data = df.iloc[i - days + 1: i] # Select replies to Q5 data = data[data.Q5Answered.eq(1)][['PatientId', 'DateOfEvent', 'Q5Answered', 'NegativeQ5']] # Check if there are sufficient responses if len(data) < N: return 1 else: # Resample to 7 days (weekly) to account for missing responses. Resampling using # the 'W' option can give spurious nans - use '7D' instead data = data.set_index('DateOfEvent').resample('7D', origin='start').sum().reset_index() # Calculate number of consecutive negative replies to Q5 (no rescue meds taken) consecutive_negative_responses = data[data.NegativeQ5.eq(1)][ 'NegativeQ5'].groupby(data.NegativeQ5.eq(0).cumsum()).sum().reset_index( drop=True).max() return 1 if consecutive_negative_responses < N else 0 def minimum_period_between_exacerbations(df, minimum_days=14): """ Identify exacs occurring too soon after the previous exac based on DaysSinceLastExac. Returns 1 if the exacerbation occurred within minimum_days of that patient's previous exacerbation and 0 if not. Args: df (pd.DataFrame): must contain DaysSinceLastExac column. Returns: array: contains 1 or 0. """ return np.where((df['DaysSinceLastExac'] > 0) & (df['DaysSinceLastExac'] <= minimum_days), 1, 0) def remove_data_between_exacerbations(df): """ Remove data between first exac and subsequent exacs that failed PRO LOGIC criterion 2. Ensures only the first in a series of related events are counted. Any subsequent exacs that occurred too close to the initial event without sufficient negative weekly PRO responses in the interim will be flagged for removal. This function removes flags for removal all data from the day after the first event up to the date of events to be removed. Data following the final event in the series will be removed by minimum_period_between_exacerbations. Args: df (pd.DataFrame): must contain RemoveExac and DaysSinceLastExac columns. Returns: pd.DataFrame: days between first event and subsequent event(s) that failed the Q5 criterion are now flagged for removal in RemoveRow. """ indices = df[df.RemoveExac.eq(1)].index.to_list() # Check there are exacerbations that failed the logic criterion for N consecutive # negative reponses to Q5 of weekly PROs (rescue meds) if len(indices) > 0: for exac_index in indices: # Select data since the previous exacerbation days = int(df.iloc[exac_index].DaysSinceLastExac) # Set data since last exac up to and including current exac to be removed df.loc[exac_index - days + 1: exac_index, 'RemoveRow'] = 1 return df def remove_unknown_date_exacerbations(df, days_to_remove=7): """ Remove data prior to and including an exacerbation whose date is unknown. Args: df (pd.DataFrame): one row per day per patient for full data window. Must include ExacDateUnknown column. days_to_remove (int): number of days of data to remove leading up to (and including) the PRO response date. Default is 7 days. Returns: pd.DataFrame: input dataframe with updated RemoveRow column. """ # Get indices of all exacs whose dates are flagged as unknown. indices = df[df.ExacDateUnknown.eq(1)].index.to_list() # Check there are exacerbations with unknown dates (answer=1 in SymptomDiaryQ11a) if len(indices) > 0: for exac_index in indices: # Set specified number of previous days data up to and including current exac # to be removed df.loc[exac_index - days_to_remove + 1: exac_index, 'RemoveRow'] = 1 return df def rolling_mean_previous_period(*, df, cols, date_col, id_col, window): """ Resample data for each patient to daily and calculate rolling mean over window. Uses daily resampling due to strange behaviour with weekly/yearly resampling and calculating rolling quantities with missing/NaN entries in the series. Calculates the rolling mean over the window (e.g. 365 days) and shifts the data so that each date contains the rolling mean for the previous period, e.g. a rolling 365 day mean includes data for the previous 365 days and not the current date. This will exclude the current exacerbation or hospital admission from the counts. Args: df (pd.DataFrame): data of interest. Must contain specified col, date_col and id_col columns. cols (str): name of columns on which to calculate rolling mean. date_col (str): name of columns containing dates (will be set as index for aggregation) id_col (str): name of column containing unique patient identifiers. window (int): length of rolling window in days. Use window = 7 for weekly mean and window = 365 for yearly mean. Returns: pd.DataFrame: input dataframe with columns for rolling means of cols added with an '_ave_' suffix in the column names. """ # Copy the original df to retain all columns and dates df_copy = df.copy() # Resample the time series to daily records per patient rolling_mean = df_copy.set_index(date_col).groupby(id_col)[cols].resample( 'D').mean().reset_index() # Calculate the rolling mean over the specified window (in days) rolling_mean = rolling_mean.set_index(date_col).groupby(id_col)[cols].rolling( window=window, min_periods=1).mean().reset_index() # Shift the series to exclude the current day rolling_mean[cols] = rolling_mean.groupby(id_col)[cols].shift(1) # # Add a suffix to the column name to denote it is an aggregation rolling_mean = rolling_mean.rename( columns={col: col + '_ave' for col in rolling_mean.columns if col in cols}) return rolling_mean def rolling_sum_previous_period(*, df, col, date_col, id_col, window, output_col): """ Resample data for each patient to daily and calculate rolling sum over window. Uses daily resampling due to strange behaviour with weekly/yearly resampling and calculating rolling quantities with missing/NaN entries in the series. Calculates the rolling sum over the window (e.g. 365 days) and shifts the data so that each date contains the rolling sum for the previous period, e.g. a rolling 365 day sum includes data for the previous 365 days and not the current date. This will exclude the current exacerbation or hospital admission from the counts. Args: df (pd.DataFrame): data of interest. Must contain specified col, date_col and id_col columns. col (str): name of column on which to calculate rolling sum. date_col (str): name of columns containing dates (will be set as index for aggregation) id_col (str): name of column containing unique patient identifiers. window (int): length of rolling window in days. Use window = 7 for weekly sums and window = 365 for yearly sums. output_col (str): name of rolling sum column in output dataframe. Returns: pd.DataFrame: input dataframe with column for rolling sum of col added. """ # Copy the original df to retain all columns and dates df_copy = df.copy() # Resample the time series to daily records per patient rolling_sum = df_copy.set_index(date_col).groupby(id_col)[col].resample( 'D').sum().reset_index() # Calculate the rolling sum over the specified window (in days) rolling_sum = rolling_sum.set_index(date_col).groupby(id_col)[col].rolling( window=window, min_periods=1).sum().reset_index() # Shift the series to exclude the current day rolling_sum[col] = rolling_sum.groupby(id_col)[col].shift(1).fillna(0) # Rename the aggregate column as specified rolling_sum = rolling_sum.rename(columns={col: output_col}) # Merge back onto the original df df = df.merge(rolling_sum, on=[id_col, date_col], how='left') df[output_col] = df[output_col].astype('int64') return df def set_prediction_window(*, df, prediction_window): """ Set the prediction window to N days by setting IsExac=1 for N-1 days prior to events. Set the prediction window to prediction_window (N) days, e.g. for N = 3, change the IsExac label to 1 for the two days prior to the stated exac date to give three days of exacerbation. The labels now represent whether an exac occurs within N days of the prediction date rather than the exact date only. Args: df (pd.DataFrame): must contain IsExac column. Should contain the final list of exacerbation events to be used for modelling. prediction_window (int): length of model prediction window in days. Returns: pd.DataFrame: input data frame with extended exacerbation window. """ # Get indices of all exacerbations indices = df[df.IsExac.eq(1)].index.to_list() # Check there are exacerbations in the data and process if so if len(indices) > 0: for exac_index in indices: # Set specified number of previous days data up to and including current exac # to be exacerbations df.loc[exac_index - prediction_window + 1: exac_index, 'IsExac'] = 1 return df def set_pro_exac_dates(df): """ Set date of community exacerbations reported in weekly PROs Q5 and flag unknown dates. Args: df (pd.DataFrame: processed weekly PROs Q5 respnses, e.g. output of get_rescue_med_pro_responses Returns: pd.DataFrame: input dataframe with additional columns for DateOfEvent (datetime) and ExacDateUnknown (0 or 1). """ # Take known exacerbation (rescue med) dates from SymptomDiaryQ11b, otherwise set the # date to the date of PRO response df['DateOfEvent'] = np.where(df.SymptomDiaryQ11a == 2, df.SymptomDiaryQ11b, df.SubmissionTime) # Flag which dates were unknown from the PRO response df['ExacDateUnknown'] = np.int64(np.where((df.IsCommExac == 1) & (df.SymptomDiaryQ11a != 2), 1, 0)) df['DateOfEvent'] = pd.to_datetime(df.DateOfEvent, utc=True).dt.normalize() df = df.drop_duplicates(keep='last', subset=['PatientId', 'DateOfEvent']) return df def triple_inhaler_therapy_service(*, df, id_col, inhaler_col, include_mitt=False): """ Create boolean (1/0) feature for whether a patient is taking triple inhaler therapy. Option to include Single Inhaler Triple Therapy (SITT, default) only or also include Multiple Inhaler Triple Therapy (MITT). SITT therapies are 'LAMA +LABA-ICS' and 'LABA-LAMA-ICS'. MITT therapy is 'LAMA' + 'LABA-ICS'. Args: df (pd.DataFrame): dataframe containing list of inhaler names against patient IDs. id_col (str): name of patient ID column. inhaler_col (str): name of column containing inhaler types in the format of the COPD service data, e.g. LAMA, LABA, LABA-LAMA-ICS, LAMA +LABA-ICS etc. include_mitt (bool): whether to include Multiple Inhaler Triple Therapy (MITT). Returns: pd.DataFrame: input df with added boolean (1/0) feature for whether the patient is taking triple inhaler therapy as defined by SITT (default) and MITT. """ # Drop any duplicate entries df = df.drop_duplicates() # Pivot the table to one row per ID df = df.pivot(index=id_col, columns=inhaler_col, values=inhaler_col).reset_index().rename_axis(None, axis=1) # Create columns for any service inhaler types not present in the data types = ['LABA-LAMA', 'LAMA', 'LABA-ICS', 'LAMA +LABA-ICS', 'LABA-LAMA-ICS', 'LABA'] for inhaler in types: if inhaler not in df: df[inhaler] = np.nan # Create column for triple inhaler therapies (SITT) df['TripleTherapy'] = np.int64(np.where( ~df['LABA-LAMA-ICS'].isna() | ~df['LAMA +LABA-ICS'].isna(), 1, 0)) # Modify SITT column to also include MITT if needed if include_mitt is True: df['TripleTherapy'] = np.int64(np.where( ~df['LAMA'].isna() & ~df['LABA-ICS'].isna(), 1, df['TripleTherapy'])) df = df[[id_col, 'TripleTherapy']] return df def unit_lookup(units): """Convert Lenus platform unit codes to human readable units. Args: units (pd.Series): Lenus platform unit codes for a measurement Returns: array: human readable measurement units. """ units_lookup = {0: 'Count', 1: 'CountPerSecond', 2: 'InternationalUnit', 3: 'Joule', 4: 'Kelvin', 5: 'Kilogram', 6: 'KilogramPerLiter', 7: 'KilogramPerSquareMeter', 8: 'Liter', 9: 'LiterPerKilogramSecond', 10: 'LiterPerSecond', 11: 'Meter', 12: 'Pascal', 13: 'Percent', 14: 'Second', 15: 'Siemen', 16: 'Undefined'} # Replace the unit code with its description if in the look up table, otherwise # return 'Undefined' units = np.where(units.isin(list(units_lookup.keys())), units.replace(units_lookup), 'Undefined') return units def kfold_encode_train_data(*, df, id_col, fold_patients, cols_to_encode, target): """ K-fold target encoding of train data. Fold by fold target encoding of train data is used to prevent data leakage in cross- validation (the same folds are used for encoding and CV). For example, in 10-fold target encoding, each fold is encoded using the other nine folds and that fold is then used as the validation fold in CV. The complete train data set is used to target encode the holdout test data set. Parameters ---------- val_fold : dataframe validation data to be target encoded. This could be one of the K folds used in cross-validation or the holdout test set. cols_to_encode : list of strings names of columns to be encoded. target : str name of the target variable column. Returns ------- val_fold : dataframe target encoded validation fold (or holdout test set). encodings_all : dict encodings used for each column. """ # Encode the train data fold by fold appended_data = [] # Loop over folds and target encode for i, fold in enumerate(fold_patients): print("Fold ", i) val_fold_data = df[df[id_col].isin(fold)] train_fold_data = df[~df[id_col].isin(fold)] encoded_fold_data, encodings = encoding.encode_validation_fold( val_fold=val_fold_data, train_folds=train_fold_data, cols_to_encode=cols_to_encode, target=target) appended_data.append(encoded_fold_data) # Reconstruct full dataframe df_encoded = pd.concat(appended_data) df_encoded.reset_index(inplace=True, drop=True) return df_encoded