"""Pairwise Morgan-fingerprint baselines for peptide-variant pairs.

The module builds (variant, variant) molecule pairs from the project's
spreadsheets, turns each pair into a difference-of-fingerprints feature
vector, and cross-validates a random forest or a linear model on them for
either a classification ('cls') or a regression ('reg') target.
"""
import os
from copy import deepcopy
from functools import lru_cache
from typing import List, Tuple, Union, Literal, Optional

from joblib import dump
from dataset import build_peptide_smiles, load_data, process_label
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    average_precision_score,
    roc_auc_score,
    f1_score,
    accuracy_score,
    mean_absolute_error,
)
from scipy.stats import pearsonr, kendalltau


def encode_sequence(seq):
    """Convert a peptide sequence into an RDKit molecule.

    The sequence is first turned into SMILES by ``build_peptide_smiles`` and
    then parsed with ``Chem.MolFromSmiles``.

    NOTE(review): ``Chem.MolFromSmiles`` returns ``None`` for SMILES it cannot
    parse; this function passes that through unchanged -- confirm callers
    never feed sequences that produce invalid SMILES.
    """
    return Chem.MolFromSmiles(build_peptide_smiles(seq))


def get_data(mode='train', task="cls", include_reverse=False, include_self=False, one_way=False):
    """Build the list of ``((mol_a, mol_b), label)`` samples for one split.

    Args:
        mode: ``'train'`` reads ``dataset/train.xlsx``; ``'test'``,
            ``'r2_case'`` and ``'r2_case_'`` read ``dataset/<mode>.xlsx`` and
            force ``one_way=True``.
        task: forwarded to ``process_label`` (``'cls'`` or ``'reg'`` in this
            file's callers).
        include_reverse: also emit ``(b, a)`` with the label of the inverse
            ratio. Ignored when ``one_way`` is in effect.
        include_self: also emit ``(a, a)`` with ``process_label(1.0, task)``.
            Ignored when ``one_way`` is in effect.
        one_way: pair only the first variant of each group with the others
            instead of enumerating all ``i < j`` pairs.

    Returns:
        A list of ``((mol_a, mol_b), label)`` tuples. Molecules for the same
        sequence are encoded once and the same object is shared between
        pairs.

    Raises:
        ValueError: if ``mode`` is not one of the supported values.
    """
    if mode == "train":
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', 'train.xlsx')
    elif mode in ["test", "r2_case", 'r2_case_']:
        # Evaluation-style splits are always built one-way.
        one_way = True
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', f'{mode}.xlsx')
    else:
        raise ValueError("未知的 mode,请使用 'train' 或 'test'")

    # Mapping: prototype -> {variant sequence -> measured value}.
    # presumably the value is an averaged MIC -- verify against load_data.
    groups_avg = loader(xlsx_file, mode)

    # Encode every distinct sequence at most once; the pair loops below would
    # otherwise rebuild the same molecule for each pair it appears in.
    mol_cache = {}

    def mol_for(seq):
        if seq not in mol_cache:
            mol_cache[seq] = encode_sequence(seq)
        return mol_cache[seq]

    data = []
    for orig, variant_dict in groups_avg.items():
        variants = list(variant_dict.keys())
        n_variants = len(variants)
        if n_variants == 0:
            continue

        # Self pairs (A, A): ratio 1.0 (the original author notes this is
        # log2(1) = 0 and class 0 -- confirm against process_label).
        if include_self and (not one_way):
            for variant in variants:
                encoded_seq = mol_for(variant)
                label = process_label(1.0, task)
                data.append(((encoded_seq, encoded_seq), label))

        # Pairs of distinct variants.
        for i in [0] if one_way else range(n_variants):
            for j in range(i + 1, n_variants):
                var1 = variants[i]
                var2 = variants[j]
                mic1 = variant_dict[var1]
                mic2 = variant_dict[var2]

                # Forward pair (var1, var2), labelled from mic2 / mic1.
                ratio = mic2 / mic1 if mic1 != 0 else np.nan
                label = process_label(ratio, task)
                if np.isnan(label):
                    continue
                encoded_var1 = mol_for(var1)
                encoded_var2 = mol_for(var2)
                data.append(((encoded_var1, encoded_var2), label))

                # Reverse pair (var2, var1), labelled from mic1 / mic2.
                if include_reverse and (not one_way):
                    rev_ratio = mic1 / mic2 if mic2 != 0 else np.nan
                    rev_label = process_label(rev_ratio, task)
                    # Same validity rule as the forward pair: never emit a
                    # sample whose label is NaN.
                    if not np.isnan(rev_label):
                        data.append(((encoded_var2, encoded_var1), rev_label))
    return data


@lru_cache(maxsize=16)
def _morgan_generator(radius, nBits, useChirality):
    """Return a Morgan fingerprint generator, cached per parameter set."""
    return GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)


def _morgan_bitvect_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the Morgan bit fingerprint of ``mol`` as a uint8 array of length ``nBits``."""
    gen = _morgan_generator(radius, nBits, useChirality)
    bv = gen.GetFingerprint(mol)
    arr = np.zeros((nBits,), dtype=np.uint8)
    # Set the on-bits directly rather than going through
    # DataStructs.ConvertToNumpyArray.
    onbits = list(bv.GetOnBits())
    arr[onbits] = 1
    return arr


def _morgan_countvect_hashed_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the Morgan count fingerprint of ``mol`` as a dense float32 array of length ``nBits``."""
    gen = _morgan_generator(radius, nBits, useChirality)
    sp = gen.GetCountFingerprint(mol)
    vec = np.zeros((nBits,), dtype=np.float32)
    for k, v in sp.GetNonzeroElements().items():
        # Keys are expected to be already folded into [0, nBits) by the
        # generator; the range check only guards against stray indices.
        idx = int(k)
        if 0 <= idx < nBits:
            vec[idx] += float(v)
    return vec


class PairwiseMolFeaturizer(BaseEstimator, TransformerMixin):
    """Stateless transformer turning ``(mol1, mol2)`` pairs into feature rows.

    Each row is the count-fingerprint difference ``count(mol2) - count(mol1)``;
    in ``'count+binary'`` mode the XOR of the two bit fingerprints is appended.
    """

    _MODES = ("count", "count+binary")

    def __init__(self, nBits=2048, radius=2, useChirality=True, mode="count"):
        """Store the fingerprint settings.

        mode:
            - 'count': delta-count only (used for regression in this file).
            - 'count+binary': [delta-count, XOR(bits)] (used for
              classification in this file).
        """
        self.nBits = nBits
        self.radius = radius
        self.useChirality = useChirality
        self.mode = mode

    def fit(self, X, y=None):
        """No-op; the featurizer has nothing to learn."""
        return self

    def transform(self, X):
        """Return a float32 matrix with one row per pair in ``X``.

        The width is ``nBits`` for 'count' and ``2 * nBits`` for
        'count+binary'. An empty ``X`` yields an array with zero rows.

        Raises:
            ValueError: if ``self.mode`` is not a supported mode.
        """
        if self.mode not in self._MODES:
            raise ValueError("Unknown featurization mode")
        with_bits = self.mode == "count+binary"
        fp_args = dict(radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)

        feats = []
        for (mol1, mol2) in X:
            c1 = _morgan_countvect_hashed_from_mol(mol1, **fp_args)
            c2 = _morgan_countvect_hashed_from_mol(mol2, **fp_args)
            row = c2 - c1
            if with_bits:
                b1 = _morgan_bitvect_from_mol(mol1, **fp_args)
                b2 = _morgan_bitvect_from_mol(mol2, **fp_args)
                xor = np.logical_xor(b2, b1).astype(np.float32)
                row = np.concatenate([row, xor], axis=0)
            feats.append(row)

        if not feats:
            # np.vstack refuses an empty list; return a well-shaped empty matrix.
            width = self.nBits * (2 if with_bits else 1)
            return np.empty((0, width), dtype=np.float32)
        return np.vstack(feats)


# ----------------- Data helpers -----------------
def unpack_pairs(data_list: List[Tuple[Tuple[object, object], Union[int, float]]]):
    """Split ``[(pair, label), ...]`` into a list of pairs and a label array."""
    X = [pair for (pair, _) in data_list]
    y = np.array([label for (_, label) in data_list])
    return X, y


def _mean_std(vals):
    """Return ``(mean, population std)`` of ``vals`` ignoring NaNs.

    Returns ``(nan, nan)`` when no non-NaN value is left.
    """
    arr = np.array(vals, dtype=float)
    arr = arr[~np.isnan(arr)]
    if arr.size == 0:
        return np.nan, np.nan
    return float(np.mean(arr)), float(np.std(arr))


def _make_classifier(model, random_state):
    """Build the unfitted classifier for ``model`` ('rf' or 'lr')."""
    if model == 'rf':
        return RandomForestClassifier(
            n_estimators=600,
            max_features="sqrt",
            min_samples_leaf=1,
            class_weight="balanced_subsample",
            random_state=random_state,
            n_jobs=-1,
        )
    if model == 'lr':
        return LogisticRegression(solver="liblinear", max_iter=2000, class_weight="balanced")
    raise ValueError("model must be 'rf' or 'lr'")


def _make_regressor(model, random_state):
    """Build the unfitted regressor for ``model`` ('rf' or 'lr')."""
    if model == 'rf':
        return RandomForestRegressor(
            n_estimators=800,
            max_features="sqrt",
            min_samples_leaf=1,
            random_state=random_state,
            n_jobs=-1,
        )
    if model == 'lr':
        # Linear baseline: Ridge.
        return Ridge(alpha=1.0, random_state=random_state)
    raise ValueError("model must be 'rf' or 'lr'")


def _positive_class_scores(pipe, X):
    """Return a score in [0, 1] per sample for the positive class (label 1)."""
    clf = pipe.named_steps['clf']
    if hasattr(clf, "predict_proba"):
        proba = pipe.predict_proba(X)
        if proba.shape[1] > 1:
            return proba[:, 1]
        # The training fold contained a single class, so there is only one
        # probability column; report certainty for or against label 1.
        return np.full(proba.shape[0], 1.0 if clf.classes_[0] == 1 else 0.0)
    if hasattr(clf, "decision_function"):
        # Min-max scale decision scores into an approximate probability.
        scores = pipe.decision_function(X)
        mn, mx = float(np.min(scores)), float(np.max(scores))
        return (scores - mn) / (mx - mn + 1e-9)
    # Last resort: hard labels. AUPRC/AUROC computed from these are distorted.
    return pipe.predict(X).astype(float)


def _cross_validate_classifier(pipe, X, y, kf):
    """Fit a fresh clone of ``pipe`` per fold and report AUPRC/AUROC/F1/ACC."""
    auprc_list, auroc_list, f1_list, acc_list, pipes = [], [], [], [], []
    for fold, (tr, te) in enumerate(kf.split(X), 1):
        X_tr = [X[i] for i in tr]
        y_tr = y[tr].astype(int)
        X_te = [X[i] for i in te]
        y_te = y[te].astype(int)

        # Clone before fitting so every fold keeps its own model without
        # deep-copying an already fitted estimator.
        fold_pipe = clone(pipe)
        fold_pipe.fit(X_tr, y_tr)
        pipes.append(fold_pipe)

        proba = _positive_class_scores(fold_pipe, X_te)
        pred = (proba >= 0.5).astype(int)

        auprc = average_precision_score(y_te, proba)
        try:
            auroc = roc_auc_score(y_te, proba)
        except ValueError:
            # Raised when the test fold holds a single class.
            auroc = np.nan
        f1 = f1_score(y_te, pred)
        acc = accuracy_score(y_te, pred)

        auprc_list.append(auprc)
        auroc_list.append(auroc)
        f1_list.append(f1)
        acc_list.append(acc)
        print(f"[Fold {fold}] AUPRC={auprc:.4f} | AUROC={auroc:.4f} | F1={f1:.4f} | ACC={acc:.4f}")

    # _mean_std drops NaNs itself, so undefined folds do not need pre-filtering.
    metrics = {
        "AUPRC": _mean_std(auprc_list),
        "AUROC": _mean_std(auroc_list),
        "F1": _mean_std(f1_list),
        "ACC": _mean_std(acc_list),
    }
    print("\nValidation (KFold, mean ± std)")
    print(f"- AUPRC: {metrics['AUPRC'][0]:.4f} ± {metrics['AUPRC'][1]:.4f}")
    if not np.isnan(metrics["AUROC"][0]):
        print(f"- AUROC: {metrics['AUROC'][0]:.4f} ± {metrics['AUROC'][1]:.4f}")
    else:
        print("- AUROC: undefined (some folds single-class)")
    print(f"- F1: {metrics['F1'][0]:.4f} ± {metrics['F1'][1]:.4f}")
    print(f"- ACC: {metrics['ACC'][0]:.4f} ± {metrics['ACC'][1]:.4f}")
    return pipes, metrics


def _cross_validate_regressor(pipe, X, y, kf):
    """Fit a fresh clone of ``pipe`` per fold and report MAE/RSE/PCC/KCC."""
    mae_list, rse_list, pcc_list, kcc_list, pipes = [], [], [], [], []
    for fold, (tr, te) in enumerate(kf.split(X), 1):
        X_tr = [X[i] for i in tr]
        y_tr = y[tr].astype(float)
        X_te = [X[i] for i in te]
        y_te = y[te].astype(float)

        fold_pipe = clone(pipe)
        fold_pipe.fit(X_tr, y_tr)
        pred = fold_pipe.predict(X_te)
        pipes.append(fold_pipe)

        mae = mean_absolute_error(y_te, pred)
        # Relative squared error: residual sum of squares over total sum of squares.
        rss = float(np.sum((y_te - pred) ** 2))
        tss = float(np.sum((y_te - np.mean(y_te)) ** 2))
        rse = rss / tss if tss > 0 else np.nan

        # Correlations are only attempted when the targets are not constant.
        if len(np.unique(y_te)) > 1:
            pcc = pearsonr(y_te, pred)[0]
            kcc = kendalltau(y_te, pred)[0]
        else:
            pcc, kcc = np.nan, np.nan

        mae_list.append(mae)
        rse_list.append(rse)
        pcc_list.append(pcc)
        kcc_list.append(kcc)
        print(f"[Fold {fold}] MAE={mae:.4f} | RSE={rse:.4f} | PCC={pcc:.4f} | KCC={kcc:.4f}")

    metrics = {
        "MAE": _mean_std(mae_list),
        "RSE": _mean_std(rse_list),
        "PCC": _mean_std(pcc_list),
        "KCC": _mean_std(kcc_list),
    }
    print("\nValidation (KFold, mean ± std)")
    print(f"- MAE: {metrics['MAE'][0]:.4f} ± {metrics['MAE'][1]:.4f}")
    print(f"- RSE: {metrics['RSE'][0]:.4f} ± {metrics['RSE'][1]:.4f}")
    if not np.isnan(metrics["PCC"][0]):
        print(f"- PCC: {metrics['PCC'][0]:.4f} ± {metrics['PCC'][1]:.4f}")
    else:
        print("- PCC: undefined")
    if not np.isnan(metrics["KCC"][0]):
        print(f"- KCC: {metrics['KCC'][0]:.4f} ± {metrics['KCC'][1]:.4f}")
    else:
        print("- KCC: undefined")
    return pipes, metrics


# ----------------- Train / evaluate entry point -----------------
def run(
    get_data_func,
    task: Literal['cls', 'reg'],
    model: Literal['rf', 'lr'],
    n_splits: int = 5,
    random_state: int = 42,
    nBits: int = 2048,
    radius: int = 2,
    useChirality: bool = True
):
    """Cross-validate one model on the pairs returned by ``get_data_func``.

    Args:
        get_data_func: callable invoked as ``get_data_func(task=task)`` and
            returning a list of ``((mol_a, mol_b), label)`` samples.
        task: 'cls' or 'reg'.
        model: 'rf' or 'lr'.
            * classification: 'rf' -> RandomForestClassifier,
              'lr' -> LogisticRegression
            * regression: 'rf' -> RandomForestRegressor, 'lr' -> Ridge
        n_splits: number of shuffled KFold splits.
        random_state: seed for the splitter and the seeded estimators.
        nBits, radius, useChirality: Morgan fingerprint settings.

    Returns:
        ``(pipes, metrics)`` where ``pipes`` is the list of fitted Pipelines,
        one per fold, and ``metrics`` maps each metric name to a
        ``(mean, std)`` tuple over the folds.

    Raises:
        ValueError: if ``task`` or ``model`` is not a supported value.
    """
    # Reject bad arguments before paying for the data load.
    if task not in ('cls', 'reg'):
        raise ValueError("task must be 'cls' or 'reg'")
    if model not in ('rf', 'lr'):
        raise ValueError("model must be 'rf' or 'lr'")

    data = get_data_func(task=task)
    X, y = unpack_pairs(data)
    print(f"Total pairs: {len(X)} | Task={task} | Model={model}")

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    if task == 'cls':
        fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count+binary")
        pipe = Pipeline([("fx", fx), ("clf", _make_classifier(model, random_state))])
        return _cross_validate_classifier(pipe, X, y, kf)

    fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count")
    pipe = Pipeline([("fx", fx), ("reg", _make_regressor(model, random_state))])
    return _cross_validate_regressor(pipe, X, y, kf)


def save_things(models: List[Pipeline], metrics, path):
    """Persist per-fold models and a small text summary under ``path``.

    Writes ``model_<i>.joblib`` for every pipeline and ``result.txt`` holding
    the validation means and stds as comma-separated lines, followed by a
    'Test' section filled with zero placeholders.
    """
    os.makedirs(path, exist_ok=True)
    for i, pipe in enumerate(models):
        dump(pipe, os.path.join(path, f"model_{i}.joblib"))

    means = ','.join(f"{v[0]}" for v in metrics.values())
    stds = ','.join(f"{v[1]}" for v in metrics.values())
    string = '\n'.join(('Valid', means, stds, 'Test', '0.,0.,0.,0.', '0.,0.,0.,0.'))
    with open(os.path.join(path, 'result.txt'), 'w', encoding='utf-8') as f:
        f.write(string)


def main():
    """Run the four baseline experiments and save each one's outputs."""
    experiments = (
        ('cls', 'lr', 'run-cls/logistic_regression'),  # classification + logistic regression
        ('cls', 'rf', 'run-cls/random_forest'),        # classification + random forest
        ('reg', 'lr', 'run-reg/ridge'),                # regression + Ridge
        ('reg', 'rf', 'run-reg/random_forest'),        # regression + random forest
    )
    for task, model, out_dir in experiments:
        models, metrics = run(get_data, task=task, model=model, n_splits=5, random_state=42)
        save_things(models, metrics, out_dir)


# ----------------- Usage example -----------------
if __name__ == "__main__":
    main()