import os
from copy import deepcopy
from joblib import dump
from dataset import build_peptide_smiles, load_data, process_label
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
average_precision_score, roc_auc_score, f1_score, accuracy_score, mean_absolute_error
)
from scipy.stats import pearsonr, kendalltau
from typing import List, Tuple, Union, Literal, Optional
def encode_sequence(seq):
    """Convert a peptide sequence into an RDKit Mol via its SMILES string."""
    smiles = build_peptide_smiles(seq)
    return Chem.MolFromSmiles(smiles)
def get_data(mode='train', task="cls", include_reverse=False, include_self=False, one_way=False):
    """Build pairwise ((mol1, mol2), label) samples from the spreadsheet data.

    Parameters:
    - mode: 'train', 'test', 'r2_case' or 'r2_case_'; selects the xlsx file.
      Non-train modes force one_way=True.
    - task: 'cls' or 'reg'; forwarded to process_label.
    - include_reverse: also emit the (var2, var1) pair with the inverted ratio.
    - include_self: also emit (A, A) pairs with ratio 1 (label log2(1) = 0).
    - one_way: only pair the first variant against the remaining ones.

    Returns a list of ((encoded_mol1, encoded_mol2), label) tuples.
    """
    if mode == "train":
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', 'train.xlsx')
    elif mode in ("test", "r2_case", "r2_case_"):
        # Evaluation modes are always one-directional.
        # (The original code re-tested the same membership in a nested if.)
        one_way = True
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', f'{mode}.xlsx')
    else:
        raise ValueError("未知的 mode,请使用 'train' 或 'test'")
    groups_avg = loader(xlsx_file, mode)
    data = []
    # For each prototype, combine its variants into labelled pairs.
    for orig, variant_dict in groups_avg.items():
        variants = list(variant_dict.keys())
        n_variants = len(variants)
        if n_variants == 0:
            continue
        # Self pairs (A, A): label is process_label(1, task) -> log2(1) = 0
        # (also 0 for classification).
        if include_self and (not one_way):
            for variant in variants:
                encoded_seq = encode_sequence(variant)
                label = process_label(1.0, task)  # log2(1) = 0
                data.append(((encoded_seq, encoded_seq), label))
        # Pairs between distinct variants.
        for i in [0] if one_way else range(n_variants):
            for j in range(i + 1, n_variants):
                var1 = variants[i]
                var2 = variants[j]
                mic1 = variant_dict[var1]
                mic2 = variant_dict[var2]
                # Forward pair (var1, var2): label = log2(mic2 / mic1).
                ratio = mic2 / mic1 if mic1 != 0 else np.nan
                label = process_label(ratio, task)
                if np.isnan(label):
                    continue
                encoded_var1 = encode_sequence(var1)
                encoded_var2 = encode_sequence(var2)
                data.append(((encoded_var1, encoded_var2), label))
                # Optional reversed pair (var2, var1) with the inverted ratio.
                if include_reverse and (not one_way):
                    rev_ratio = mic1 / mic2 if mic2 != 0 else np.nan
                    rev_label = process_label(rev_ratio, task)
                    # BUGFIX: the forward pair skips NaN labels but the reverse
                    # pair previously appended them unchecked (mic2 == 0 case).
                    if not np.isnan(rev_label):
                        data.append(((encoded_var2, encoded_var1), rev_label))
    return data
def _morgan_bitvect_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Morgan fingerprint of `mol` as a dense uint8 0/1 array of length nBits."""
    gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    bv = gen.GetFingerprint(mol)  # ExplicitBitVect
    arr = np.zeros((nBits,), dtype=np.uint8)
    # Set only the on-bits directly; avoids the legacy
    # DataStructs.ConvertToNumpyArray API.
    arr[list(bv.GetOnBits())] = 1
    return arr
def _morgan_countvect_hashed_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Hashed Morgan count fingerprint as a dense float32 vector of length nBits."""
    gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    sparse_counts = gen.GetCountFingerprint(mol)  # SparseIntVect
    vec = np.zeros((nBits,), dtype=np.float32)
    # The generator already hashed keys into [0, fpSize); no extra Python
    # hashing is needed — place the counts directly (with a range guard).
    for key, count in sparse_counts.GetNonzeroElements().items():
        pos = int(key)
        if 0 <= pos < nBits:
            vec[pos] += float(count)
    return vec
class PairwiseMolFeaturizer(BaseEstimator, TransformerMixin):
    """Featurize (mol1, mol2) pairs via Morgan fingerprint differences.

    mode:
        - 'count': use delta-count only (recommended for regression)
        - 'count+binary': use [delta-count, XOR(bit)] (recommended for
          classification)
    """

    def __init__(self, nBits=2048, radius=2, useChirality=True, mode="count"):
        self.nBits = nBits
        self.radius = radius
        self.useChirality = useChirality
        self.mode = mode

    def fit(self, X, y=None):
        # Stateless transformer; nothing to learn.
        return self

    def transform(self, X):
        """Return an (n_pairs, d) matrix; d = nBits ('count') or 2*nBits ('count+binary')."""
        if self.mode not in ("count", "count+binary"):
            raise ValueError("Unknown featurization mode")
        feats = []
        for (mol1, mol2) in X:
            c1 = _morgan_countvect_hashed_from_mol(mol1, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
            c2 = _morgan_countvect_hashed_from_mol(mol2, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
            delta_count = c2 - c1
            if self.mode == "count":
                feats.append(delta_count)
            else:  # "count+binary"
                b1 = _morgan_bitvect_from_mol(mol1, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
                b2 = _morgan_bitvect_from_mol(mol2, radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
                xor = np.logical_xor(b2, b1).astype(np.uint8).astype(np.float32)
                feats.append(np.concatenate([delta_count, xor], axis=0))
        if not feats:
            # BUGFIX: np.vstack([]) raises on empty input; return a
            # correctly-shaped empty matrix instead.
            width = self.nBits if self.mode == "count" else 2 * self.nBits
            return np.empty((0, width), dtype=np.float32)
        return np.vstack(feats)
# ----------------- 数据与工具 -----------------
def unpack_pairs(data_list: List[Tuple[Tuple[object, object], Union[int, float]]]):
    """Split [((item1, item2), label), ...] into a pair list and a label ndarray."""
    if not data_list:
        return [], np.array([])
    pairs, labels = zip(*data_list)
    return list(pairs), np.array(labels)
def _mean_std(vals):
arr = np.array(vals, dtype=float)
arr = arr[~np.isnan(arr)]
if arr.size == 0:
return np.nan, np.nan
return float(np.mean(arr)), float(np.std(arr))
# ----------------- 主训练-评估入口 -----------------
def _classification_scores(pipe, X_te):
    """Best-available probability-like scores for the positive class."""
    clf = pipe.named_steps['clf']
    if hasattr(clf, "predict_proba"):
        return pipe.predict_proba(X_te)[:, 1]
    if hasattr(clf, "decision_function"):
        # Min-max normalize decision scores into [0, 1] as approximate probabilities.
        scores = pipe.decision_function(X_te)
        mn, mx = float(np.min(scores)), float(np.max(scores))
        return (scores - mn) / (mx - mn + 1e-9)
    # Fall back to hard labels; AUROC/AUPRC on these are degenerate.
    return pipe.predict(X_te).astype(float)


def _run_cls(X, y, kf, model, nBits, radius, useChirality, random_state):
    """K-fold classification loop; returns (per-fold fitted pipelines, metric dict)."""
    fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count+binary")
    if model == 'rf':
        clf = RandomForestClassifier(
            n_estimators=600, max_features="sqrt", min_samples_leaf=1,
            class_weight="balanced_subsample", random_state=random_state, n_jobs=-1
        )
    elif model == 'lr':
        clf = LogisticRegression(solver="liblinear", max_iter=2000, class_weight="balanced")
    else:
        raise ValueError("model must be 'rf' or 'lr'")
    pipe = Pipeline([("fx", fx), ("clf", clf)])
    auprc_list, auroc_list, f1_list, acc_list, pipes = [], [], [], [], []
    for fold, (tr, te) in enumerate(kf.split(X), 1):
        X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(int)
        X_te = [X[i] for i in te]; y_te = y[te].astype(int)
        pipe.fit(X_tr, y_tr)
        proba = _classification_scores(pipe, X_te)
        pipes.append(deepcopy(pipe))
        pred = (proba >= 0.5).astype(int)
        auprc = average_precision_score(y_te, proba)
        try:
            auroc = roc_auc_score(y_te, proba)
        except ValueError:
            # Single-class fold: AUROC is undefined.
            auroc = np.nan
        f1 = f1_score(y_te, pred)
        acc = accuracy_score(y_te, pred)
        auprc_list.append(auprc); auroc_list.append(auroc); f1_list.append(f1); acc_list.append(acc)
        # NaN formats as 'nan' under :.4f, so no special-casing is needed.
        print(f"[Fold {fold}] AUPRC={auprc:.4f} | AUROC={auroc:.4f} | F1={f1:.4f} | ACC={acc:.4f}")
    metrics = {
        "AUPRC": _mean_std(auprc_list),
        "AUROC": _mean_std([v for v in auroc_list if not np.isnan(v)]),
        "F1": _mean_std(f1_list),
        "ACC": _mean_std(acc_list),
    }
    print("\nValidation (KFold, mean ± std)")
    print(f"- AUPRC: {metrics['AUPRC'][0]:.4f} ± {metrics['AUPRC'][1]:.4f}")
    if not np.isnan(metrics["AUROC"][0]):
        print(f"- AUROC: {metrics['AUROC'][0]:.4f} ± {metrics['AUROC'][1]:.4f}")
    else:
        print("- AUROC: undefined (some folds single-class)")
    print(f"- F1: {metrics['F1'][0]:.4f} ± {metrics['F1'][1]:.4f}")
    print(f"- ACC: {metrics['ACC'][0]:.4f} ± {metrics['ACC'][1]:.4f}")
    return pipes, metrics


def _run_reg(X, y, kf, model, nBits, radius, useChirality, random_state):
    """K-fold regression loop; returns (per-fold fitted pipelines, metric dict)."""
    fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count")
    if model == 'rf':
        reg = RandomForestRegressor(
            n_estimators=800, max_features="sqrt", min_samples_leaf=1,
            random_state=random_state, n_jobs=-1
        )
    elif model == 'lr':
        # Linear baseline: Ridge.
        reg = Ridge(alpha=1.0, random_state=random_state)
    else:
        raise ValueError("model must be 'rf' or 'lr'")
    pipe = Pipeline([("fx", fx), ("reg", reg)])
    mae_list, rse_list, pcc_list, kcc_list, pipes = [], [], [], [], []
    for fold, (tr, te) in enumerate(kf.split(X), 1):
        X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(float)
        X_te = [X[i] for i in te]; y_te = y[te].astype(float)
        pipe.fit(X_tr, y_tr)
        pred = pipe.predict(X_te)
        pipes.append(deepcopy(pipe))
        mae = mean_absolute_error(y_te, pred)
        # Relative squared error: residual sum over total sum of squares.
        rss = float(np.sum((y_te - pred) ** 2))
        tss = float(np.sum((y_te - np.mean(y_te)) ** 2))
        rse = rss / tss if tss > 0 else np.nan
        if len(np.unique(y_te)) > 1:
            pcc = pearsonr(y_te, pred)[0]
            kcc = kendalltau(y_te, pred)[0]
        else:
            # Correlations are undefined for a constant target.
            pcc, kcc = np.nan, np.nan
        mae_list.append(mae); rse_list.append(rse); pcc_list.append(pcc); kcc_list.append(kcc)
        # NaN formats as 'nan' under :.4f, so no special-casing is needed.
        print(f"[Fold {fold}] MAE={mae:.4f} | RSE={rse:.4f} | PCC={pcc:.4f} | KCC={kcc:.4f}")
    metrics = {
        "MAE": _mean_std(mae_list),
        "RSE": _mean_std(rse_list),
        "PCC": _mean_std([v for v in pcc_list if not np.isnan(v)]),
        "KCC": _mean_std([v for v in kcc_list if not np.isnan(v)]),
    }
    print("\nValidation (KFold, mean ± std)")
    print(f"- MAE: {metrics['MAE'][0]:.4f} ± {metrics['MAE'][1]:.4f}")
    print(f"- RSE: {metrics['RSE'][0]:.4f} ± {metrics['RSE'][1]:.4f}")
    if not np.isnan(metrics["PCC"][0]):
        print(f"- PCC: {metrics['PCC'][0]:.4f} ± {metrics['PCC'][1]:.4f}")
    else:
        print("- PCC: undefined")
    if not np.isnan(metrics["KCC"][0]):
        print(f"- KCC: {metrics['KCC'][0]:.4f} ± {metrics['KCC'][1]:.4f}")
    else:
        print("- KCC: undefined")
    return pipes, metrics


def run(
    get_data_func,
    task: Literal['cls', 'reg'],
    model: Literal['rf', 'lr'],
    n_splits: int = 5,
    random_state: int = 42,
    nBits: int = 2048,
    radius: int = 2,
    useChirality: bool = True
):
    """Train and evaluate a fingerprint-difference pipeline with K-fold CV.

    Parameters:
    - task: 'cls' or 'reg'
    - model: 'rf' or 'lr'
        * classification: 'rf' -> RandomForestClassifier, 'lr' -> LogisticRegression
        * regression: 'rf' -> RandomForestRegressor, 'lr' -> Ridge

    Returns:
    - list of the fitted Pipeline from every fold
      (the original docstring claimed only the last one was returned)
    - dict mapping metric name -> (mean, std) over folds
    """
    data = get_data_func(task=task)
    X, y = unpack_pairs(data)
    print(f"Total pairs: {len(X)} | Task={task} | Model={model}")
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    if task == 'cls':
        return _run_cls(X, y, kf, model, nBits, radius, useChirality, random_state)
    elif task == 'reg':
        return _run_reg(X, y, kf, model, nBits, radius, useChirality, random_state)
    else:
        raise ValueError("task must be 'cls' or 'reg'")
def save_things(models: "List[Pipeline]", metrics, path):
    """Persist the fitted pipelines and a metric summary under `path`.

    Writes one model_{i}.joblib per pipeline plus result.txt containing the
    validation means/stds (comma-separated, in metric-dict order); the 'Test'
    rows are zero placeholders to be filled in later.
    """
    os.makedirs(path, exist_ok=True)
    for i, pipe in enumerate(models):
        dump(pipe, os.path.join(path, f"model_{i}.joblib"))
    # Iterate values directly (the keys were never used) and build the
    # CSV rows with comprehensions instead of manual append loops.
    means = ','.join(str(v[0]) for v in metrics.values())
    stds = ','.join(str(v[1]) for v in metrics.values())
    string = '\n'.join(('Valid', means, stds, 'Test', '0.,0.,0.,0.', '0.,0.,0.,0.'))
    with open(os.path.join(path, 'result.txt'), 'w') as f:
        f.write(string)
# ----------------- 用法示例 -----------------
if __name__ == "__main__":
    # Run every task/model combination and persist models + metrics.
    experiments = [
        ('cls', 'lr', 'run-cls/logistic_regression'),  # classification + logistic regression
        ('cls', 'rf', 'run-cls/random_forest'),        # classification + random forest
        ('reg', 'lr', 'run-reg/ridge'),                # regression + Ridge
        ('reg', 'rf', 'run-reg/random_forest'),        # regression + random forest
    ]
    for task_name, model_name, out_dir in experiments:
        models, metrics = run(get_data, task=task_name, model=model_name, n_splits=5, random_state=42)
        save_things(models, metrics, out_dir)