| |
|
| | |
| | |
| | """ |
| | Analyze hyperparameter sweeps for L-RMC/GCN on Cora in a "rich" format similar to analyze_stats.py. |
| | |
| | Usage: |
| | python analyze_lrmc_sweep.py /path/to/hyperparam_sweep.csv --topk 10 --variant pool |
| | """ |
| | import argparse |
| | from pathlib import Path |
| | from collections import defaultdict |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | from rich.console import Console |
| | from rich.table import Table |
| | from rich.panel import Panel |
| | from rich.text import Text |
| | from rich import box |
| |
|
| | |
| | |
| | |
| | def _fmt(x, digits=3): |
| | if x is None or (isinstance(x, float) and (np.isnan(x) or np.isinf(x))): |
| | return "-" |
| | if isinstance(x, float): |
| | return f"{x:.{digits}f}" |
| | return str(x) |
| |
|
| | def _to_float(series): |
| | try: |
| | return series.astype(float) |
| | except Exception: |
| | return pd.to_numeric(series, errors="coerce") |
| |
|
| | def _unique_counts(df, cols): |
| | return {c: df[c].nunique() for c in cols if c in df.columns} |
| |
|
| | def _has_cols(df, cols): |
| | return all(c in df.columns for c in cols) |
| |
|
| | def _get_common_hparams(): |
| | |
| | return ["hidden", "lr", "dropout", "self_loop_scale", "use_a2"] |
| |
|
| | |
| | |
| | |
def analyze(df: pd.DataFrame):
    """Compute all report artifacts from a raw hyperparameter-sweep DataFrame.

    Validates the expected CSV schema, coerces numeric columns, and builds:
    a summary dict, top configurations (overall and per variant), marginal
    per-hyperparameter effect tables, and baseline-vs-pool comparisons
    matched on the shared hyperparameters.

    Returns a dict with keys: df, summary, best_overall, best_by_variant,
    per_param, per_param_by_variant, comp_mean, comp_best, commons.

    Raises:
        ValueError: if any expected column is missing from *df*.
    """
    # Work on a copy so the caller's DataFrame is never mutated.
    df = df.copy()

    # Fail fast if the CSV does not match the expected sweep schema.
    expected = [
        "dataset","variant","hidden","epochs","lr","wd","dropout",
        "self_loop_scale","use_a2",
        "lrmc_inv_weight","lrmc_gamma",
        "seed","val_accuracy","test_accuracy"
    ]
    missing = [c for c in expected if c not in df.columns]
    if missing:
        raise ValueError(f"CSV missing expected columns: {missing}")

    # Coerce numeric columns; unparseable entries become NaN via _to_float.
    num_cols = ["hidden","epochs","lr","wd","dropout","self_loop_scale",
                "lrmc_inv_weight","lrmc_gamma","val_accuracy","test_accuracy"]
    for c in num_cols:
        df[c] = _to_float(df[c])

    # High-level overview: sizes, label sets, and how well val tracks test.
    summary = {
        "file_rows": len(df),
        "dataset": ", ".join(sorted(df["dataset"].astype(str).unique())),
        "variants": ", ".join(sorted(df["variant"].astype(str).unique())),
        "columns": list(df.columns),
        "n_columns": len(df.columns),
        "unique_counts": _unique_counts(df, ["hidden","lr","dropout","self_loop_scale","use_a2","lrmc_inv_weight","lrmc_gamma"]),
        "val_test_corr_all": df["val_accuracy"].corr(df["test_accuracy"]),
        "val_test_corr_by_variant": df.groupby("variant").apply(lambda g: g["val_accuracy"].corr(g["test_accuracy"])).to_dict(),
    }

    def best_rows(by_cols=("val_accuracy","test_accuracy"), ascending=(False, False), topk=10, variant=None):
        # Top-k rows sorted by val accuracy (ties broken by test accuracy),
        # optionally restricted to a single variant.
        sub = df if variant is None else df[df["variant"]==variant]
        if len(sub)==0:
            return sub
        return sub.sort_values(list(by_cols), ascending=list(ascending)).head(topk)

    best_overall = best_rows(topk=10)
    best_by_variant = {v: best_rows(topk=5, variant=v) for v in df["variant"].unique()}

    # Marginal effect of each hyperparameter, averaged over all other settings.
    # NOTE(review): pandas groupby drops NaN keys by default, so rows where a
    # parameter is unset (e.g. lrmc_* for the baseline variant) are silently
    # excluded from that parameter's table — presumably intended; confirm.
    hparams = ["hidden","lr","dropout","self_loop_scale","use_a2","lrmc_inv_weight","lrmc_gamma"]
    per_param = {}
    for hp in hparams:
        g = df.groupby(hp).agg(
            mean_val=("val_accuracy","mean"),
            std_val=("val_accuracy","std"),
            mean_test=("test_accuracy","mean"),
            std_test=("test_accuracy","std"),
            n=("val_accuracy","count"),
        ).sort_values("mean_val", ascending=False)
        per_param[hp] = g

    # Same marginal tables, but restricted to each variant separately.
    per_param_by_variant = {}
    for v in df["variant"].unique():
        sub = df[df["variant"]==v]
        per_param_by_variant[v] = {}
        for hp in hparams:
            g = sub.groupby(hp).agg(
                mean_val=("val_accuracy","mean"),
                std_val=("val_accuracy","std"),
                mean_test=("test_accuracy","mean"),
                std_test=("test_accuracy","std"),
                n=("val_accuracy","count"),
            ).sort_values("mean_val", ascending=False)
            per_param_by_variant[v][hp] = g

    # Aggregate per (variant, shared-hyperparameter setting) so baseline and
    # pool can be compared on identical settings.
    commons = _get_common_hparams()
    matched = df.groupby(["variant"]+commons).agg(
        mean_val=("val_accuracy","mean"),
        mean_test=("test_accuracy","mean"),
        best_val=("val_accuracy","max"),
        best_test=("test_accuracy","max"),
        n=("val_accuracy","count"),
    ).reset_index()

    baseline_mean = matched[matched["variant"]=="baseline"].set_index(commons)
    pool_mean = matched[matched["variant"]=="pool"].set_index(commons)

    # Inner join keeps only settings present in BOTH variants; suffixes
    # disambiguate the pool vs baseline columns.
    comp_mean = pool_mean[["mean_val","mean_test"]].join(
        baseline_mean[["mean_val","mean_test"]],
        lsuffix="_pool", rsuffix="_base", how="inner"
    )
    comp_mean["delta_val"] = comp_mean["mean_val_pool"] - comp_mean["mean_val_base"]
    comp_mean["delta_test"] = comp_mean["mean_test_pool"] - comp_mean["mean_test_base"]

    # Best-vs-best comparison on the same matched settings.
    baseline_best = matched[matched["variant"]=="baseline"].set_index(commons)[["best_val","best_test"]]
    pool_best = matched[matched["variant"]=="pool"].set_index(commons)[["best_val","best_test"]]
    comp_best = pool_best.join(baseline_best, lsuffix="_pool", rsuffix="_base", how="inner")
    comp_best["delta_best_val"] = comp_best["best_val_pool"] - comp_best["best_val_base"]
    comp_best["delta_best_test"] = comp_best["best_test_pool"] - comp_best["best_test_base"]

    return {
        "df": df,
        "summary": summary,
        "best_overall": best_overall,
        "best_by_variant": best_by_variant,
        "per_param": per_param,
        "per_param_by_variant": per_param_by_variant,
        "comp_mean": comp_mean,
        "comp_best": comp_best,
        "commons": commons,
    }
| |
|
| | |
| | |
| | |
def render_summary(console: Console, A):
    """Render a panel summarizing the sweep: row count, datasets, variants,
    val/test correlations, and per-hyperparameter unique-value counts."""
    s = A["summary"]
    heading = Text("L‑RMC/GCN Hyperparameter Sweep — Summary", style="bold white")
    body = Text()

    def line(label, value):
        # One "label: value" row; labels share the bold-green style.
        body.append(label, style="bold green")
        body.append(f"{value}\n")

    line("File rows: ", s["file_rows"])
    line("Dataset(s): ", s["dataset"])
    line("Variants: ", s["variants"])
    line("Val/Test Corr (all): ", _fmt(s["val_test_corr_all"]))
    for variant, corr in s["val_test_corr_by_variant"].items():
        line(f"Val/Test Corr ({variant}): ", _fmt(corr))

    uc_pairs = [f"{k}={v}" for k, v in s["unique_counts"].items()]
    line("Unique values per hparam: ", ", ".join(uc_pairs))
    console.print(Panel(body, title=heading, border_style="cyan", box=box.ROUNDED))
| |
|
def render_top_configs(console: Console, A, topk=10):
    """Print the best configurations overall and per variant, ranked by val accuracy."""
    hparam_cols = ["hidden", "lr", "dropout", "self_loop_scale", "use_a2",
                   "lrmc_inv_weight", "lrmc_gamma"]

    def column_style(col):
        # val = yellow, test = green, everything else cyan.
        return {"val_accuracy": "yellow", "test_accuracy": "green"}.get(col, "cyan")

    def row_cells(r):
        # Formatted cell values shared by both table layouts.
        return [
            _fmt(r["val_accuracy"]), _fmt(r["test_accuracy"]),
            _fmt(r["hidden"]), _fmt(r["lr"]), _fmt(r["dropout"]),
            _fmt(r["self_loop_scale"]), str(r["use_a2"]),
            _fmt(r["lrmc_inv_weight"]), _fmt(r["lrmc_gamma"]),
        ]

    # Overall leaderboard across all variants.
    overall = A["best_overall"]
    table = Table(title=f"Top {min(topk, len(overall))} Configs by Val Accuracy (overall)", box=box.SIMPLE_HEAVY)
    for col in ["variant", "val_accuracy", "test_accuracy"] + hparam_cols:
        table.add_column(col, style=column_style(col), justify="right" if "accuracy" in col else "left")
    for _, r in overall.iterrows():
        table.add_row(str(r["variant"]), *row_cells(r))
    console.print(table)

    # One compact leaderboard per variant.
    for variant, sub in A["best_by_variant"].items():
        vtable = Table(title=f"Top {min(5, len(sub))} Configs for variant='{variant}' (by Val)", box=box.MINIMAL_HEAVY_HEAD)
        for col in ["val_accuracy", "test_accuracy"] + hparam_cols:
            vtable.add_column(col, style=column_style(col), justify="right")
        for _, r in sub.iterrows():
            vtable.add_row(*row_cells(r))
        console.print(vtable)
| |
|
def render_per_param(console: Console, A, variant=None):
    """Print marginal mean/std of val/test accuracy for each hyperparameter value.

    The value with the best mean in each column is highlighted in bold green.
    When *variant* is None the tables aggregate over all variants.
    """
    suffix = "" if variant is None else f" — variant={variant}"
    console.print(Panel(Text(f"Per‑Hyperparameter Effects (marginal means){suffix}", style="bold white"),
                        border_style="magenta", box=box.ROUNDED))
    tables = A["per_param"] if variant is None else A["per_param_by_variant"][variant]

    for hp, stats in tables.items():
        table = Table(title=f"{hp}", box=box.SIMPLE_HEAD)
        table.add_column(hp, style="cyan")
        table.add_column("n", style="white", justify="right")
        table.add_column("val_mean", style="yellow", justify="right")
        table.add_column("val_std", style="yellow", justify="right")
        table.add_column("test_mean", style="green", justify="right")
        table.add_column("test_std", style="green", justify="right")

        # Index labels (hyperparameter values) with the highest marginal means.
        if len(stats) > 0:
            best_val_key = stats["mean_val"].idxmax()
            best_test_key = stats["mean_test"].idxmax()
        else:
            best_val_key = best_test_key = None

        def emphasize(text, is_best):
            return f"[bold green]{text}[/]" if is_best else text

        for _, row in stats.reset_index().iterrows():
            value = row[hp]
            table.add_row(
                f"{value}",
                _fmt(row["n"]),
                emphasize(_fmt(row["mean_val"]), value == best_val_key),
                _fmt(row["std_val"]),
                emphasize(_fmt(row["mean_test"]), value == best_test_key),
                _fmt(row["std_test"]),
            )
        console.print(table)
| |
|
def render_matched_comparisons(console: Console, A):
    """Render baseline-vs-pool comparisons over hyperparameter settings present
    in both variants: overall win rates plus top-gain / largest-drop tables
    for mean test accuracy and best-vs-best test accuracy.
    """
    console.print(Panel(Text("Baseline vs Pool — Matched Hyperparameter Comparisons", style="bold white"), border_style="blue", box=box.ROUNDED))
    comp_mean = A["comp_mean"].reset_index()
    comp_best = A["comp_best"].reset_index()
    commons = A["commons"]

    # Fraction of matched settings where pool beats baseline on test accuracy
    # (NaN if there are no matched settings at all).
    win_rate_mean = float((comp_mean["delta_test"] > 0).mean()) if len(comp_mean)>0 else float("nan")
    win_rate_best = float((comp_best["delta_best_test"] > 0).mean()) if len(comp_best)>0 else float("nan")

    stats_text = Text()
    stats_text.append("Pool > Baseline (by mean test across same settings): ", style="bold green")
    stats_text.append(f"{_fmt(100*win_rate_mean, 1)}% of settings\n")
    stats_text.append("Pool best > Baseline best (per setting): ", style="bold green")
    stats_text.append(f"{_fmt(100*win_rate_best, 1)}% of settings\n")
    console.print(Panel(stats_text, border_style="green", box=box.SQUARE))

    def table_from_df(df, deltas_col, title):
        # Show the 8 settings with the biggest gains and the 8 with the
        # biggest drops of *deltas_col* (column names are hard-coded to the
        # mean_test_* comparison, so this helper only fits comp_mean).
        # NOTE(review): with fewer than 16 rows, head(8) and tail(8) overlap
        # and some settings appear in both tables — confirm this is acceptable.
        df = df.sort_values(deltas_col, ascending=False)
        head = df.head(8)
        tail = df.tail(8)
        for part, name in [(head, "Top Gains (Pool minus Baseline)"), (tail, "Largest Drops (Pool minus Baseline)")]:

            table = Table(title=f"{title} — {name}", box=box.MINIMAL_HEAVY_HEAD)
            for c in commons + ["mean_test_pool","mean_test_base","delta_test"]:
                style = "green" if c in ("mean_test_pool","mean_test_base","delta_test") else "cyan"
                table.add_column(c, style=style, justify="right")
            for _,r in part.iterrows():
                row = [str(r[c]) for c in commons] + [_fmt(r["mean_test_pool"]), _fmt(r["mean_test_base"]), _fmt(r["delta_test"])]
                table.add_row(*row)
            console.print(table)

    if len(comp_mean)>0:
        table_from_df(comp_mean, "delta_test", "Mean Test Accuracy (matched)")
    if len(comp_best)>0:
        # Best-vs-best comparison rendered inline: mirrors table_from_df but
        # with the best_test_* columns and a different box style.
        df = comp_best.sort_values("delta_best_test", ascending=False)
        head = df.head(8)
        tail = df.tail(8)
        for part, name in [(head, "Top Gains"), (tail, "Largest Drops")]:
            table = Table(title=f"Best-vs-Best Test Accuracy — {name}", box=box.SIMPLE_HEAVY)
            for c in commons + ["best_test_pool","best_test_base","delta_best_test"]:
                style = "green" if c in ("best_test_pool","best_test_base","delta_best_test") else "cyan"
                table.add_column(c, style=style, justify="right")
            for _,r in part.iterrows():
                row = [str(r[c]) for c in commons] + [_fmt(r["best_test_pool"]), _fmt(r["best_test_base"]), _fmt(r["delta_best_test"])]
                table.add_row(*row)
            console.print(table)
| |
|
def recommend_settings(console: Console, A):
    """Recommend a configuration per variant: for each hyperparameter, pick the
    value with the highest marginal mean test accuracy (None if that parameter
    was never swept for the variant)."""
    per_pool = A["per_param_by_variant"].get("pool", {})
    per_base = A["per_param_by_variant"].get("baseline", {})

    def pick(per_variant, hp):
        # Hyperparameter value with the highest marginal mean test accuracy,
        # or None when the parameter is absent for this variant.
        if hp not in per_variant:
            return None
        return per_variant.get(hp, pd.DataFrame()).get("mean_test", pd.Series()).idxmax()

    pool_hps = ["hidden", "lr", "dropout", "self_loop_scale", "use_a2",
                "lrmc_inv_weight", "lrmc_gamma"]
    base_hps = ["hidden", "lr", "dropout", "self_loop_scale", "use_a2"]
    rec_pool = {hp: pick(per_pool, hp) for hp in pool_hps}
    rec_base = {hp: pick(per_base, hp) for hp in base_hps}

    def render_panel(title, rec):
        txt = Text()
        for name, value in rec.items():
            txt.append(f"{name}: ", style="bold cyan")
            txt.append(f"{value}\n")
        console.print(Panel(txt, title=title, border_style="green", box=box.ROUNDED))

    render_panel("Recommended settings — variant=pool (by mean test)", rec_pool)
    render_panel("Recommended settings — variant=baseline (by mean test)", rec_base)
| |
|
def main():
    """CLI entry point: load the sweep CSV, analyze it, and render every report section."""
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_path", type=str, help="Path to hyperparam_sweep.csv")
    parser.add_argument("--topk", type=int, default=10, help="How many top configs to display")
    parser.add_argument("--variant", type=str, default=None, help="Filter to a specific variant (baseline or pool) for per-parameter tables")
    args = parser.parse_args()

    csv_path = Path(args.csv_path)
    if not csv_path.exists():
        raise SystemExit(f"CSV not found: {csv_path}")

    analysis = analyze(pd.read_csv(csv_path))
    console = Console()

    render_summary(console, analysis)
    render_top_configs(console, analysis, topk=args.topk)

    # Aggregate per-parameter tables first, then the variant-specific view if requested.
    render_per_param(console, analysis, variant=None)
    if args.variant is not None:
        render_per_param(console, analysis, variant=args.variant)

    render_matched_comparisons(console, analysis)
    recommend_settings(console, analysis)
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|