DAminoMuta / trad_ml.py
auralray's picture
Upload folder using huggingface_hub
acbef3a verified
import os
from copy import deepcopy
from joblib import dump
from dataset import build_peptide_smiles, load_data, process_label
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
average_precision_score, roc_auc_score, f1_score, accuracy_score, mean_absolute_error
)
from scipy.stats import pearsonr, kendalltau
from typing import List, Tuple, Union, Literal, Optional
def encode_sequence(seq):
return Chem.MolFromSmiles(build_peptide_smiles(seq))
def get_data(mode='train', task="cls", include_reverse=False, include_self=False, one_way=False) :
if mode == "train":
loader = load_data
xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', 'train.xlsx')
elif mode in ["test", "r2_case", 'r2_case_']:
one_way = True
if mode in ["test", "r2_case", 'r2_case_']:
loader = load_data
xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', f'{mode}.xlsx')
else:
raise ValueError("未知的 mode,请使用 'train' 或 'test'")
groups_avg = loader(xlsx_file, mode)
data = []
# 针对每个原型,过滤掉长度超过 pad_length 的变种
for orig, variant_dict in groups_avg.items():
# a = len(data)
variants = list(variant_dict.keys())
n_variants = len(variants)
if n_variants == 0:
continue
# 若启用自组合,则添加 (A, A) 样本,标签为 process_label(1, task) → log2(1)=0(再分类也为 0)
if include_self and (not one_way):
for variant in variants:
encoded_seq = encode_sequence(variant)
label = process_label(1.0, task) # log2(1)=0
data.append(((encoded_seq, encoded_seq), label))
# 添加不同变种之间的样本
for i in [0] if one_way else range(n_variants):
for j in range(i + 1, n_variants):
var1 = variants[i]
var2 = variants[j]
mic1 = variant_dict[var1]
mic2 = variant_dict[var2]
# 正向组合: (var1, var2) 标签为 log₂(mic2/mic1)
ratio = mic2 / mic1 if mic1 != 0 else np.nan
label = process_label(ratio, task)
if np.isnan(label):
continue
encoded_var1 = encode_sequence(var1)
encoded_var2 = encode_sequence(var2)
data.append(((encoded_var1, encoded_var2), label))
# 若启用正反组合,则添加 (var2, var1)
if include_reverse and (not one_way):
rev_ratio = mic1 / mic2 if mic2 != 0 else np.nan
rev_label = process_label(rev_ratio, task)
data.append(((encoded_var2, encoded_var1), rev_label))
return data
def _morgan_bitvect_from_mol(mol, radius=2, nBits=2048, useChirality=True):
# bool array of size nBits
gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
bv = gen.GetFingerprint(mol) # ExplicitBitVect
arr = np.zeros((nBits,), dtype=np.uint8)
# ConvertToNumpyArray works, but with generator we can use this:
# bv is an ExplicitBitVect; use ToBitString if needed, but converting is fine:
# RDKit still supports DataStructs.ConvertToNumpyArray; however to avoid old API you can do:
onbits = list(bv.GetOnBits())
arr[onbits] = 1
return arr
def _morgan_countvect_hashed_from_mol(mol, radius=2, nBits=2048, useChirality=True):
# dense float32 vector with hashed counts, consistent with previous behavior
gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
sp = gen.GetCountFingerprint(mol) # SparseIntVect
vec = np.zeros((nBits,), dtype=np.float32)
for k, v in sp.GetNonzeroElements().items():
# The generator already hashed to [0, fpSize). Keys are within range.
# So no extra Python hash: just place counts directly.
idx = int(k)
if 0 <= idx < nBits:
vec[idx] += float(v)
return vec
class PairwiseMolFeaturizer(BaseEstimator, TransformerMixin):
def __init__(self, nBits=2048, radius=2, useChirality=True, mode="count"):
"""
mode:
- 'count': 使用 Δcount(回归推荐)
- 'count+binary': 使用 [Δcount, XOR(bit)](分类推荐)
"""
self.nBits = nBits
self.radius = radius
self.useChirality = useChirality
self.mode = mode
def fit(self, X, y=None):
return self
def transform(self, X):
feats = []
for (mol1, mol2) in X:
c1 = _morgan_countvect_hashed_from_mol(mol1, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
c2 = _morgan_countvect_hashed_from_mol(mol2, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
delta_count = c2 - c1
if self.mode == "count":
feats.append(delta_count)
elif self.mode == "count+binary":
b1 = _morgan_bitvect_from_mol(mol1, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
b2 = _morgan_bitvect_from_mol(mol2, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
xor = np.logical_xor(b2, b1).astype(np.uint8).astype(np.float32)
feats.append(np.concatenate([delta_count, xor], axis=0))
else:
raise ValueError("Unknown featurization mode")
return np.vstack(feats)
# ----------------- 数据与工具 -----------------
def unpack_pairs(data_list: List[Tuple[Tuple[object, object], Union[int, float]]]):
X = [pair for (pair, y) in data_list]
y = np.array([y for (pair, y) in data_list])
return X, y
def _mean_std(vals):
arr = np.array(vals, dtype=float)
arr = arr[~np.isnan(arr)]
if arr.size == 0:
return np.nan, np.nan
return float(np.mean(arr)), float(np.std(arr))
# ----------------- 主训练-评估入口 -----------------
def run(
get_data_func,
task: Literal['cls', 'reg'],
model: Literal['rf', 'lr'],
n_splits: int = 5,
random_state: int = 42,
nBits: int = 2048,
radius: int = 2,
useChirality: bool = True
):
"""
参数:
- task: 'cls' 或 'reg'
- model: 'rf' 或 'lr'
* 分类: 'rf' -> RandomForestClassifier, 'lr' -> LogisticRegression
* 回归: 'rf' -> RandomForestRegressor, 'lr' -> Ridge
返回:
- 最后一次拟合的 Pipeline 模型
- 指标汇总 dict
"""
data = get_data_func(task=task)
X, y = unpack_pairs(data)
print(f"Total pairs: {len(X)} | Task={task} | Model={model}")
kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
if task == 'cls':
# 特征
fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count+binary")
# 模型
if model == 'rf':
clf = RandomForestClassifier(
n_estimators=600, max_features="sqrt", min_samples_leaf=1,
class_weight="balanced_subsample", random_state=random_state, n_jobs=-1
)
elif model == 'lr':
clf = LogisticRegression(solver="liblinear", max_iter=2000, class_weight="balanced")
else:
raise ValueError("model must be 'rf' or 'lr'")
pipe = Pipeline([("fx", fx), ("clf", clf)])
auprc_list, auroc_list, f1_list, acc_list, pipes = [], [], [], [], []
for fold, (tr, te) in enumerate(kf.split(X), 1):
X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(int)
X_te = [X[i] for i in te]; y_te = y[te].astype(int)
pipe.fit(X_tr, y_tr)
# 概率输出
if hasattr(pipe.named_steps['clf'], "predict_proba"):
proba = pipe.predict_proba(X_te)[:, 1]
else:
# 如模型不支持 predict_proba,用 decision_function 做一个近似映射
if hasattr(pipe.named_steps['clf'], "decision_function"):
scores = pipe.decision_function(X_te)
# min-max 归一为 [0,1] 近似概率
mn, mx = float(np.min(scores)), float(np.max(scores))
proba = (scores - mn) / (mx - mn + 1e-9)
else:
# 退化为 hard label,不用于 AUC,但勉强计算 AUPRC 会失真
pred_hard = pipe.predict(X_te)
proba = pred_hard.astype(float)
pipes.append(deepcopy(pipe))
pred = (proba >= 0.5).astype(int)
auprc = average_precision_score(y_te, proba)
try:
auroc = roc_auc_score(y_te, proba)
except ValueError:
auroc = np.nan
f1 = f1_score(y_te, pred)
acc = accuracy_score(y_te, pred)
auprc_list.append(auprc); auroc_list.append(auroc); f1_list.append(f1); acc_list.append(acc)
print(f"[Fold {fold}] AUPRC={auprc:.4f} | AUROC={auroc if not np.isnan(auroc) else float('nan'):.4f} | F1={f1:.4f} | ACC={acc:.4f}")
metrics = {
"AUPRC": _mean_std(auprc_list),
"AUROC": _mean_std([v for v in auroc_list if not np.isnan(v)]),
"F1": _mean_std(f1_list),
"ACC": _mean_std(acc_list),
}
print("\nValidation (KFold, mean ± std)")
print(f"- AUPRC: {metrics['AUPRC'][0]:.4f} ± {metrics['AUPRC'][1]:.4f}")
if not np.isnan(metrics["AUROC"][0]):
print(f"- AUROC: {metrics['AUROC'][0]:.4f} ± {metrics['AUROC'][1]:.4f}")
else:
print("- AUROC: undefined (some folds single-class)")
print(f"- F1: {metrics['F1'][0]:.4f} ± {metrics['F1'][1]:.4f}")
print(f"- ACC: {metrics['ACC'][0]:.4f} ± {metrics['ACC'][1]:.4f}")
return pipes, metrics
elif task == 'reg':
fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count")
if model == 'rf':
reg = RandomForestRegressor(
n_estimators=800, max_features="sqrt", min_samples_leaf=1,
random_state=random_state, n_jobs=-1
)
elif model == 'lr':
# 线性基线:Ridge
reg = Ridge(alpha=1.0, random_state=random_state)
else:
raise ValueError("model must be 'rf' or 'lr'")
pipe = Pipeline([("fx", fx), ("reg", reg)])
mae_list, rse_list, pcc_list, kcc_list, pipes = [], [], [], [], []
for fold, (tr, te) in enumerate(kf.split(X), 1):
X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(float)
X_te = [X[i] for i in te]; y_te = y[te].astype(float)
pipe.fit(X_tr, y_tr)
pred = pipe.predict(X_te)
pipes.append(deepcopy(pipe))
mae = mean_absolute_error(y_te, pred)
rss = float(np.sum((y_te - pred) ** 2))
tss = float(np.sum((y_te - np.mean(y_te)) ** 2))
rse = rss / tss if tss > 0 else np.nan
if len(np.unique(y_te)) > 1:
pcc = pearsonr(y_te, pred)[0]
kcc = kendalltau(y_te, pred)[0]
else:
pcc, kcc = np.nan, np.nan
mae_list.append(mae); rse_list.append(rse); pcc_list.append(pcc); kcc_list.append(kcc)
print(f"[Fold {fold}] MAE={mae:.4f} | RSE={rse if not np.isnan(rse) else float('nan'):.4f} | PCC={pcc if not np.isnan(pcc) else float('nan'):.4f} | KCC={kcc if not np.isnan(kcc) else float('nan'):.4f}")
metrics = {
"MAE": _mean_std(mae_list),
"RSE": _mean_std(rse_list),
"PCC": _mean_std([v for v in pcc_list if not np.isnan(v)]),
"KCC": _mean_std([v for v in kcc_list if not np.isnan(v)]),
}
print("\nValidation (KFold, mean ± std)")
print(f"- MAE: {metrics['MAE'][0]:.4f} ± {metrics['MAE'][1]:.4f}")
print(f"- RSE: {metrics['RSE'][0]:.4f} ± {metrics['RSE'][1]:.4f}")
if not np.isnan(metrics["PCC"][0]):
print(f"- PCC: {metrics['PCC'][0]:.4f} ± {metrics['PCC'][1]:.4f}")
else:
print("- PCC: undefined")
if not np.isnan(metrics["KCC"][0]):
print(f"- KCC: {metrics['KCC'][0]:.4f} ± {metrics['KCC'][1]:.4f}")
else:
print("- KCC: undefined")
return pipes, metrics
else:
raise ValueError("task must be 'cls' or 'reg'")
def save_things(models: List[Pipeline], metrics, path):
os.makedirs(path, exist_ok=True)
for i, pipe in enumerate(models):
dump(pipe, os.path.join(path, f"model_{i}.joblib"))
means, stds = [], []
for k, v in metrics.items():
means.append(f"{v[0]}")
stds.append(f"{v[1]}")
means = ','.join(means)
stds = ','.join(stds)
string = '\n'.join(('Valid', means, stds, 'Test', '0.,0.,0.,0.', '0.,0.,0.,0.'))
with open(os.path.join(path, 'result.txt'), 'w') as f:
f.write(string)
# ----------------- 用法示例 -----------------
if __name__ == "__main__":
# 分类 + 逻辑回归
models, metrics = run(get_data, task='cls', model='lr', n_splits=5, random_state=42)
save_things(models, metrics, 'run-cls/logistic_regression')
# 分类 + 随机森林
models, metrics = run(get_data, task='cls', model='rf', n_splits=5, random_state=42)
save_things(models, metrics, 'run-cls/random_forest')
# 回归 + Ridge
models, metrics = run(get_data, task='reg', model='lr', n_splits=5, random_state=42)
save_things(models, metrics, 'run-reg/ridge')
# 回归 + 随机森林
models, metrics = run(get_data, task='reg', model='rf', n_splits=5, random_state=42)
save_things(models, metrics, 'run-reg/random_forest')