cs3319-project2 / code /search_dynamic_fusion.py
NLP-beginner's picture
CS3319 Project 2 final deliverable (public F1 = 0.96626)
f28d994
Raw
History Blame Contribute Delete
5.38 kB
"""Search score-level fusions for one dynamic notebook-style split."""
from __future__ import annotations
import argparse
import importlib.util
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_curve, roc_auc_score
from sklearn.model_selection import StratifiedKFold
def load_train_module(path: Path):
spec = importlib.util.spec_from_file_location("train_val_lgcn_ensemble", path)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def best_f1(y: np.ndarray, s: np.ndarray):
p, r, t = precision_recall_curve(y, s)
f = 2 * p * r / (p + r + 1e-12)
i = int(np.argmax(f))
return float(f[i]), float(t[i] if i < len(t) else 0.5), float(roc_auc_score(y, s))
def rank01(x: np.ndarray) -> np.ndarray:
order = np.argsort(x, kind="mergesort")
out = np.empty(len(x), dtype=np.float32)
out[order] = np.linspace(0, 1, len(x), dtype=np.float32)
return out
def zscore(x: np.ndarray) -> np.ndarray:
return ((x - x.mean()) / (x.std() + 1e-8)).astype(np.float32)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--package-root", type=Path, default=Path(__file__).resolve().parents[1])
parser.add_argument("--split-seed", type=int, required=True)
parser.add_argument("--train-frac", type=float, default=0.9)
parser.add_argument("--top-k", type=int, default=24)
parser.add_argument("--random-iters", type=int, default=15000)
parser.add_argument("--seed", type=int, default=0)
args = parser.parse_args()
root = args.package_root
tv = load_train_module(root / "code" / "train_val_lgcn_ensemble.py")
_, val_pairs = tv.make_notebook_style_split(root, args.split_seed, args.train_frac)
labels = val_pairs["label"].to_numpy(np.int8)
split_dir = root / "validation_runs" / f"dynamic_seed{args.split_seed}"
score_files = []
score_files.extend(split_dir.glob("dyn*/scores/val_*.npy"))
score_files.extend(split_dir.glob("feature_fusion/val_*.npy"))
score_files.extend(split_dir.glob("score_modes/*.npy"))
score_files = sorted(set(score_files))
names, cols = [], []
for path in score_files:
x = np.load(path).astype(np.float32)
if len(x) != len(labels) or np.std(x) < 1e-8:
continue
names.append(str(path.relative_to(split_dir)))
cols.append(x)
if not cols:
raise SystemExit(f"no compatible scores under {split_dir}")
X = np.vstack(cols).T
rows = []
for j, name in enumerate(names):
f1, th, auc = best_f1(labels, X[:, j])
rows.append({"method": "single", "name": name, "n": 1, "f1": f1, "threshold": th, "auc": auc})
single = pd.DataFrame(rows).sort_values("f1", ascending=False)
top_idx = [names.index(n) for n in single["name"].head(min(args.top_k, len(names)))]
for method, transform in [("all_rank_mean", rank01), ("all_z_mean", zscore)]:
S = np.vstack([transform(X[:, j]) for j in range(X.shape[1])]).T
f1, th, auc = best_f1(labels, S.mean(axis=1))
rows.append({"method": method, "name": "all", "n": X.shape[1], "f1": f1, "threshold": th, "auc": auc})
rng = np.random.default_rng(args.seed)
for space_name, transform in [("rank", rank01), ("z", zscore)]:
S = np.vstack([transform(X[:, j]) for j in top_idx]).T
best = None
for _ in range(args.random_iters):
w = rng.dirichlet(rng.uniform(0.4, 4.0, size=len(top_idx)))
scores = S @ w
f1, th, auc = best_f1(labels, scores)
if best is None or f1 > best["f1"]:
best = {"f1": f1, "threshold": th, "auc": auc, "weights": w}
assert best is not None
rows.append(
{
"method": f"random_{space_name}_top{len(top_idx)}",
"name": ",".join(names[i] for i in top_idx),
"n": len(top_idx),
"f1": best["f1"],
"threshold": best["threshold"],
"auc": best["auc"],
}
)
np.save(split_dir / f"fusion_weights_{space_name}.npy", best["weights"])
for k in [8, 12, min(20, len(top_idx))]:
sub_idx = top_idx[:k]
S = np.vstack([rank01(X[:, j]) for j in sub_idx]).T
oof = np.zeros(len(labels), dtype=np.float32)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=args.seed)
for tr, va in skf.split(S, labels):
clf = LogisticRegression(C=0.2, max_iter=1000, solver="lbfgs")
clf.fit(S[tr], labels[tr])
oof[va] = clf.predict_proba(S[va])[:, 1]
f1, th, auc = best_f1(labels, oof)
rows.append(
{
"method": f"logistic_oof_rank_top{k}",
"name": ",".join(names[i] for i in sub_idx),
"n": k,
"f1": f1,
"threshold": th,
"auc": auc,
}
)
result = pd.DataFrame(rows).sort_values("f1", ascending=False)
out = split_dir / "dynamic_fusion_results.csv"
result.to_csv(out, index=False)
print(f"loaded {X.shape[1]} score columns")
print(result.head(50).to_string(index=False))
if __name__ == "__main__":
main()