| """Search score-level fusions for one dynamic notebook-style split.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import importlib.util |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import precision_recall_curve, roc_auc_score |
| from sklearn.model_selection import StratifiedKFold |
|
|
|
|
def load_train_module(path: Path):
    """Import the training script at *path* as a standalone module.

    The file is loaded directly from its location, so it does not need to be
    importable through ``sys.path``.  It is executed under the module name
    ``train_val_lgcn_ensemble``; this function does not register the result
    in ``sys.modules``.

    Args:
        path: Location of the Python source file to execute.

    Returns:
        The freshly executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for *path*
            (for example, the file has no recognised Python suffix).
    """
    spec = importlib.util.spec_from_file_location("train_val_lgcn_ensemble", path)
    # Raise explicitly instead of asserting: asserts vanish under ``python -O``
    # and a ``None`` spec would otherwise surface as an opaque AttributeError.
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
|
|
|
def best_f1(y: np.ndarray, s: np.ndarray):
    """Find the highest F1 along the precision-recall curve of scores *s*.

    Args:
        y: Ground-truth binary labels.
        s: Scores for the same samples; larger means more positive.

    Returns:
        A ``(f1, threshold, auc)`` tuple of Python floats: the maximum F1
        found on the curve, the threshold at that point, and the ROC AUC of
        *s* against *y*.
    """
    precision, recall, thresholds = precision_recall_curve(y, s)
    # The small epsilon keeps the ratio defined where precision + recall == 0.
    f1_curve = 2 * precision * recall / (precision + recall + 1e-12)
    best = int(np.argmax(f1_curve))
    # Guard for an argmax index that has no matching threshold entry.
    if best < len(thresholds):
        threshold = thresholds[best]
    else:
        threshold = 0.5
    return float(f1_curve[best]), float(threshold), float(roc_auc_score(y, s))
|
|
|
|
def rank01(x: np.ndarray) -> np.ndarray:
    """Map the values of *x* to evenly spaced ranks in ``[0, 1]``.

    The smallest value receives 0 and the largest receives 1.  Ties are
    broken by original position because the sort is stable.

    Args:
        x: One-dimensional array of scores.

    Returns:
        A float32 array of the same length holding each element's rank.
    """
    n = len(x)
    positions = np.linspace(0, 1, n, dtype=np.float32)
    sorted_to_original = np.argsort(x, kind="mergesort")
    ranks = np.empty_like(positions)
    # Scatter: the element that sorts k-th receives the k-th grid position.
    ranks[sorted_to_original] = positions
    return ranks
|
|
|
|
def zscore(x: np.ndarray) -> np.ndarray:
    """Standardise *x* to zero mean and unit standard deviation.

    Args:
        x: Array of scores.

    Returns:
        The standardised values as float32.  A small epsilon in the
        denominator keeps constant inputs from dividing by zero.
    """
    centered = x - x.mean()
    scale = x.std() + 1e-8
    standardized = centered / scale
    return standardized.astype(np.float32)
|
|
|
|
def _collect_score_columns(split_dir: Path, n_rows: int):
    """Load every usable validation score vector stored under *split_dir*.

    A file is skipped when it is not a 1-D vector with exactly *n_rows*
    entries, contains NaN/inf, or is (numerically) constant.

    Returns:
        ``(names, cols)``: the file paths relative to *split_dir* and the
        matching float32 score vectors, both in sorted path order.
    """
    patterns = ("dyn*/scores/val_*.npy", "feature_fusion/val_*.npy", "score_modes/*.npy")
    score_files = sorted({p for pattern in patterns for p in split_dir.glob(pattern)})

    names: list[str] = []
    cols: list[np.ndarray] = []
    for path in score_files:
        x = np.load(path).astype(np.float32)
        # Non-finite or wrongly shaped vectors would only crash the metric
        # code later, so they are treated like any other incompatible file.
        if x.shape != (n_rows,) or not np.isfinite(x).all() or np.std(x) < 1e-8:
            continue
        names.append(str(path.relative_to(split_dir)))
        cols.append(x)
    return names, cols


def _random_weight_search(S: np.ndarray, labels: np.ndarray, n_iters: int, rng) -> dict:
    """Sample Dirichlet weight vectors and keep the one with the highest F1.

    Returns a dict with keys ``f1``, ``threshold``, ``auc`` and ``weights``.
    """
    if n_iters < 1:
        raise ValueError("n_iters must be at least 1")
    n_cols = S.shape[1]
    best = None
    for _ in range(n_iters):
        # Random concentration parameters vary how peaked the weights are.
        w = rng.dirichlet(rng.uniform(0.4, 4.0, size=n_cols))
        f1, th, auc = best_f1(labels, S @ w)
        if best is None or f1 > best["f1"]:
            best = {"f1": f1, "threshold": th, "auc": auc, "weights": w}
    return best


def _logistic_oof_scores(S: np.ndarray, labels: np.ndarray, seed: int) -> np.ndarray:
    """Return 5-fold out-of-fold logistic-regression probabilities for *S*."""
    oof = np.zeros(len(labels), dtype=np.float32)
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    for tr, va in skf.split(S, labels):
        clf = LogisticRegression(C=0.2, max_iter=1000, solver="lbfgs")
        clf.fit(S[tr], labels[tr])
        oof[va] = clf.predict_proba(S[va])[:, 1]
    return oof


def main() -> None:
    """Search score-level fusions for one split and write a ranked report.

    Steps:
      1. Rebuild the validation pairs through the training module to get
         the labels.
      2. Load all compatible score vectors saved for the split.
      3. Score every single column, the rank/z-score mean of all columns,
         randomly weighted fusions of the top-k columns, and out-of-fold
         logistic-regression stacks of the top columns.
      4. Save the best random weights per space as
         ``fusion_weights_{rank,z}.npy``, write all results to
         ``dynamic_fusion_results.csv`` inside the split directory, and
         print the 50 best rows.

    Raises:
        SystemExit: If no compatible score file is found, or on invalid
            command-line arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--package-root", type=Path, default=Path(__file__).resolve().parents[1])
    parser.add_argument("--split-seed", type=int, required=True)
    parser.add_argument("--train-frac", type=float, default=0.9)
    parser.add_argument("--top-k", type=int, default=24)
    parser.add_argument("--random-iters", type=int, default=15000)
    parser.add_argument("--seed", type=int, default=0)
    args = parser.parse_args()
    # Validate up front instead of failing deep inside the search loops.
    if args.top_k < 1:
        parser.error("--top-k must be at least 1")
    if args.random_iters < 1:
        parser.error("--random-iters must be at least 1")

    root = args.package_root
    tv = load_train_module(root / "code" / "train_val_lgcn_ensemble.py")
    _, val_pairs = tv.make_notebook_style_split(root, args.split_seed, args.train_frac)
    labels = val_pairs["label"].to_numpy(np.int8)
    split_dir = root / "validation_runs" / f"dynamic_seed{args.split_seed}"

    names, cols = _collect_score_columns(split_dir, len(labels))
    if not cols:
        raise SystemExit(f"no compatible scores under {split_dir}")

    # One column per score file, one row per validation pair.
    X = np.vstack(cols).T
    rows = []
    for j, name in enumerate(names):
        f1, th, auc = best_f1(labels, X[:, j])
        rows.append({"method": "single", "name": name, "n": 1, "f1": f1, "threshold": th, "auc": auc})

    # Pick the column indices of the best individual scorers.
    single = pd.DataFrame(rows).sort_values("f1", ascending=False)
    position = {name: j for j, name in enumerate(names)}
    top_idx = [position[n] for n in single["name"].head(min(args.top_k, len(names)))]

    for method, transform in [("all_rank_mean", rank01), ("all_z_mean", zscore)]:
        S = np.vstack([transform(X[:, j]) for j in range(X.shape[1])]).T
        f1, th, auc = best_f1(labels, S.mean(axis=1))
        rows.append({"method": method, "name": "all", "n": X.shape[1], "f1": f1, "threshold": th, "auc": auc})

    rng = np.random.default_rng(args.seed)
    for space_name, transform in [("rank", rank01), ("z", zscore)]:
        S = np.vstack([transform(X[:, j]) for j in top_idx]).T
        best = _random_weight_search(S, labels, args.random_iters, rng)
        rows.append(
            {
                "method": f"random_{space_name}_top{len(top_idx)}",
                "name": ",".join(names[i] for i in top_idx),
                "n": len(top_idx),
                "f1": best["f1"],
                "threshold": best["threshold"],
                "auc": best["auc"],
            }
        )
        np.save(split_dir / f"fusion_weights_{space_name}.npy", best["weights"])

    # Clamp the stack sizes to the columns actually available and drop
    # duplicates, so every row's label and "n" match the columns used.
    for k in sorted({min(size, len(top_idx)) for size in (8, 12, 20)}):
        sub_idx = top_idx[:k]
        S = np.vstack([rank01(X[:, j]) for j in sub_idx]).T
        oof = _logistic_oof_scores(S, labels, args.seed)
        f1, th, auc = best_f1(labels, oof)
        rows.append(
            {
                "method": f"logistic_oof_rank_top{k}",
                "name": ",".join(names[i] for i in sub_idx),
                "n": k,
                "f1": f1,
                "threshold": th,
                "auc": auc,
            }
        )

    result = pd.DataFrame(rows).sort_values("f1", ascending=False)
    out = split_dir / "dynamic_fusion_results.csv"
    result.to_csv(out, index=False)
    print(f"loaded {X.shape[1]} score columns")
    print(result.head(50).to_string(index=False))
|
|
|
|
# Run the fusion search only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|