File size: 6,205 Bytes
d21d254
 
 
 
 
 
 
 
 
 
 
d33329a
 
 
 
 
 
 
 
faa664b
d33329a
 
a1eff75
d33329a
 
 
 
 
a1eff75
 
 
faa664b
 
 
 
 
 
a1eff75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aae0aac
e02ea72
d33329a
 
d21d254
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d33329a
 
 
a1eff75
 
 
 
d21d254
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import sys
import pickle
import numpy as np
import pandas as pd
import sklearn
from quantile_forest import RandomForestQuantileRegressor
import mordred
import mordred.descriptors
import rdkit
from rdkit import Chem

# Temperatures for which a pickled model bundle is looked up
# ('qrf/qrf_model_bundle_<T>.pkl'); a requested T is snapped to the nearest entry.
QRF_T_list = np.array([25,30,35,37,40,45,50,55,60,65,70,75])
# Half-width of the (exclusive) window around the snapped temperature used to
# select rows of df_QRF; same units as the 'T' column.
QRF_T_cut = 2.5
# Data table read once at import time; the functions below read its 'T' and
# 'LogD' columns plus the descriptor columns named in each model bundle.
df_QRF = pd.read_excel('qrf/db-D-interp-allT-semiclean.xlsx')
# Descriptor table keyed by 'Solute_InChIKey' (presumably precomputed mordred
# values, judging by the file name -- TODO confirm).
df_desc =  pd.read_excel('qrf/mordred-descriptors.xlsx')
# Calculator over every mordred descriptor; used here only to obtain the
# descriptor names as strings.
calc = mordred.Calculator(mordred.descriptors)
colnames_mordred = [str(d) for d in calc.descriptors]
# Left join keeps every row of df_QRF; columns present in both frames keep the
# df_QRF name, while the df_desc copy gets the '_dupe' suffix.
df_QRF = pd.merge(df_QRF, df_desc[['Solute_InChIKey',*colnames_mordred]], how='left', on='Solute_InChIKey', suffixes=('', '_dupe'))

def QRF_Ceramic(density, polytg, quantiles=[0.03,0.5,0.97], T=37, worstcase='hi'):
    """Predict D for a polymer using "worst-case" solute descriptor values.

    The model bundle for the temperature in ``QRF_T_list`` nearest to ``T`` is
    loaded, every solute descriptor is set to a 5th/95th percentile of the
    scaled reference data (chosen per descriptor from the sign of its
    correlation with ``LogD``), and the polymer descriptors are set from the
    arguments.

    Parameters
    ----------
    density : value used for the 'Polymer_Density' descriptor.
    polytg : value used for the 'Polymer_Tg' descriptor.
    quantiles : quantiles forwarded to ``reg.predict``. The default list is
        never mutated here.
    T : temperature; snapped to the nearest entry of ``QRF_T_list``.
    worstcase : 'hi' or 'lo'; selects which percentile each descriptor takes
        (see the inline comments).

    Returns
    -------
    (D_pred, domain_extrap) : ``D_pred`` is ``10**LogD`` as a 1-D array over
        ``quantiles``; ``domain_extrap`` is True when ``QRF_DomainExtrap``
        reports a positive distance for the constructed descriptor vector.

    Raises
    ------
    ValueError : if ``worstcase`` is neither 'hi' nor 'lo'.
    """
    if worstcase not in ('hi', 'lo'):
        # Previously this fell through and failed later with an unbound local.
        raise ValueError(f"worstcase must be 'hi' or 'lo', got {worstcase!r}")

    nearest_T = QRF_T_list[np.abs(T-QRF_T_list).argmin()]
    # NOTE: pickle.load can execute arbitrary code; the bundle files must come
    # from a trusted source.
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl','rb') as f:
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # Reference rows within +/- QRF_T_cut (exclusive) of the snapped temperature.
    mask_T = (df_QRF['T']>nearest_T-QRF_T_cut) & (df_QRF['T']<nearest_T+QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    df_y = df_QRF.loc[mask_T, 'LogD']  # a Series, so it is passed to corrwith as-is
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)

    ## use "worst-case" solute values
    # True where a descriptor correlates positively with LogD. A NaN
    # correlation compares False and therefore keeps the default quantile.
    positive_corr = (df_X.corrwith(df_y) > 0).to_numpy()
    if worstcase == 'hi':
        # positive correlations (increase in variable increases D) use low
        # values of variable, negative correlations use high values of variable
        default_q, positive_q = 0.95, 0.05
    else:
        # inverse of above
        default_q, positive_q = 0.05, 0.95
    # NOTE(review): this mapping is reproduced unchanged from the original
    # code; confirm that 'hi' is really meant to pick LOW values for
    # positively correlated descriptors.
    tmpq = np.full(len(sub_desc_list), default_q)
    tmpq[positive_corr] = positive_q

    # "worst-case" values of scaled descriptors
    tmpv = [np.nanquantile(X_all_scale[:,i], q) for i,q in enumerate(tmpq)]

    # Raw vector carrying only the polymer descriptors (zeros elsewhere), run
    # through the scaler to obtain their scaled values.
    tmps = [polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else 0) for n in sub_desc_list]
    tmps = scaler_X.transform([tmps])[0] # scaled values of polymer descriptors

    # merge scaled polymer descriptors with worst-case scaled solute descriptors
    polymer_cols = ('Polymer_Tg', 'Polymer_Density')
    tmpv = [tmps[i] if n in polymer_cols else tmpv[i] for i,n in enumerate(sub_desc_list)]

    LogD_pred = reg.predict([tmpv], quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape)>1:
        # return 1D array regardless of quantiles setting
        D_pred = D_pred[0]

    ## domain extrapolation check
    dij = QRF_DomainExtrap(reg, X_all_scale, np.array([tmpv]))
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap


def QRF_Apply(density, polytg, smiles, quantiles=[0.03,0.5,0.97], T=37):
    """Predict D for one solute (given as SMILES) in a polymer.

    T and Tg in C.

    The model bundle for the temperature in ``QRF_T_list`` nearest to ``T`` is
    loaded, the solute descriptors named in the bundle are computed with
    mordred, the polymer descriptors are taken from the arguments, and the
    vector is imputed, scaled and passed to the regressor.

    Parameters
    ----------
    density : value used for the 'Polymer_Density' descriptor.
    polytg : value used for the 'Polymer_Tg' descriptor.
    smiles : SMILES string of the solute.
    quantiles : quantiles forwarded to ``reg.predict``. The default list is
        never mutated here.
    T : temperature; snapped to the nearest entry of ``QRF_T_list``.

    Returns
    -------
    (D_pred, domain_extrap) : ``D_pred`` is ``10**LogD`` as a 1-D array over
        ``quantiles``; ``domain_extrap`` is True when ``QRF_DomainExtrap``
        reports a positive distance for the solute's descriptor vector.

    Raises
    ------
    KeyError : if the bundle names a solute descriptor that mordred does not
        provide.
    """
    nearest_T = QRF_T_list[np.abs(T-QRF_T_list).argmin()]
    # NOTE: pickle.load can execute arbitrary code; the bundle files must come
    # from a trusted source.
    with open(f'qrf/qrf_model_bundle_{int(nearest_T)}.pkl','rb') as f:
        reg, imp, scaler_X, sub_desc_list = pickle.load(f)

    # get list of descriptors to calculate (everything except the polymer ones)
    solute_desc_list = sub_desc_list.copy()
    for polymer_col in ('Polymer_Tg', 'Polymer_Density'):
        if polymer_col in solute_desc_list:
            solute_desc_list.remove(polymer_col)
    # Unique names in first-seen order, so names and computed values pair 1:1.
    wanted = list(dict.fromkeys(solute_desc_list))

    # Map descriptor name -> descriptor object in one pass instead of scanning
    # the full mordred list once per requested name.
    calc_all = mordred.Calculator(mordred.descriptors)
    desc_by_name = {str(d): d for d in calc_all.descriptors}
    missing = [n for n in wanted if n not in desc_by_name]
    if missing:
        # Fail up front: pairing names with a shorter value list would
        # silently attach values to the wrong descriptor names.
        raise KeyError(f'descriptors not provided by mordred: {missing}')
    solute_calc = mordred.Calculator([desc_by_name[n] for n in wanted])

    # NOTE(review): RDKit's MolFromSmiles is documented to return None for an
    # unparsable SMILES, in which case AddHs raises -- behaviour left unchanged.
    m = Chem.MolFromSmiles(smiles)
    m = Chem.AddHs(m)
    descs_mordred = solute_calc(m)
    # filter out mordred errors: anything that is not a plain number becomes
    # NaN so the imputer can fill it
    desc_vals = [d if isinstance(d, (float, int)) else np.nan for d in descs_mordred.values()]
    desc_vals = dict(zip(wanted, desc_vals))

    # Assemble the full vector in the column order the model expects.
    descs = [polytg if n == 'Polymer_Tg' else (density if n == 'Polymer_Density' else desc_vals[n]) for n in sub_desc_list]
    descs = imp.transform(pd.DataFrame([descs],columns=sub_desc_list)) # avoid warning about missing column names
    descs_scale = scaler_X.transform(descs)
    LogD_pred = reg.predict(descs_scale, quantiles=quantiles)
    D_pred = 10**LogD_pred
    if len(D_pred.shape)>1:
        # return 1D array regardless of quantiles setting
        D_pred = D_pred[0]

    ## domain extrapolation check
    # Reference rows within +/- QRF_T_cut (exclusive) of the snapped temperature.
    mask_T = (df_QRF['T']>nearest_T-QRF_T_cut) & (df_QRF['T']<nearest_T+QRF_T_cut)
    df_X = df_QRF.loc[mask_T, sub_desc_list]
    X_all = imp.transform(df_X)
    X_all_scale = scaler_X.transform(X_all)
    dij = QRF_DomainExtrap(reg, X_all_scale, descs_scale)
    domain_extrap = dij[0] > 0
    return D_pred, domain_extrap

def QRF_DomainExtrap(estimator, X_train, X_test, D=0.0, return_features=False):
    """Measure how far test points fall outside the training range along tree paths.

    For every tree in ``estimator.estimators_`` and every test row, the
    features tested on the row's decision path (all path nodes except the last
    one) are compared with the per-feature min/max of ``X_train``. Excursions
    beyond the range, normalised by the feature range and shifted by ``D``,
    are squared and accumulated; the result is the square root of the total.

    Parameters
    ----------
    estimator : object exposing ``estimators_``; each tree must provide
        ``tree_.feature`` and ``decision_path(X)`` whose result has
        ``toarray()``.
    X_train : 2-D array-like giving the reference range per feature.
    X_test : 2-D array-like of points to check (one row per point).
    D : offset added to each normalised excursion before clipping at zero.
    return_features : if True, also return the offending feature indices.

    Returns
    -------
    dij : 1-D array with one distance per test row (0 means no excursion).
    dij_feats : only when ``return_features`` is True; for each row a list of
        unique feature indices with a positive excursion, or None.
    """
    X_train = np.asarray(X_train)
    X_points = np.asarray(X_test)
    X_min = X_train.min(axis=0)
    X_max = X_train.max(axis=0)
    X_range = X_max - X_min
    # A constant training feature has zero range. Dividing by it produced NaN
    # (0/0) for in-range points, and that NaN poisoned the whole distance, so
    # such features are measured unscaled instead.
    X_range = np.where(X_range == 0, 1.0, X_range)

    n_test = len(X_points)
    dij = np.zeros(n_test)
    if return_features:
        dij_feats = {i: [] for i in range(n_test)}

    for tree in estimator.estimators_:
        feature = tree.tree_.feature
        # Boolean (n_test, n_nodes) matrix: True where a node lies on the path.
        node_indicator = tree.decision_path(X_test).toarray().astype(bool)
        for i, on_path in enumerate(node_indicator):
            # Drop the last path node (presumably the leaf, which tests no
            # feature -- TODO confirm against the estimator in use).
            feats = feature[on_path][:-1]
            vals = X_points[i][feats]
            dmax = (vals - X_max[feats]) / X_range[feats] + D
            dmin = (X_min[feats] - vals) / X_range[feats] + D
            # Only excursions outside the range count.
            dmax[dmax < 0] = 0
            dmin[dmin < 0] = 0
            dij[i] += np.sum(dmax**2)
            dij[i] += np.sum(dmin**2)
            if return_features:
                if any(dmax > 0):
                    dij_feats[i].extend(list(feats[dmax > 0]))
                if any(dmin > 0):
                    dij_feats[i].extend(list(feats[dmin > 0]))

    dij = dij**0.5
    if return_features:
        dij_feats = [list(set(dij_feats[i])) if len(dij_feats[i]) else None for i in range(n_test)]
        return dij, dij_feats
    return dij