"""Search validation score fusions across completed experiments.""" from __future__ import annotations import argparse from pathlib import Path import numpy as np import pandas as pd from sklearn.linear_model import LogisticRegression from sklearn.metrics import precision_recall_curve, roc_auc_score from sklearn.model_selection import StratifiedKFold def best_f1(labels: np.ndarray, scores: np.ndarray): p, r, t = precision_recall_curve(labels, scores) f1 = 2 * p * r / (p + r + 1e-12) i = int(np.argmax(f1)) th = float(t[i]) if i < len(t) else 0.5 return float(f1[i]), th, float(roc_auc_score(labels, scores)) def rank01(x: np.ndarray) -> np.ndarray: order = np.argsort(x, kind="mergesort") out = np.empty(len(x), dtype=np.float32) out[order] = np.linspace(0.0, 1.0, len(x), dtype=np.float32) return out def zscore(x: np.ndarray) -> np.ndarray: return ((x - x.mean()) / (x.std() + 1e-8)).astype(np.float32) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--package-root", type=Path, default=Path(__file__).resolve().parents[1]) parser.add_argument("--random-iters", type=int, default=5000) parser.add_argument("--seed", type=int, default=0) args = parser.parse_args() root = args.package_root labels = pd.read_csv(root / "splits" / "notebook_seed0" / "val_pairs.csv")["label"].to_numpy() score_files = [] score_files.extend((root / "validation_runs" / "notebook_seed0").glob("*/scores/val_*.npy")) score_files.extend((root / "validation_runs" / "notebook_seed0" / "score_modes").glob("*.npy")) score_files = sorted(set(score_files)) names = [] cols = [] for path in score_files: if "ensemble" in path.name: continue x = np.load(path).astype(np.float32) if x.shape[0] != labels.shape[0] or np.std(x) < 1e-8: continue name = str(path.relative_to(root / "validation_runs" / "notebook_seed0")) names.append(name) cols.append(x) X = np.vstack(cols).T print(f"loaded {X.shape[1]} score columns") rows = [] for j, name in enumerate(names): f1, th, auc = best_f1(labels, X[:, j]) rows.append({"method": "single", "name": name, "n": 1, "f1": f1, "threshold": th, "auc": auc}) for transform_name, transform in [("raw_zmean", zscore), ("rank_mean", rank01)]: S = np.vstack([transform(X[:, j]) for j in range(X.shape[1])]).T scores = S.mean(axis=1) f1, th, auc = best_f1(labels, scores) rows.append({"method": transform_name, "name": "all", "n": X.shape[1], "f1": f1, "threshold": th, "auc": auc}) single_df = pd.DataFrame(rows).sort_values("f1", ascending=False) top_idx = [] for i in single_df[single_df["method"] == "single"].head(20).index: name = single_df.loc[i, "name"] top_idx.append(names.index(name)) top_idx = sorted(set(top_idx)) rng = np.random.default_rng(args.seed) best = None for space_name, transform in [("rank", rank01), ("z", zscore)]: S = np.vstack([transform(X[:, j]) for j in top_idx]).T for _ in range(args.random_iters): alpha = rng.uniform(0.3, 3.0, size=len(top_idx)) w = rng.dirichlet(alpha) scores = S @ w f1, th, auc = best_f1(labels, scores) if best is None or f1 > best["f1"]: best = { "method": f"random_{space_name}", "name": ",".join(names[i] for i in top_idx), "n": len(top_idx), "f1": f1, "threshold": th, "auc": auc, "weights": w, "idx": top_idx, "space": space_name, } rows.append({k: best[k] for k in ["method", "name", "n", "f1", "threshold", "auc"]}) # Out-of-fold logistic stacking gives a less overfit estimate than fitting on all rows. top10 = top_idx[:10] S = np.vstack([rank01(X[:, j]) for j in top10]).T oof = np.zeros(len(labels), dtype=np.float32) skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=args.seed) for tr, va in skf.split(S, labels): clf = LogisticRegression(C=0.2, max_iter=1000, solver="lbfgs") clf.fit(S[tr], labels[tr]) oof[va] = clf.predict_proba(S[va])[:, 1] f1, th, auc = best_f1(labels, oof) rows.append({"method": "logistic_oof_rank_top10", "name": ",".join(names[i] for i in top10), "n": len(top10), "f1": f1, "threshold": th, "auc": auc}) out_dir = root / "validation_runs" / "notebook_seed0" / "fusion_search" out_dir.mkdir(parents=True, exist_ok=True) result = pd.DataFrame(rows).sort_values("f1", ascending=False) result.to_csv(out_dir / "fusion_results.csv", index=False) if best is not None: np.save(out_dir / "best_random_weights.npy", best["weights"]) (out_dir / "best_random_members.txt").write_text("\n".join(names[i] for i in best["idx"]) + "\n") print(result.head(40).to_string(index=False)) if __name__ == "__main__": main()