| """Search validation score fusions across completed experiments.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import precision_recall_curve, roc_auc_score |
| from sklearn.model_selection import StratifiedKFold |
|
|
|
|
| def best_f1(labels: np.ndarray, scores: np.ndarray): |
| p, r, t = precision_recall_curve(labels, scores) |
| f1 = 2 * p * r / (p + r + 1e-12) |
| i = int(np.argmax(f1)) |
| th = float(t[i]) if i < len(t) else 0.5 |
| return float(f1[i]), th, float(roc_auc_score(labels, scores)) |
|
|
|
|
| def rank01(x: np.ndarray) -> np.ndarray: |
| order = np.argsort(x, kind="mergesort") |
| out = np.empty(len(x), dtype=np.float32) |
| out[order] = np.linspace(0.0, 1.0, len(x), dtype=np.float32) |
| return out |
|
|
|
|
| def zscore(x: np.ndarray) -> np.ndarray: |
| return ((x - x.mean()) / (x.std() + 1e-8)).astype(np.float32) |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--package-root", type=Path, default=Path(__file__).resolve().parents[1]) |
| parser.add_argument("--random-iters", type=int, default=5000) |
| parser.add_argument("--seed", type=int, default=0) |
| args = parser.parse_args() |
|
|
| root = args.package_root |
| labels = pd.read_csv(root / "splits" / "notebook_seed0" / "val_pairs.csv")["label"].to_numpy() |
| score_files = [] |
| score_files.extend((root / "validation_runs" / "notebook_seed0").glob("*/scores/val_*.npy")) |
| score_files.extend((root / "validation_runs" / "notebook_seed0" / "score_modes").glob("*.npy")) |
| score_files = sorted(set(score_files)) |
|
|
| names = [] |
| cols = [] |
| for path in score_files: |
| if "ensemble" in path.name: |
| continue |
| x = np.load(path).astype(np.float32) |
| if x.shape[0] != labels.shape[0] or np.std(x) < 1e-8: |
| continue |
| name = str(path.relative_to(root / "validation_runs" / "notebook_seed0")) |
| names.append(name) |
| cols.append(x) |
|
|
| X = np.vstack(cols).T |
| print(f"loaded {X.shape[1]} score columns") |
|
|
| rows = [] |
| for j, name in enumerate(names): |
| f1, th, auc = best_f1(labels, X[:, j]) |
| rows.append({"method": "single", "name": name, "n": 1, "f1": f1, "threshold": th, "auc": auc}) |
|
|
| for transform_name, transform in [("raw_zmean", zscore), ("rank_mean", rank01)]: |
| S = np.vstack([transform(X[:, j]) for j in range(X.shape[1])]).T |
| scores = S.mean(axis=1) |
| f1, th, auc = best_f1(labels, scores) |
| rows.append({"method": transform_name, "name": "all", "n": X.shape[1], "f1": f1, "threshold": th, "auc": auc}) |
|
|
| single_df = pd.DataFrame(rows).sort_values("f1", ascending=False) |
| top_idx = [] |
| for i in single_df[single_df["method"] == "single"].head(20).index: |
| name = single_df.loc[i, "name"] |
| top_idx.append(names.index(name)) |
| top_idx = sorted(set(top_idx)) |
|
|
| rng = np.random.default_rng(args.seed) |
| best = None |
| for space_name, transform in [("rank", rank01), ("z", zscore)]: |
| S = np.vstack([transform(X[:, j]) for j in top_idx]).T |
| for _ in range(args.random_iters): |
| alpha = rng.uniform(0.3, 3.0, size=len(top_idx)) |
| w = rng.dirichlet(alpha) |
| scores = S @ w |
| f1, th, auc = best_f1(labels, scores) |
| if best is None or f1 > best["f1"]: |
| best = { |
| "method": f"random_{space_name}", |
| "name": ",".join(names[i] for i in top_idx), |
| "n": len(top_idx), |
| "f1": f1, |
| "threshold": th, |
| "auc": auc, |
| "weights": w, |
| "idx": top_idx, |
| "space": space_name, |
| } |
| rows.append({k: best[k] for k in ["method", "name", "n", "f1", "threshold", "auc"]}) |
|
|
| |
| top10 = top_idx[:10] |
| S = np.vstack([rank01(X[:, j]) for j in top10]).T |
| oof = np.zeros(len(labels), dtype=np.float32) |
| skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=args.seed) |
| for tr, va in skf.split(S, labels): |
| clf = LogisticRegression(C=0.2, max_iter=1000, solver="lbfgs") |
| clf.fit(S[tr], labels[tr]) |
| oof[va] = clf.predict_proba(S[va])[:, 1] |
| f1, th, auc = best_f1(labels, oof) |
| rows.append({"method": "logistic_oof_rank_top10", "name": ",".join(names[i] for i in top10), "n": len(top10), "f1": f1, "threshold": th, "auc": auc}) |
|
|
| out_dir = root / "validation_runs" / "notebook_seed0" / "fusion_search" |
| out_dir.mkdir(parents=True, exist_ok=True) |
| result = pd.DataFrame(rows).sort_values("f1", ascending=False) |
| result.to_csv(out_dir / "fusion_results.csv", index=False) |
| if best is not None: |
| np.save(out_dir / "best_random_weights.npy", best["weights"]) |
| (out_dir / "best_random_members.txt").write_text("\n".join(names[i] for i in best["idx"]) + "\n") |
| print(result.head(40).to_string(index=False)) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|