File size: 14,326 Bytes
acbef3a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
import os
from copy import deepcopy
from joblib import dump
from dataset import build_peptide_smiles, load_data, process_label
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    average_precision_score, roc_auc_score, f1_score, accuracy_score, mean_absolute_error
)
from scipy.stats import pearsonr, kendalltau
from typing import List, Tuple, Union, Literal, Optional


def encode_sequence(seq):
    """Convert a peptide sequence into an RDKit Mol via its SMILES representation."""
    smiles = build_peptide_smiles(seq)
    return Chem.MolFromSmiles(smiles)


def get_data(mode='train', task="cls", include_reverse=False, include_self=False, one_way=False):
    """Build pairwise (variant, variant) samples with relative-MIC labels.

    Args:
        mode: 'train', 'test', 'r2_case' or 'r2_case_'; selects the xlsx file.
            Non-train modes force one_way=True.
        task: 'cls' or 'reg' — passed to process_label to shape the label.
        include_reverse: also emit the mirrored pair (B, A) for every (A, B).
        include_self: also emit (A, A) pairs with ratio 1 (label log2(1) = 0).
        one_way: pair only the first variant (prototype) against the others.

    Returns:
        List of ((mol1, mol2), label) tuples, where mol* are RDKit Mols.

    Raises:
        ValueError: for an unknown mode.
    """
    if mode == "train":
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', 'train.xlsx')
    elif mode in ("test", "r2_case", "r2_case_"):
        # Evaluation modes only use prototype -> variant pairs.
        one_way = True
        loader = load_data
        xlsx_file = os.path.join(os.path.dirname(__file__), 'dataset', f'{mode}.xlsx')
    else:
        raise ValueError("未知的 mode,请使用 'train' 或 'test'")

    groups_avg = loader(xlsx_file, mode)

    data = []
    # For each prototype, build pairs among its variants.
    for orig, variant_dict in groups_avg.items():
        variants = list(variant_dict.keys())
        n_variants = len(variants)
        if n_variants == 0:
            continue

        # Encode each variant once; the Mol objects are reused across all pairs.
        encoded = {variant: encode_sequence(variant) for variant in variants}

        # Self pairs (A, A): ratio 1, so the label is log2(1) = 0 (0 for cls too).
        if include_self and (not one_way):
            for variant in variants:
                label = process_label(1.0, task)
                data.append(((encoded[variant], encoded[variant]), label))

        # Pairs between distinct variants.
        for i in [0] if one_way else range(n_variants):
            for j in range(i + 1, n_variants):
                var1 = variants[i]
                var2 = variants[j]
                mic1 = variant_dict[var1]
                mic2 = variant_dict[var2]

                # Forward pair (var1, var2): label is log2(mic2/mic1).
                ratio = mic2 / mic1 if mic1 != 0 else np.nan
                label = process_label(ratio, task)
                if np.isnan(label):
                    continue
                data.append(((encoded[var1], encoded[var2]), label))

                # Mirrored pair (var2, var1) when requested.
                if include_reverse and (not one_way):
                    rev_ratio = mic1 / mic2 if mic2 != 0 else np.nan
                    rev_label = process_label(rev_ratio, task)
                    # Bug fix: previously NaN reverse labels were appended unchecked.
                    if not np.isnan(rev_label):
                        data.append(((encoded[var2], encoded[var1]), rev_label))

    return data


def _morgan_bitvect_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the binary Morgan fingerprint of *mol* as a uint8 array of length nBits.

    NOTE(review): a new generator is constructed on every call; presumably cheap,
    but could be cached per (radius, nBits, useChirality) if profiling shows cost.
    """
    gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    bv = gen.GetFingerprint(mol)  # ExplicitBitVect
    arr = np.zeros((nBits,), dtype=np.uint8)
    # Set only the on-bits; avoids the legacy DataStructs.ConvertToNumpyArray API.
    arr[list(bv.GetOnBits())] = 1
    return arr

def _morgan_countvect_hashed_from_mol(mol, radius=2, nBits=2048, useChirality=True):
    """Return the hashed Morgan count fingerprint of *mol* as a dense float32 vector."""
    gen = GetMorganGenerator(radius=radius, fpSize=nBits, includeChirality=useChirality)
    counts = gen.GetCountFingerprint(mol)  # SparseIntVect
    out = np.zeros((nBits,), dtype=np.float32)
    # The generator already hashes keys into [0, fpSize); the range guard is
    # defensive and the counts can be placed directly.
    for bit, count in counts.GetNonzeroElements().items():
        bit = int(bit)
        if 0 <= bit < nBits:
            out[bit] += float(count)
    return out

class PairwiseMolFeaturizer(BaseEstimator, TransformerMixin):
    """Sklearn transformer turning (mol1, mol2) pairs into fingerprint-difference rows.

    mode:
      - 'count': delta of hashed Morgan counts (recommended for regression)
      - 'count+binary': [delta counts, XOR of binary bits] (recommended for classification)
    """

    def __init__(self, nBits=2048, radius=2, useChirality=True, mode="count"):
        self.nBits = nBits
        self.radius = radius
        self.useChirality = useChirality
        self.mode = mode

    def fit(self, X, y=None):
        # Stateless featurizer — nothing to learn.
        return self

    def transform(self, X):
        rows = []
        fp_kwargs = dict(radius=self.radius, nBits=self.nBits, useChirality=self.useChirality)
        for left, right in X:
            counts_left = _morgan_countvect_hashed_from_mol(left, **fp_kwargs)
            counts_right = _morgan_countvect_hashed_from_mol(right, **fp_kwargs)
            delta = counts_right - counts_left
            if self.mode == "count":
                rows.append(delta)
            elif self.mode == "count+binary":
                bits_left = _morgan_bitvect_from_mol(left, **fp_kwargs)
                bits_right = _morgan_bitvect_from_mol(right, **fp_kwargs)
                changed = np.logical_xor(bits_right, bits_left).astype(np.uint8).astype(np.float32)
                rows.append(np.concatenate([delta, changed], axis=0))
            else:
                raise ValueError("Unknown featurization mode")
        return np.vstack(rows)

# ----------------- Data & utilities -----------------
def unpack_pairs(data_list: List[Tuple[Tuple[object, object], Union[int, float]]]):
    """Split ((a, b), label) samples into a list of pairs and a numpy label array."""
    pairs, labels = [], []
    for pair, label in data_list:
        pairs.append(pair)
        labels.append(label)
    return pairs, np.array(labels)

def _mean_std(vals):
    arr = np.array(vals, dtype=float)
    arr = arr[~np.isnan(arr)]
    if arr.size == 0:
        return np.nan, np.nan
    return float(np.mean(arr)), float(np.std(arr))

# ----------------- Main train / evaluate entry point -----------------
def run(
    get_data_func,
    task: Literal['cls', 'reg'],
    model: Literal['rf', 'lr'],
    n_splits: int = 5,
    random_state: int = 42,
    nBits: int = 2048,
    radius: int = 2,
    useChirality: bool = True
):
    """
    Train and evaluate a fingerprint-based pairwise model with K-fold cross-validation.

    Args:
      - get_data_func: callable invoked as get_data_func(task=task); must return
        a list of ((mol1, mol2), label) samples.
      - task: 'cls' or 'reg'
      - model: 'rf' or 'lr'
          * classification: 'rf' -> RandomForestClassifier, 'lr' -> LogisticRegression
          * regression: 'rf' -> RandomForestRegressor, 'lr' -> Ridge
      - n_splits / random_state: KFold configuration (shuffled splits).
      - nBits / radius / useChirality: Morgan fingerprint parameters.

    Returns:
      - list of the fitted Pipeline from every fold (deep copies, one per fold)
      - dict mapping metric name -> (mean, std) across folds

    Raises:
      ValueError: for an unknown task or model string.
    """
    data = get_data_func(task=task)
    X, y = unpack_pairs(data)
    print(f"Total pairs: {len(X)} | Task={task} | Model={model}")

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    if task == 'cls':
        # Features: delta counts plus XOR bits for classification.
        fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count+binary")
        # Model
        if model == 'rf':
            clf = RandomForestClassifier(
                n_estimators=600, max_features="sqrt", min_samples_leaf=1,
                class_weight="balanced_subsample", random_state=random_state, n_jobs=-1
            )
        elif model == 'lr':
            clf = LogisticRegression(solver="liblinear", max_iter=2000, class_weight="balanced")
        else:
            raise ValueError("model must be 'rf' or 'lr'")

        pipe = Pipeline([("fx", fx), ("clf", clf)])

        auprc_list, auroc_list, f1_list, acc_list, pipes = [], [], [], [], []

        for fold, (tr, te) in enumerate(kf.split(X), 1):
            X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(int)
            X_te = [X[i] for i in te]; y_te = y[te].astype(int)

            pipe.fit(X_tr, y_tr)
            # Probability scores for ranking metrics.
            if hasattr(pipe.named_steps['clf'], "predict_proba"):
                proba = pipe.predict_proba(X_te)[:, 1]
            else:
                # If the model lacks predict_proba, approximate with decision_function.
                if hasattr(pipe.named_steps['clf'], "decision_function"):
                    scores = pipe.decision_function(X_te)
                    # Min-max normalize to [0, 1] as pseudo-probabilities.
                    mn, mx = float(np.min(scores)), float(np.max(scores))
                    proba = (scores - mn) / (mx - mn + 1e-9)
                else:
                    # Fall back to hard labels; AUPRC computed from these is distorted.
                    pred_hard = pipe.predict(X_te)
                    proba = pred_hard.astype(float)
            
            # Snapshot the fitted pipeline before it is refit on the next fold.
            pipes.append(deepcopy(pipe))

            pred = (proba >= 0.5).astype(int)

            auprc = average_precision_score(y_te, proba)
            try:
                # AUROC is undefined when a fold contains a single class.
                auroc = roc_auc_score(y_te, proba)
            except ValueError:
                auroc = np.nan
            f1 = f1_score(y_te, pred)
            acc = accuracy_score(y_te, pred)

            auprc_list.append(auprc); auroc_list.append(auroc); f1_list.append(f1); acc_list.append(acc)
            print(f"[Fold {fold}] AUPRC={auprc:.4f} | AUROC={auroc if not np.isnan(auroc) else float('nan'):.4f} | F1={f1:.4f} | ACC={acc:.4f}")

        metrics = {
            "AUPRC": _mean_std(auprc_list),
            "AUROC": _mean_std([v for v in auroc_list if not np.isnan(v)]),
            "F1": _mean_std(f1_list),
            "ACC": _mean_std(acc_list),
        }

        print("\nValidation (KFold, mean ± std)")
        print(f"- AUPRC: {metrics['AUPRC'][0]:.4f} ± {metrics['AUPRC'][1]:.4f}")
        if not np.isnan(metrics["AUROC"][0]):
            print(f"- AUROC: {metrics['AUROC'][0]:.4f} ± {metrics['AUROC'][1]:.4f}")
        else:
            print("- AUROC: undefined (some folds single-class)")
        print(f"- F1: {metrics['F1'][0]:.4f} ± {metrics['F1'][1]:.4f}")
        print(f"- ACC: {metrics['ACC'][0]:.4f} ± {metrics['ACC'][1]:.4f}")

        return pipes, metrics

    elif task == 'reg':
        # Features: delta counts only for regression.
        fx = PairwiseMolFeaturizer(nBits=nBits, radius=radius, useChirality=useChirality, mode="count")

        if model == 'rf':
            reg = RandomForestRegressor(
                n_estimators=800, max_features="sqrt", min_samples_leaf=1,
                random_state=random_state, n_jobs=-1
            )
        elif model == 'lr':
            # Linear baseline: Ridge.
            reg = Ridge(alpha=1.0, random_state=random_state)
        else:
            raise ValueError("model must be 'rf' or 'lr'")

        pipe = Pipeline([("fx", fx), ("reg", reg)])

        mae_list, rse_list, pcc_list, kcc_list, pipes = [], [], [], [], []

        for fold, (tr, te) in enumerate(kf.split(X), 1):
            X_tr = [X[i] for i in tr]; y_tr = y[tr].astype(float)
            X_te = [X[i] for i in te]; y_te = y[te].astype(float)

            pipe.fit(X_tr, y_tr)
            pred = pipe.predict(X_te)

            # Snapshot the fitted pipeline before it is refit on the next fold.
            pipes.append(deepcopy(pipe))

            mae = mean_absolute_error(y_te, pred)
            # Relative squared error: residual sum over total sum of squares.
            rss = float(np.sum((y_te - pred) ** 2))
            tss = float(np.sum((y_te - np.mean(y_te)) ** 2))
            rse = rss / tss if tss > 0 else np.nan

            # Correlations are undefined when the fold's targets are constant.
            if len(np.unique(y_te)) > 1:
                pcc = pearsonr(y_te, pred)[0]
                kcc = kendalltau(y_te, pred)[0]
            else:
                pcc, kcc = np.nan, np.nan

            mae_list.append(mae); rse_list.append(rse); pcc_list.append(pcc); kcc_list.append(kcc)
            print(f"[Fold {fold}] MAE={mae:.4f} | RSE={rse if not np.isnan(rse) else float('nan'):.4f} | PCC={pcc if not np.isnan(pcc) else float('nan'):.4f} | KCC={kcc if not np.isnan(kcc) else float('nan'):.4f}")

        metrics = {
            "MAE": _mean_std(mae_list),
            "RSE": _mean_std(rse_list),
            "PCC": _mean_std([v for v in pcc_list if not np.isnan(v)]),
            "KCC": _mean_std([v for v in kcc_list if not np.isnan(v)]),
        }

        print("\nValidation (KFold, mean ± std)")
        print(f"- MAE: {metrics['MAE'][0]:.4f} ± {metrics['MAE'][1]:.4f}")
        print(f"- RSE: {metrics['RSE'][0]:.4f} ± {metrics['RSE'][1]:.4f}")
        if not np.isnan(metrics["PCC"][0]):
            print(f"- PCC: {metrics['PCC'][0]:.4f} ± {metrics['PCC'][1]:.4f}")
        else:
            print("- PCC: undefined")
        if not np.isnan(metrics["KCC"][0]):
            print(f"- KCC: {metrics['KCC'][0]:.4f} ± {metrics['KCC'][1]:.4f}")
        else:
            print("- KCC: undefined")

        return pipes, metrics

    else:
        raise ValueError("task must be 'cls' or 'reg'")


def save_things(models: List[Pipeline], metrics, path):
    """Persist the fold models and a plain-text metric summary under *path*."""
    os.makedirs(path, exist_ok=True)
    for idx, fitted in enumerate(models):
        dump(fitted, os.path.join(path, f"model_{idx}.joblib"))
    # Comma-separated means and stds in metric insertion order; the 'Test'
    # section is written as zero placeholders.
    mean_line = ','.join(f"{v[0]}" for v in metrics.values())
    std_line = ','.join(f"{v[1]}" for v in metrics.values())
    report = '\n'.join(('Valid', mean_line, std_line, 'Test', '0.,0.,0.,0.', '0.,0.,0.,0.'))
    with open(os.path.join(path, 'result.txt'), 'w') as f:
        f.write(report)

# ----------------- Usage example -----------------
if __name__ == "__main__":
    # Train and evaluate every (task, model) combination, persisting each run.
    experiments = [
        ('cls', 'lr', 'run-cls/logistic_regression'),
        ('cls', 'rf', 'run-cls/random_forest'),
        ('reg', 'lr', 'run-reg/ridge'),
        ('reg', 'rf', 'run-reg/random_forest'),
    ]
    for task, model, out_dir in experiments:
        models, metrics = run(get_data, task=task, model=model, n_splits=5, random_state=42)
        save_things(models, metrics, out_dir)