|
|
import os |
|
|
from copy import deepcopy |
|
|
from joblib import dump |
|
|
from dataset import build_peptide_smiles, load_data, process_label |
|
|
from rdkit import Chem |
|
|
from rdkit.Chem import AllChem |
|
|
from rdkit import DataStructs |
|
|
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator |
|
|
import numpy as np |
|
|
from sklearn.base import BaseEstimator, TransformerMixin |
|
|
from sklearn.pipeline import Pipeline |
|
|
from sklearn.model_selection import KFold |
|
|
from sklearn.linear_model import LogisticRegression, Ridge |
|
|
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor |
|
|
from sklearn.metrics import ( |
|
|
average_precision_score, roc_auc_score, f1_score, accuracy_score, mean_absolute_error |
|
|
) |
|
|
from scipy.stats import pearsonr, kendalltau |
|
|
from typing import List, Tuple, Union, Literal, Optional |
|
|
|
|
|
|
|
|
def encode_sequence(seq):
    """Convert a peptide sequence into an RDKit Mol via its SMILES string."""
    smiles = build_peptide_smiles(seq)
    return Chem.MolFromSmiles(smiles)
|
|
|
|
|
|
|
|
def get_data(mode='train', task="cls", include_reverse=False, include_self=False, one_way=False):
    """Build pairwise ((mol1, mol2), label) training records from an Excel dataset.

    Parameters:
    - mode: 'train', 'test', 'r2_case' or 'r2_case_' (selects the xlsx file;
      non-train modes force one_way=True).
    - task: passed to process_label ('cls' or 'reg').
    - include_reverse: also emit the (var2, var1) pair with the inverse ratio.
    - include_self: emit (v, v) pairs with ratio 1.0 for every variant.
    - one_way: only pair the first variant of each group against the rest.

    Returns a list of ((encoded_var1, encoded_var2), label) tuples, where the
    label is derived from the MIC ratio mic2 / mic1.
    """
    if mode == "train":
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', 'train.xlsx')
    elif mode in ("test", "r2_case", "r2_case_"):
        # Evaluation modes are scored one direction only.
        one_way = True
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', f'{mode}.xlsx')
    else:
        # Fix: previously an unknown mode fell through without assigning
        # `loader` (the ValueError lived in an unreachable inner branch),
        # crashing later with a NameError. Raise the intended error here.
        raise ValueError("未知的 mode,请使用 'train' 或 'test'")

    groups_avg = loader(xlsx_file, mode)

    data = []
    for orig, variant_dict in groups_avg.items():
        variants = list(variant_dict.keys())
        n_variants = len(variants)
        if n_variants == 0:
            continue

        # Cache encodings so each variant is converted to a Mol at most once
        # (previously each pair re-encoded both members: O(n^2) encodings).
        encoded = {}

        def _enc(seq):
            if seq not in encoded:
                encoded[seq] = encode_sequence(seq)
            return encoded[seq]

        if include_self and (not one_way):
            # Self pairs carry a neutral ratio of 1.0.
            for variant in variants:
                mol = _enc(variant)
                data.append(((mol, mol), process_label(1.0, task)))

        for i in [0] if one_way else range(n_variants):
            for j in range(i + 1, n_variants):
                var1 = variants[i]
                var2 = variants[j]
                mic1 = variant_dict[var1]
                mic2 = variant_dict[var2]

                ratio = mic2 / mic1 if mic1 != 0 else np.nan
                label = process_label(ratio, task)
                if np.isnan(label):
                    continue
                data.append(((_enc(var1), _enc(var2)), label))

                if include_reverse and (not one_way):
                    rev_ratio = mic1 / mic2 if mic2 != 0 else np.nan
                    rev_label = process_label(rev_ratio, task)
                    # Fix: the reverse label was previously appended without a
                    # NaN check (e.g. when mic2 == 0), injecting NaN labels.
                    if np.isnan(rev_label):
                        continue
                    data.append(((_enc(var2), _enc(var1)), rev_label))

    return data
|
|
|
|
|
|
|
|
def _morgan_bitvect_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the Morgan bit fingerprint of *mol* as a dense uint8 0/1 vector."""
    generator = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    fingerprint = generator.GetFingerprint(mol)
    bits = np.zeros(nBits, dtype=np.uint8)
    # Mark every set bit of the explicit bit vector.
    for bit_index in fingerprint.GetOnBits():
        bits[bit_index] = 1
    return bits
|
|
|
|
|
def _morgan_countvect_hashed_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the hashed Morgan count fingerprint of *mol* as a dense float32 vector."""
    generator = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    sparse_counts = generator.GetCountFingerprint(mol)
    dense = np.zeros(nBits, dtype=np.float32)
    for raw_key, count in sparse_counts.GetNonzeroElements().items():
        slot = int(raw_key)
        # Guard against keys outside the hashed range.
        if 0 <= slot < nBits:
            dense[slot] += float(count)
    return dense
|
|
|
|
|
class PairwiseMolFeaturizer(BaseEstimator, TransformerMixin):
    """Featurize (mol1, mol2) pairs via Morgan fingerprint differences.

    mode:
        - 'count': use the count delta (recommended for regression)
        - 'count+binary': use [count delta, XOR(bit)] (recommended for classification)
    """

    def __init__(self, nBits=2048, radius=2, useChirality=True, mode="count"):
        # Attribute names mirror the constructor args (sklearn convention).
        self.nBits = nBits
        self.radius = radius
        self.useChirality = useChirality
        self.mode = mode

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn from the data.
        return self

    def _pair_features(self, mol1, mol2):
        """Compute the feature vector for a single (mol1, mol2) pair."""
        fp_kwargs = dict(radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
        count1 = _morgan_countvect_hashed_from_mol(mol1, **fp_kwargs)
        count2 = _morgan_countvect_hashed_from_mol(mol2, **fp_kwargs)
        delta_count = count2 - count1
        if self.mode == "count":
            return delta_count
        if self.mode == "count+binary":
            bits1 = _morgan_bitvect_from_mol(mol1, **fp_kwargs)
            bits2 = _morgan_bitvect_from_mol(mol2, **fp_kwargs)
            changed = np.logical_xor(bits2, bits1).astype(np.uint8).astype(np.float32)
            return np.concatenate([delta_count, changed], axis=0)
        raise ValueError("Unknown featurization mode")

    def transform(self, X):
        # Stack one feature row per molecule pair.
        return np.vstack([self._pair_features(mol1, mol2) for (mol1, mol2) in X])
|
|
|
|
|
|
|
|
def unpack_pairs(data_list: List[Tuple[Tuple[object, object], Union[int, float]]]):
    """Split (pair, label) records into a list of pairs and a numpy label array."""
    pairs = []
    labels = []
    for pair, label in data_list:
        pairs.append(pair)
        labels.append(label)
    return pairs, np.array(labels)
|
|
|
|
|
def _mean_std(vals): |
|
|
arr = np.array(vals, dtype=float) |
|
|
arr = arr[~np.isnan(arr)] |
|
|
if arr.size == 0: |
|
|
return np.nan, np.nan |
|
|
return float(np.mean(arr)), float(np.std(arr)) |
|
|
|
|
|
|
|
|
def run(
    get_data_func,
    task: Literal['cls', 'reg'],
    model: Literal['rf', 'lr'],
    n_splits: int = 5,
    random_state: int = 42,
    nBits: int = 2048,
    radius: int = 2,
    useChirality: bool = True
):
    """K-fold cross-validated training/evaluation of a pairwise fingerprint model.

    Parameters:
    - get_data_func: callable accepting task=... and returning a list of
      ((mol1, mol2), label) records.
    - task: 'cls' or 'reg'
    - model: 'rf' or 'lr'
        * classification: 'rf' -> RandomForestClassifier, 'lr' -> LogisticRegression
        * regression:     'rf' -> RandomForestRegressor,  'lr' -> Ridge
    - n_splits, random_state: KFold configuration.
    - nBits, radius, useChirality: Morgan fingerprint settings.

    Returns:
    - list of fitted Pipelines, one deep copy per fold
    - metrics summary dict mapping metric name -> (mean, std)
    """
    data = get_data_func(task=task)
    X, y = unpack_pairs(data)
    print(f"Total pairs: {len(X)} | Task={task} | Model={model}")

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    if task == 'cls':
        # Classification uses the richer [delta-count, XOR-bit] features.
        fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count+binary")

        if model == 'rf':
            clf = RandomForestClassifier(
                n_estimators=600, max_features="sqrt", min_samples_leaf=1,
                class_weight="balanced_subsample", random_state=random_state, n_jobs=-1
            )
        elif model == 'lr':
            clf = LogisticRegression(solver="liblinear", max_iter=2000, class_weight="balanced")
        else:
            raise ValueError("model must be 'rf' or 'lr'")

        pipe = Pipeline([("fx", fx), ("clf", clf)])

        auprc_list, auroc_list, f1_list, acc_list, pipes = [], [], [], [], []

        for fold, (tr, te) in enumerate(kf.split(X), 1):
            X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(int)
            X_te = [X[i] for i in te]; y_te = y[te].astype(int)

            pipe.fit(X_tr, y_tr)

            # Obtain a probability-like score for ranking metrics.
            if hasattr(pipe.named_steps['clf'], "predict_proba"):
                proba = pipe.predict_proba(X_te)[:, 1]
            else:
                # Fall back to decision_function scores, min-max scaled to [0, 1].
                if hasattr(pipe.named_steps['clf'], "decision_function"):
                    scores = pipe.decision_function(X_te)

                    mn, mx = float(np.min(scores)), float(np.max(scores))
                    proba = (scores - mn) / (mx - mn + 1e-9)
                else:
                    # Last resort: hard 0/1 predictions as pseudo-probabilities.
                    pred_hard = pipe.predict(X_te)
                    proba = pred_hard.astype(float)

            # Keep a snapshot of the fitted pipeline before the next fold refits it.
            pipes.append(deepcopy(pipe))

            pred = (proba >= 0.5).astype(int)

            auprc = average_precision_score(y_te, proba)
            try:
                auroc = roc_auc_score(y_te, proba)
            except ValueError:
                # AUROC is undefined when the fold contains a single class.
                auroc = np.nan
            f1 = f1_score(y_te, pred)
            acc = accuracy_score(y_te, pred)

            auprc_list.append(auprc); auroc_list.append(auroc); f1_list.append(f1); acc_list.append(acc)
            print(f"[Fold {fold}] AUPRC={auprc:.4f} | AUROC={auroc if not np.isnan(auroc) else float('nan'):.4f} | F1={f1:.4f} | ACC={acc:.4f}")

        # Per-metric (mean, std) across folds; AUROC skips undefined folds.
        metrics = {
            "AUPRC": _mean_std(auprc_list),
            "AUROC": _mean_std([v for v in auroc_list if not np.isnan(v)]),
            "F1": _mean_std(f1_list),
            "ACC": _mean_std(acc_list),
        }

        print("\nValidation (KFold, mean ± std)")
        print(f"- AUPRC: {metrics['AUPRC'][0]:.4f} ± {metrics['AUPRC'][1]:.4f}")
        if not np.isnan(metrics["AUROC"][0]):
            print(f"- AUROC: {metrics['AUROC'][0]:.4f} ± {metrics['AUROC'][1]:.4f}")
        else:
            print("- AUROC: undefined (some folds single-class)")
        print(f"- F1: {metrics['F1'][0]:.4f} ± {metrics['F1'][1]:.4f}")
        print(f"- ACC: {metrics['ACC'][0]:.4f} ± {metrics['ACC'][1]:.4f}")

        return pipes, metrics

    elif task == 'reg':
        # Regression uses delta-count features only.
        fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count")

        if model == 'rf':
            reg = RandomForestRegressor(
                n_estimators=800, max_features="sqrt", min_samples_leaf=1,
                random_state=random_state, n_jobs=-1
            )
        elif model == 'lr':
            # 'lr' maps to Ridge for the regression task.
            reg = Ridge(alpha=1.0, random_state=random_state)
        else:
            raise ValueError("model must be 'rf' or 'lr'")

        pipe = Pipeline([("fx", fx), ("reg", reg)])

        mae_list, rse_list, pcc_list, kcc_list, pipes = [], [], [], [], []

        for fold, (tr, te) in enumerate(kf.split(X), 1):
            X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(float)
            X_te = [X[i] for i in te]; y_te = y[te].astype(float)

            pipe.fit(X_tr, y_tr)
            pred = pipe.predict(X_te)

            # Keep a snapshot of the fitted pipeline before the next fold refits it.
            pipes.append(deepcopy(pipe))

            mae = mean_absolute_error(y_te, pred)
            # Relative squared error: residual sum over total sum of squares.
            rss = float(np.sum((y_te - pred) ** 2))
            tss = float(np.sum((y_te - np.mean(y_te)) ** 2))
            rse = rss / tss if tss > 0 else np.nan

            # Correlations are undefined when the fold's targets are constant.
            if len(np.unique(y_te)) > 1:
                pcc = pearsonr(y_te, pred)[0]
                kcc = kendalltau(y_te, pred)[0]
            else:
                pcc, kcc = np.nan, np.nan

            mae_list.append(mae); rse_list.append(rse); pcc_list.append(pcc); kcc_list.append(kcc)
            print(f"[Fold {fold}] MAE={mae:.4f} | RSE={rse if not np.isnan(rse) else float('nan'):.4f} | PCC={pcc if not np.isnan(pcc) else float('nan'):.4f} | KCC={kcc if not np.isnan(kcc) else float('nan'):.4f}")

        # Per-metric (mean, std) across folds; PCC/KCC skip undefined folds.
        metrics = {
            "MAE": _mean_std(mae_list),
            "RSE": _mean_std(rse_list),
            "PCC": _mean_std([v for v in pcc_list if not np.isnan(v)]),
            "KCC": _mean_std([v for v in kcc_list if not np.isnan(v)]),
        }

        print("\nValidation (KFold, mean ± std)")
        print(f"- MAE: {metrics['MAE'][0]:.4f} ± {metrics['MAE'][1]:.4f}")
        print(f"- RSE: {metrics['RSE'][0]:.4f} ± {metrics['RSE'][1]:.4f}")
        if not np.isnan(metrics["PCC"][0]):
            print(f"- PCC: {metrics['PCC'][0]:.4f} ± {metrics['PCC'][1]:.4f}")
        else:
            print("- PCC: undefined")
        if not np.isnan(metrics["KCC"][0]):
            print(f"- KCC: {metrics['KCC'][0]:.4f} ± {metrics['KCC'][1]:.4f}")
        else:
            print("- KCC: undefined")

        return pipes, metrics

    else:
        raise ValueError("task must be 'cls' or 'reg'")
|
|
|
|
|
|
|
|
def save_things(models: List[Pipeline], metrics, path):
    """Persist fold models and a metrics summary text file under *path*."""
    os.makedirs(path, exist_ok=True)
    # One joblib file per cross-validation fold model.
    for idx, model in enumerate(models):
        dump(model, os.path.join(path, f"model_{idx}.joblib"))
    # metrics maps name -> (mean, std); serialize means and stds as CSV rows.
    mean_row = ','.join(f"{mean}" for mean, _ in metrics.values())
    std_row = ','.join(f"{std}" for _, std in metrics.values())
    content = '\n'.join(('Valid', mean_row, std_row, 'Test', '0.,0.,0.,0.', '0.,0.,0.,0.'))
    with open(os.path.join(path, 'result.txt'), 'w') as f:
        f.write(content)
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
models, metrics = run(get_data, task='cls', model='lr', n_splits=5, random_state=42) |
|
|
save_things(models, metrics, 'run-cls/logistic_regression') |
|
|
|
|
|
|
|
|
models, metrics = run(get_data, task='cls', model='rf', n_splits=5, random_state=42) |
|
|
save_things(models, metrics, 'run-cls/random_forest') |
|
|
|
|
|
|
|
|
models, metrics = run(get_data, task='reg', model='lr', n_splits=5, random_state=42) |
|
|
save_things(models, metrics, 'run-reg/ridge') |
|
|
|
|
|
|
|
|
models, metrics = run(get_data, task='reg', model='rf', n_splits=5, random_state=42) |
|
|
save_things(models, metrics, 'run-reg/random_forest') |