| import sys |
| import pickle |
| import numpy as np |
| import pandas as pd |
| import sklearn |
| from quantile_forest import RandomForestQuantileRegressor |
| import mordred |
| import mordred.descriptors |
| import rdkit |
| from rdkit import Chem |
|
|
# Temperatures (C) for which a pre-trained QRF model bundle exists on disk
# ('qrf/qrf_model_bundle_<T>.pkl'); query temperatures snap to the nearest entry.
QRF_T_list = np.array([25,30,35,37,40,45,50,55,60,65,70,75])
# Half-width (C) of the temperature window used to select training rows
# around a model temperature (see mask_T in the functions below).
QRF_T_cut = 2.5
# Training database; usage below shows it carries at least 'T', 'LogD' and
# 'Solute_InChIKey' columns.
df_QRF = pd.read_excel('qrf/db-D-interp-allT-semiclean.xlsx')
# Precomputed Mordred descriptors per solute, keyed by 'Solute_InChIKey'.
df_desc = pd.read_excel('qrf/mordred-descriptors.xlsx')
# Full Mordred calculator, used here only to enumerate descriptor names.
calc = mordred.Calculator(mordred.descriptors)
colnames_mordred = [str(d) for d in calc.descriptors]
# Attach the descriptor columns to every training row (left join on solute key).
df_QRF = pd.merge(df_QRF, df_desc[['Solute_InChIKey',*colnames_mordred]], how='left', on='Solute_InChIKey', suffixes=('', '_dupe'))
|
|
def QRF_Ceramic(density, polytg, quantiles=[0.03,0.5,0.97], T=37, worstcase='hi'):
    """Predict a worst-case diffusion coefficient for an unknown solute.

    Loads the pre-trained quantile forest nearest to temperature ``T``,
    builds a synthetic feature vector whose solute descriptors are set to
    per-feature worst-case quantiles of the training data (direction chosen
    by each feature's correlation with LogD), substitutes the known polymer
    features, and predicts D at the requested quantiles.

    Parameters
    ----------
    density : float
        Polymer density, substituted for the 'Polymer_Density' feature.
    polytg : float
        Polymer glass-transition temperature (C), substituted for 'Polymer_Tg'.
    quantiles : list of float
        Prediction quantiles passed to the quantile forest.
    T : float
        Temperature in C; the bundle trained nearest to T is used.
    worstcase : {'hi', 'lo'}
        Direction of the worst case assumed for the unknown descriptors.

    Returns
    -------
    D_pred : numpy.ndarray
        Predicted D (10**LogD) at each requested quantile.
    domain_extrap : bool
        True if the synthetic point lies outside the forest's training domain.

    Raises
    ------
    ValueError
        If ``worstcase`` is not 'hi' or 'lo'.
    """
    # BUG FIX (was silent): an invalid worstcase left tmpq undefined and
    # crashed later with NameError; fail fast with a clear message instead.
    if worstcase not in ('hi', 'lo'):
        raise ValueError("worstcase must be 'hi' or 'lo'")

    nearest_T = QRF_T_list[np.abs(T-QRF_T_list).argmin()]
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl', 'rb') as f:
        # Bundle: quantile forest, imputer, feature scaler, feature-name list.
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # Training rows inside the temperature window around the selected model T.
    mask_T = (df_QRF['T'] > nearest_T-QRF_T_cut) & (df_QRF['T'] < nearest_T+QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    df_y = df_QRF.loc[mask_T, 'LogD']          # Series (scalar column label)
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)

    # Per-feature worst-case quantile, switched by the sign of the feature's
    # correlation with LogD.
    # BUG FIX: df_y is a Series, so the original df_y['LogD'] was a row-label
    # lookup that raised KeyError; corrwith takes the Series directly.
    if worstcase == 'hi':
        tmpq = np.array([0.95]*len(sub_desc_list))
        tmpq[df_X.corrwith(df_y) > 0] = 0.05
    else:
        tmpq = np.array([0.05]*len(sub_desc_list))
        tmpq[df_X.corrwith(df_y) > 0] = 0.95
    tmpv = [np.nanquantile(X_all_scale[:, i], q) for i, q in enumerate(tmpq)]

    # The two polymer features are known inputs: scale them (other slots are
    # dummy zeros, only the polymer positions of tmps are used) and overwrite
    # the corresponding worst-case entries.
    tmps = [polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else 0)
            for n in sub_desc_list]
    tmps = scaler_X.transform([tmps])[0]
    tmpv = [tmps[i] if n in ('Polymer_Tg', 'Polymer_Density') else tmpv[i]
            for i, n in enumerate(sub_desc_list)]

    LogD_pred = reg.predict([tmpv], quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape) > 1:
        # predict() returned shape (1, n_quantiles); keep the single row.
        D_pred = D_pred[0]

    # Flag extrapolation outside the training domain of the forest.
    dij = QRF_DomainExtrap(reg, X_all_scale, np.array([tmpv]))
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap
|
|
|
|
def QRF_Apply(density, polytg, smiles, quantiles=[0.03,0.5,0.97], T=37):
    """Predict the diffusion coefficient D for a specific solute/polymer pair.

    T and Tg in C.

    Loads the pre-trained quantile forest nearest to ``T``, computes the
    solute's Mordred descriptors from its SMILES, inserts the polymer density
    and Tg into the feature vector, and predicts D at the requested quantiles.

    Parameters
    ----------
    density : float
        Polymer density ('Polymer_Density' feature).
    polytg : float
        Polymer glass-transition temperature in C ('Polymer_Tg' feature).
    smiles : str
        SMILES string of the solute.
    quantiles : list of float
        Prediction quantiles passed to the quantile forest.
    T : float
        Temperature in C; the bundle trained nearest to T is loaded.

    Returns
    -------
    D_pred : numpy.ndarray
        Predicted D (10**LogD) at each requested quantile.
    domain_extrap : bool
        True if the query point lies outside the forest's training domain.
    """
    # Pick the pre-trained model whose temperature is closest to T.
    nearest_T = QRF_T_list[np.abs(T-QRF_T_list).argmin()]
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl','rb') as f:
        # Bundle: quantile forest, imputer, feature scaler, feature-name list.
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # Solute descriptors = model features minus the two polymer features.
    solute_desc_list = sub_desc_list.copy()
    if 'Polymer_Tg' in solute_desc_list:
        solute_desc_list.remove('Polymer_Tg')
    if 'Polymer_Density' in solute_desc_list:
        solute_desc_list.remove('Polymer_Density')
    # Build a Calculator restricted to exactly the descriptors the model uses.
    # The dict comprehension iterates solute_desc_list in its outer loop, so
    # insertion order — and therefore calculator order — follows
    # solute_desc_list (NOTE(review): relies on calc(m) returning values in
    # descriptor registration order — confirm against the Mordred docs).
    calc_all = mordred.Calculator(mordred.descriptors)
    desc_strs = [str(d) for d in calc_all.descriptors]
    desc_funcs = {d1:df for d1 in solute_desc_list for d2,df in zip(desc_strs,calc_all.descriptors) if d1 == d2}
    calc = mordred.Calculator(list(desc_funcs.values()))
    m = Chem.MolFromSmiles(smiles)
    m = Chem.AddHs(m)  # descriptors are computed with explicit hydrogens
    descs_mordred = calc(m)
    desc_vals = list(descs_mordred.values())
    # Coerce non-numeric descriptor results to NaN so the imputer can fill them.
    desc_vals = [d if (isinstance(d,float) or isinstance(d,int)) else np.nan for d in desc_vals]
    desc_vals = dict(zip(solute_desc_list,desc_vals))
    # Assemble the feature vector in model order, inserting the polymer inputs.
    descs = [polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else desc_vals[n]) for n in sub_desc_list]

    descs = imp.transform(pd.DataFrame([descs],columns=sub_desc_list))
    descs_scale = scaler_X.transform(descs)
    LogD_pred = reg.predict(descs_scale, quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape)>1:
        # predict() returned shape (1, n_quantiles); keep the single row.
        D_pred = D_pred[0]

    # Domain check against training rows near the selected temperature.
    mask_T = (df_QRF['T']>nearest_T-QRF_T_cut) & (df_QRF['T']<nearest_T+QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)
    dij = QRF_DomainExtrap(reg, X_all_scale, descs_scale)
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap
|
|
def QRF_DomainExtrap(estimator, X_train, X_test, D=0.0, return_features=False):
    """Domain-extrapolation distance of test points relative to training data.

    For each tree in the ensemble, the features split on along a test
    sample's decision path (leaf node excluded) are checked against the
    training min/max; any excess beyond the range — normalized by the
    training span, plus an optional margin ``D`` — is accumulated as a
    squared distance across all trees, and the square root is returned.

    Parameters
    ----------
    estimator : forest-like
        Object with ``estimators_``; each tree exposes ``tree_.feature``
        and ``decision_path``.
    X_train, X_test : numpy.ndarray
        Training matrix (defines the in-domain box) and query points.
    D : float
        Extra margin added to the normalized excess before clipping at 0.
    return_features : bool
        When True, also return per-sample lists of offending feature
        indices (None for fully in-domain samples).

    Returns
    -------
    numpy.ndarray, or (numpy.ndarray, list) when ``return_features``.
    """
    lo = X_train.min(axis=0)
    hi = X_train.max(axis=0)
    span = hi - lo
    n_test = len(X_test)
    dist_sq = np.zeros(n_test)
    if return_features:
        offending = {k: [] for k in range(n_test)}
    for tree in estimator.estimators_:
        split_feature = tree.tree_.feature
        # Dense boolean matrix: which tree nodes each sample passes through.
        paths = tree.decision_path(X_test).toarray().astype(bool)
        for k, row in enumerate(paths):
            # Features tested on this sample's path; drop the leaf node.
            f = split_feature[row][:-1]
            x = X_test[k][f]
            over = (x - hi[f]) / span[f] + D
            under = (lo[f] - x) / span[f] + D
            over = np.where(over < 0, 0, over)
            under = np.where(under < 0, 0, under)
            dist_sq[k] += np.sum(over**2) + np.sum(under**2)
            if return_features:
                if (over > 0).any():
                    offending[k].extend(list(f[over > 0]))
                if (under > 0).any():
                    offending[k].extend(list(f[under > 0]))
    dij = dist_sq**0.5
    if not return_features:
        return dij
    feats_out = [list(set(offending[k])) if len(offending[k]) else None
                 for k in range(n_test)]
    return dij, feats_out
|
|