# CHRIS / qrf / functions.py
# Robert Elder
# updated quantity module to use category-based sampling where possible
# e02ea72
import sys
import pickle
import numpy as np
import pandas as pd
import sklearn
from quantile_forest import RandomForestQuantileRegressor
import mordred
import mordred.descriptors
import rdkit
from rdkit import Chem
# Temperatures used to pick the nearest model bundle file
# (qrf/qrf_model_bundle_<T>.pkl) in the prediction functions below.
QRF_T_list = np.array([25, 30, 35, 37, 40, 45, 50, 55, 60, 65, 70, 75])

# Half-width of the temperature window used to select rows of df_QRF
# around the nearest model temperature.
QRF_T_cut = 2.5

# Reference tables, read once when the module is imported.
df_QRF = pd.read_excel('qrf/db-D-interp-allT-semiclean.xlsx')
df_desc = pd.read_excel('qrf/mordred-descriptors.xlsx')

# Names of all descriptors registered in a full Mordred calculator; these are
# the columns pulled from the descriptor table.
calc = mordred.Calculator(mordred.descriptors)
colnames_mordred = list(map(str, calc.descriptors))

# Attach the descriptor columns to each row by solute InChIKey (left join, so
# rows without a match are kept); clashing right-hand columns get '_dupe'.
df_QRF = df_QRF.merge(
    df_desc[['Solute_InChIKey'] + colnames_mordred],
    on='Solute_InChIKey',
    how='left',
    suffixes=('', '_dupe'),
)
def QRF_Ceramic(density, polytg, quantiles=[0.03,0.5,0.97], T=37, worstcase='hi'):
    """Predict D quantiles for a polymer using "worst-case" solute descriptors.

    The polymer descriptors come from the arguments; every other descriptor is
    set to the 0.05 or 0.95 quantile of the scaled reference data, chosen from
    the sign of that descriptor's correlation with LogD.

    Args:
        density: Value used for the 'Polymer_Density' descriptor.
        polytg: Value used for the 'Polymer_Tg' descriptor.
        quantiles: Quantiles forwarded to ``reg.predict``.
        T: Temperature; the closest entry of ``QRF_T_list`` selects both the
            model bundle file and the window of reference rows.
        worstcase: ``'hi'`` gives positively correlated descriptors their 0.05
            quantile and all others their 0.95 quantile; ``'lo'`` is the
            inverse.

    Returns:
        Tuple ``(D_pred, domain_extrap)``: ``D_pred`` is ``10**LogD`` for the
        requested quantiles (first row taken when the prediction is 2-D), and
        ``domain_extrap`` is True when the extrapolation distance from
        ``QRF_DomainExtrap`` is greater than zero.

    Raises:
        ValueError: If ``worstcase`` is neither ``'hi'`` nor ``'lo'``.
    """
    if worstcase not in ('hi', 'lo'):
        raise ValueError(f"worstcase must be 'hi' or 'lo', got {worstcase!r}")

    nearest_T = QRF_T_list[np.abs(T - QRF_T_list).argmin()]
    # NOTE: pickle.load can execute arbitrary code; the bundle file must come
    # from a trusted source.
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl', 'rb') as f:
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # Reference rows within +/- QRF_T_cut of the selected temperature.
    mask_T = (df_QRF['T'] > nearest_T - QRF_T_cut) & (df_QRF['T'] < nearest_T + QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    logd = df_QRF.loc[mask_T, 'LogD']  # already a Series; do not index by 'LogD' again
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)

    ## use "worst-case" solute values
    # Positive correlations (increase in variable increases D) use low values
    # of the variable for 'hi', negative correlations use high values; 'lo'
    # swaps the two. NaN correlations compare False and get the default.
    positive_corr = (df_X.corrwith(logd) > 0).to_numpy()
    if worstcase == 'hi':
        q_positive, q_default = 0.05, 0.95
    else:
        q_positive, q_default = 0.95, 0.05
    tmpq = np.where(positive_corr, q_positive, q_default)

    # "worst-case" values of the scaled descriptors
    worst_vals = [np.nanquantile(X_all_scale[:, i], q) for i, q in enumerate(tmpq)]

    # Scale the polymer descriptors with the same scaler; the other slots are
    # placeholders (0) that are discarded after scaling.
    polymer_raw = [
        polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else 0)
        for n in sub_desc_list
    ]
    polymer_scaled = scaler_X.transform([polymer_raw])[0]

    # Merge scaled polymer descriptors with worst-case scaled solute descriptors.
    tmpv = [
        polymer_scaled[i] if n in ('Polymer_Tg', 'Polymer_Density') else worst_vals[i]
        for i, n in enumerate(sub_desc_list)
    ]

    LogD_pred = reg.predict([tmpv], quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape) > 1:
        # return 1D array regardless of quantiles setting
        D_pred = D_pred[0]

    ## domain extrapolation check
    dij = QRF_DomainExtrap(reg, X_all_scale, np.array([tmpv]))
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap
def QRF_Apply(density, polytg, smiles, quantiles=[0.03,0.5,0.97], T=37):
    """Predict D quantiles for one solute (given as SMILES) in a polymer.

    T and Tg in C.

    Args:
        density: Value used for the 'Polymer_Density' descriptor.
        polytg: Value used for the 'Polymer_Tg' descriptor.
        smiles: SMILES string of the solute; hydrogens are added before the
            Mordred descriptors are calculated.
        quantiles: Quantiles forwarded to ``reg.predict``.
        T: Temperature; the closest entry of ``QRF_T_list`` selects both the
            model bundle file and the window of reference rows.

    Returns:
        Tuple ``(D_pred, domain_extrap)``: ``D_pred`` is ``10**LogD`` for the
        requested quantiles (first row taken when the prediction is 2-D), and
        ``domain_extrap`` is True when the extrapolation distance from
        ``QRF_DomainExtrap`` is greater than zero.

    Raises:
        ValueError: If ``smiles`` cannot be parsed by RDKit.
        KeyError: If the model bundle lists a solute descriptor that the
            Mordred calculator does not provide.
    """
    nearest_T = QRF_T_list[np.abs(T - QRF_T_list).argmin()]
    # NOTE: pickle.load can execute arbitrary code; the bundle file must come
    # from a trusted source.
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl', 'rb') as f:
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # get list of descriptors to calculate (everything except the polymer ones)
    polymer_names = ('Polymer_Tg', 'Polymer_Density')
    solute_desc_list = [n for n in sub_desc_list if n not in polymer_names]

    # Look descriptors up by name once instead of scanning the full Mordred
    # list for every requested descriptor.
    calc_all = mordred.Calculator(mordred.descriptors)
    desc_by_name = {str(d): d for d in calc_all.descriptors}
    unknown = [n for n in solute_desc_list if n not in desc_by_name]
    if unknown:
        raise KeyError(f'descriptors not available from mordred: {unknown}')
    calc_sub = mordred.Calculator([desc_by_name[n] for n in solute_desc_list])

    m = Chem.MolFromSmiles(smiles)
    if m is None:
        # MolFromSmiles signals a parse failure by returning None.
        raise ValueError(f'could not parse SMILES: {smiles!r}')
    m = Chem.AddHs(m)

    descs_mordred = calc_sub(m)
    # Key each value by its own descriptor name rather than relying on the
    # order of solute_desc_list; non-numeric results (mordred errors) become
    # NaN so that the imputer can fill them.
    desc_vals = {
        str(d): (v if isinstance(v, (float, int)) else np.nan)
        for d, v in zip(calc_sub.descriptors, descs_mordred.values())
    }

    descs = [
        polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else desc_vals[n])
        for n in sub_desc_list
    ]
    # Wrap in a DataFrame to avoid a warning about missing column names.
    descs = imp.transform(pd.DataFrame([descs], columns=sub_desc_list))
    descs_scale = scaler_X.transform(descs)

    LogD_pred = reg.predict(descs_scale, quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape) > 1:
        # return 1D array regardless of quantiles setting
        D_pred = D_pred[0]

    ## domain extrapolation check
    # Reference rows within +/- QRF_T_cut of the selected temperature.
    mask_T = (df_QRF['T'] > nearest_T - QRF_T_cut) & (df_QRF['T'] < nearest_T + QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)
    dij = QRF_DomainExtrap(reg, X_all_scale, descs_scale)
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap
def QRF_DomainExtrap(estimator, X_train, X_test, D=0.0, return_features=False):
    """Measure how far test points lie outside the range of the training data.

    For every tree in ``estimator.estimators_`` and every test sample, the
    features tested along the sample's decision path are compared with the
    per-feature min/max of ``X_train``. Distances beyond either bound are
    normalised by the feature range, offset by ``D``, clipped at zero, squared
    and summed over all trees; the square root of the total is returned.

    Args:
        estimator: Fitted ensemble exposing ``estimators_``; each tree must
            provide ``tree_.feature`` and ``decision_path``.
        X_train: 2-D array of reference samples defining the bounds.
        X_test: 2-D array of samples to check.
        D: Offset added to every normalised distance before clipping.
        return_features: When True, also return the features found outside
            the bounds for each sample.

    Returns:
        ``dij`` (1-D array, one value per test sample), or ``(dij, dij_feats)``
        when ``return_features`` is True, where ``dij_feats[i]`` is a sorted
        list of unique feature indices or ``None`` if nothing was out of range.
    """
    X_train = np.asarray(X_train)
    X_test = np.asarray(X_test)

    X_min = X_train.min(axis=0)
    X_max = X_train.max(axis=0)
    X_range = X_max - X_min
    # A constant feature has zero range; dividing by it would give nan/inf.
    # Fall back to a unit range so the distance is the raw difference.
    X_range = np.where(X_range == 0, 1.0, X_range)

    n_samples = len(X_test)
    dij = np.zeros(n_samples)
    if return_features:
        dij_feats = {i: [] for i in range(n_samples)}

    for tree in estimator.estimators_:
        feature = tree.tree_.feature
        on_path = tree.decision_path(X_test).toarray().astype(bool)
        for i, row in enumerate(on_path):
            # Features tested along this sample's path; the last node on the
            # path is dropped (it is the leaf, which tests no feature).
            path_feats = feature[row][:-1]
            vals = X_test[i][path_feats]
            dmax = np.maximum((vals - X_max[path_feats]) / X_range[path_feats] + D, 0)
            dmin = np.maximum((X_min[path_feats] - vals) / X_range[path_feats] + D, 0)
            dij[i] += np.sum(dmax**2) + np.sum(dmin**2)
            if return_features:
                dij_feats[i].extend(path_feats[dmax > 0])
                dij_feats[i].extend(path_feats[dmin > 0])

    dij = dij**0.5
    if return_features:
        dij_feats = [sorted(set(dij_feats[i])) if dij_feats[i] else None for i in range(n_samples)]
        return dij, dij_feats
    return dij