Can-SAVE / CanSave.py
pfilonenko's picture
Upload 6 files
12ac942 verified
# required libraries
import re
import yaml
import pickle
import numpy as np
import pandas as pd
from lifelines import LogLogisticAFTFitter
from KaplanMeierEstimator import KaplanMeierEstimator
import warnings
warnings.filterwarnings('ignore')
class CanSave:
'''Object of the Can-Save method for feature engineering.'''
def __load_object_by_pickle(self, path):
'''Method to load a deserializated file.'''
loaded_obj = pickle.load(open(path, 'rb'))
return loaded_obj
def __load_config(self, config_path):
'''Method to load config-file.'''
with open(config_path, 'r') as file:
self.config = yaml.safe_load(file)
def __load_ICD10_groups(self):
'''Method to load the table of ICD-10 groups.'''
path_icd10_groups = self.config['path_icd10_groups']
groups = pd.read_excel(path_icd10_groups).dropna(subset=['left_code', 'right_code'])
groups = groups[groups['selected'] == 1]
self.groups = groups
def __load_survival_models(self):
'''Method to load YOUR trained survival models.'''
'''
DISCLAIMER!
We do not make trained models publicly available.
Therefore, we only provide an example of how survival models can be trained.
Example is located in the "Example_How_To_Train_Survival_Models.py".
'''
# Load trained survival models
kaplan_meier_males = self.__load_object_by_pickle(self.config['path_kaplan_meier_males'])
kaplan_meier_females = self.__load_object_by_pickle(self.config['path_kaplan_meier_females'])
kaplan_meier_both = self.__load_object_by_pickle(self.config['path_kaplan_meier_both'])
aft = self.__load_object_by_pickle(self.config['path_aft'])
# Form dict of survival models
self.survival_models = {
'kaplan_meier_males': kaplan_meier_males,
'kaplan_meier_females': kaplan_meier_females,
'kaplan_meier_both': kaplan_meier_both,
'aft': aft,
}
def __init__(self, CONFIG_PATH):
'''Constructor.'''
# Load config-file
self.__load_config(CONFIG_PATH)
# Load table of ICD-10 groups
self.__load_ICD10_groups()
# Load trained survival models
self.__load_survival_models()
def __check_russian_service_code(self, code):
'''Method to check the medical service code used in the Russian Federation (e.g.: A04.16, B01.042).'''
code = str(code).upper()
mas = code.split('.')
first = mas[0]
second = mas[1] if len(mas) > 1 else '00'
if 'A' in first:
length = 2
elif 'B' in first:
length = 3
else:
raise ValueError(f'Invalid medical code: {code}')
if len(first) < 3:
first = first[0] + '0' + first[1]
if len(second) == length:
pass
elif len(second) > length:
second = second[-length:]
else:
while len(second) != length:
second = '0' + second
code = '{}.{}'.format(first, second)
return code
def __common_features(self, features, ehr, date_start, date_pred):
'''Method to make common features.'''
# MONTH OF THE PREDICTION
month_pred = date_pred.month
features['month_pred'] = month_pred
# DAY OF WEEK
features['day_of_week'] = date_pred.isoweekday()
# VISIT_NUM
visit_num = len(ehr)
features['visit_num'] = visit_num
# PROPORTION OF DIAGNOSIS AND SERVICES
features['diagnosis_prop'] = ehr['is_diagnose'].mean()
features['services_prop'] = 1.0 - ehr['is_diagnose'].mean()
# WEEKS AFTER FIRST VISIT
features['weeks_after_first_visit'] = (date_pred - ehr['date'].min()).days / 7.
# WEEKS AFTER LAST VISIT
features['weeks_after_last_visit'] = (date_pred - ehr['date'].max()).days / 7.
# AVG. WEEKS BETWEEN VISITS
features['weeks_betw_visits_avg'] = (ehr['date'].max() - ehr['date'].min()).days / (visit_num + 1.0e-14)
# VISITS IN LAST N MONTHS
for month in [1, 3, 6, 9, 12, 15, 18, 21, 24]:
end_dt = date_pred
start_dt = date_pred - pd.DateOffset(months=month)
mask = (start_dt <= ehr['date']) & (ehr['date'] <= end_dt)
new_col = f'visits_in_last_{month}_months'
features[new_col] = mask.sum()
# DIAGNOSE GROUPS
mask = ehr['is_diagnose'] == 1
df = ehr[mask]
df['code_short'] = df['code'].apply(lambda code: str(code).split('.')[0])
for left_mkb, right_mkb in zip(self.groups['left_code'], self.groups['right_code']):
mask = (left_mkb <= df['code_short']) & (df['code_short'] <= right_mkb)
new_col = 'diagnose_group_{}_{}'.format(left_mkb, right_mkb)
features[new_col] = mask.sum()
# UNIQUE DIAGNOSIS
features['unique_diagnosis'] = len(set(df['code_short']))
# UNIQUE CLASSES OF DIAGNOSIS
df['code_class'] = df['code_short'].apply(lambda code: re.match('[A-Z]', str(code)).group(0))
features['unique_classes_of_diagnosis'] = len(set(df['code_class']))
# BINARY (1 - Diagnosis group has been in the EHR, 0 - has not been in the EHR)
for left_mkb, right_mkb in zip(self.groups['left_code'], self.groups['right_code']):
feature = f'diagnose_group_{left_mkb}_{right_mkb}'
val = features[feature]
new_col = f'has_{left_mkb}_{right_mkb}'
features[new_col] = 1 if val > 0 else 0
# WEEKS FROM LAST AND FIRST OCCURRENCE OF DIAGNOSIS GROUP
for left_mkb, right_mkb in zip(self.groups['left_code'], self.groups['right_code']):
mask = (left_mkb <= df['code']) & (df['code'] <= right_mkb)
df_group = df[mask]
new_col = f'weeks_from_first_{left_mkb}_{right_mkb}'
if len(df_group) > 0:
features[new_col] = (date_pred - df_group['date'].min()).days / 7.
else:
features[new_col] = (date_pred - date_start).days / 7.
new_col = f'weeks_from_last_{left_mkb}_{right_mkb}'
if len(df_group) > 0:
features[new_col] = (date_pred - df_group['date'].max()).days / 7.
else:
features[new_col] = (date_pred - date_start).days / 7.
return features
def __regional_features(self, features, ehr, date_start, date_pred):
'''
Method to make features adopted to some regional specific characteristics.
This method should be adopted for your country.
'''
# AVG. HUMIDITY AT MONTH FOR THE PREDICTION IN RUSSIA
avg_humidity = {
1: 88., 2: 84., 3: 74., 4: 65., 5: 64., 6: 68.,
7: 72., 8: 69., 9: 77., 10: 81., 11: 88., 12: 91.
}
month_pred = features['month_pred']
features['avg_humidity'] = avg_humidity[month_pred]
# AVG. TEMPERATURE AT MONTH FOR THE PREDICTION IN RUSSIA
avg_temperature = {
1: -7.2, 2: -5.2, 3: -0.5, 4: 8.1, 5: 15.5, 6: 18.0,
7: 20.3, 8: 18.9, 9: 13.3, 10: 5.7, 11: 1.3, 12: -3.4
}
features['avg_temperature'] = avg_temperature[month_pred]
# SERVICE GROUPS (in according to ORDER 804n of the Ministry of Health of RUSSIA)
mask = ehr['is_diagnose'] == 0
df = ehr[mask]
df['code_mod'] = df['code'].apply(self.__check_russian_service_code)
# type of medical service
codes = [f'A0{num}.' if num < 10 else f'A{num}.' for num in range(30+1)] + ['B01.','B02.','B03.','B04.','B05.']
for val in codes:
mask = df['code_mod'].str.contains(val)
new_col = 'service_group_{}XX'.format(val)
features[new_col] = mask.sum()
# anatomical and functional area of medical service & list of medical specialties
group1 = [f'.0{num}' if num < 10 else f'.{num}' for num in range(1, 30+1)]
group2 = [f'.00{num}' if num < 10 else f'.0{num}' for num in range(1, 70+1)]
codes = group1 + group2
for val in codes:
mask = df['code_mod'].str.contains(val)
new_col = 'service_group_Axx{}'.format(val)
features[new_col] = mask.sum()
return features
def __survival_analysis_features(self, features):
'''Method to make features based on the trained survival models.'''
# Get sociodemographics parameters of the patient
sex = features['sex']
age = features['actual_age']
H = list(range(0, 24+1))
# Make features based on the fitted Kaplan-Meier estimators (Males & Females)
model = self.survival_models['kaplan_meier_both']
s_age = model(age)
for horizon in H:
# Survival risk prediction S(t) at the t = AGE
t = age + horizon/12.
s = model(t)
new_col = f'KaplanMeier_BOTH_S(AGE+{horizon}M)'
features[new_col] = s
# Difference |S(t+horizon) - S(t)|, where t = AGE
ds = s - s_age
new_col = f'KaplanMeier_BOTH_|S(AGE+{horizon}M)-S(AGE)|'
features[new_col] = abs(ds)
# Make sex-oriented features based on the fitted Kaplan-Meier estimators (Males and Females)
model = self.survival_models['kaplan_meier_males'] if sex == 1 else self.survival_models['kaplan_meier_females']
s_age = model(age)
for horizon in H:
# Survival risk prediction S(t) at the t = AGE
t = age + horizon/12.
s = model(t)
new_col = f'KaplanMeier_SEX_S(AGE+{horizon}M)'
features[new_col] = s
# Difference |S(t+horizon) - S(t)|, where t = AGE
ds = s - s_age
new_col = f'KaplanMeier_SEX_|S(AGE+{horizon}M)-S(AGE)|'
features[new_col] = abs(ds)
# Make features based on the trained AFT model
model = self.survival_models['aft']['model']
covariates = self.survival_models['aft']['covariates']
df = pd.DataFrame([{key:features[key] for key in covariates}])
times = [age + val/12. for val in H]
survivals = model.predict_survival_function(df, times=times)
s_age = survivals[0].iloc[0]
for i in range(len(survivals)):
# Survival risk prediction S(t) at the t = AGE
horizon = H[i]
s = survivals[0].iloc[i]
new_col = f'AFT_S(AGE+{horizon}M)'
features[new_col] = s
# Difference |S(t+horizon) - S(t)|, where t = AGE
ds = s - s_age
new_col = f'AFT_|S(AGE+{horizon}M)-S(AGE)|'
features[new_col] = abs(ds)
return features
def feature_engineering(self,
sex: str, # 'F' - FEMALE, 'M' - MALE
birth_date: str, # 'YYYY-MM-DD'
ehr: pd.DataFrame, # Example is located in './EHR/id_26.csv'
date_pred: str, # 'YYYY-MM-DD'
deep_weeks: int # > 0
):
'''
Method to make feature engineering for the Can-Save method.
sex: str # 'F' - FEMALE, 'M' - MALE
birth_date: str # 'YYYY-MM-DD'
ehr: pd.DataFrame # Example is located in './EHR/id_26.csv'
date_pred: str # 'YYYY-MM-DD'
deep_weeks: int # deep_weeks > 0
'''
# Check the presence of the required columns in the EHR
set_of_required_columns = {'date', 'code', 'is_diagnose'}
missing_columns = set_of_required_columns.difference(set(ehr.columns))
if len(missing_columns) > 0:
raise KeyError(f'There are missing columns: {missing_columns}.')
# Prepare dates
ehr['date'] = pd.to_datetime(ehr['date'], format='%Y-%m-%d')
date_pred = pd.to_datetime([date_pred], format='%Y-%m-%d')[0]
date_start = date_pred - pd.DateOffset(weeks=deep_weeks)
# Get sociodemographic parameters
sex = 1 if 'M' in sex else 0
birth_date = pd.to_datetime([birth_date], format='%Y-%m-%d')[0]
# Select medical events in the certain period
mask = (date_start <= ehr['date']) & (ehr['date'] <= date_pred)
ehr = ehr[mask]
# if EHR is empty, then we add a special medical service (placeholder) => EHR is always not empty
if len(ehr) == 0:
ehr.loc[0] = {'date': date_pred, 'code': 'A00.00', 'is_diagnose': 0}
# Form the list of features for risk estimation
days_per_year = (365 * 3 + 366) / 4.
features = {
'sex': sex, # SEX (0 - FEMALE, 1 - MALE)
'actual_age': (date_pred - birth_date).days / days_per_year, # ACTUAL AGE
}
# Make common features
features = self.__common_features(features, ehr, date_start, date_pred)
# Make features adopted to some regional specific characteristics
features = self.__regional_features(features, ehr, date_start, date_pred)
# Make features based on the trained survival models
features = self.__survival_analysis_features(features)
return features
# entry point
if __name__ == '__main__':
# Make new object for feature engineering
config_path = './CONFIG_CanSave.yaml'
cs = CanSave(CONFIG_PATH=config_path)
print(help(cs))
# Load the patient's EHR
path_ehr = './EHR/id_26.csv'
ehr = pd.read_csv(path_ehr, sep=';').set_index('patient_id')
sex = ehr['sex'].iloc[0]
birth_date = ehr['birth_date'].iloc[0]
# Make feature engineering for the risk prediction
features = cs.feature_engineering(
sex = sex, # sex of the patient
birth_date = birth_date, # birth date of the patient
ehr = ehr, # Electronic Health Records of the patient
date_pred = '2022-01-01', # date of the risk estimation
deep_weeks = 108 # deep of the EHR's history (in weeks)
)