Spaces:
Running
Running
| """ | |
| results_loader.py | |
| ----------------- | |
| Scans experiment output directories and assembles results into pandas | |
| DataFrames ready for display in the Gradio leaderboard. | |
| Output directory conventions (from experiments.yaml): | |
| Task 1: <output_base>/<model>/<fmt>_input_<strat>/ | |
| → results_{grid_size}x{grid_size}_k{k}.csv OR summary.json | |
| Task 2: <output_base>/<model>/point_reuse_q3q0_<strat>/ | |
| → proximity_comparison_results.csv | |
| Task 3: <output_base>/<model>/orthogonal_corners_to_center_<strat>/ | |
| → results.csv OR summary_stats.json | |
| """ | |
from __future__ import annotations

import json
import logging
import os
from pathlib import Path

import pandas as pd
import yaml
| # --------------------------------------------------------------------------- | |
| # Config helpers | |
| # --------------------------------------------------------------------------- | |
def load_config(config_path: str | Path) -> dict:
    """Read the experiments YAML file and return its contents.

    Args:
        config_path: Location of the YAML configuration file.

    Returns:
        The parsed YAML document, or an empty dict when the file is empty.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    # Pin the encoding: the platform default is not UTF-8 everywhere, and
    # the config may carry non-ASCII text (e.g. in display names).
    with open(config_path, encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    # safe_load yields None for an empty document; honour the declared
    # return type so callers can index the result without a None check.
    return {} if loaded is None else loaded
def _model_display(cfg: dict, model_id: str) -> str:
    """Return the display name configured for *model_id*.

    Falls back to *model_id* itself when the model is not listed under
    ``cfg["models"]`` or its entry has no ``display_name`` key.
    """
    model_entry = cfg["models"].get(model_id, {})
    return model_entry.get("display_name", model_id)
| # --------------------------------------------------------------------------- | |
| # Task 1 results | |
| # --------------------------------------------------------------------------- | |
def load_maze_navigation_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Scan the Task 1 output directories and collect accuracies.

    For every model / input format / prompt strategy combination the
    directory ``<output_base>/<model>/<fmt>_input_<strat>/`` is inspected
    (dots and dashes in the model id are replaced by underscores). A
    ``summary.json`` is preferred; the per-grid CSV files are used when the
    summary is absent or yields no rows.

    Args:
        cfg: Parsed configuration; reads ``cfg["models"]`` and the
            ``output_base``, ``input_formats`` and ``prompt_strategies``
            entries of ``cfg["maze_navigation"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns model, display_name, input_format,
        prompt_strategy, grid_size, k_shot, accuracy. Empty (but with those
        columns) when nothing was found.
    """
    columns = [
        "model", "display_name", "input_format", "prompt_strategy",
        "grid_size", "k_shot", "accuracy",
    ]
    task = cfg["maze_navigation"]
    base = repo_root / task["output_base"]
    rows = []
    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display: a model entry without a
        # display_name is shown under its raw id instead of raising KeyError.
        display = (model_meta or {}).get("display_name", model_id)
        model_dir = base / model_id.replace(".", "_").replace("-", "_")
        for fmt in task["input_formats"]:
            for strat in task["prompt_strategies"]:
                subdir = model_dir / f"{fmt}_input_{strat}"
                if not subdir.exists():
                    continue
                rows_before = len(rows)
                summary_file = subdir / "summary.json"
                if summary_file.exists():
                    _parse_task1_summary(summary_file, rows, model_id, display, fmt, strat)
                if len(rows) > rows_before:
                    continue
                # No summary, or a summary that produced nothing (corrupt or
                # empty): fall back to the per-grid CSVs so the results are
                # not silently lost.
                for csv_file in sorted(subdir.glob("*.csv")):
                    _parse_task1_csv(csv_file, rows, model_id, display, fmt, strat)
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
def _parse_task1_summary(path: Path, rows: list, model_id, display, fmt, strat):
    """Append Task 1 rows read from a ``summary.json`` file to *rows*.

    The expected layout is ``{grid_size: {k: accuracy, ...}, ...}``; a bare
    number in place of the inner dict is recorded with ``k_shot`` 0. Grid
    keys may be written as ``"5"``, ``"5x5"`` or with a ``_suffix``; the
    leading integer is taken as the grid size. Keys without a leading
    integer and individual malformed entries are skipped.

    Args:
        path: The summary file to read.
        rows: Output list, extended in place with one dict per accuracy.
        model_id, display, fmt, strat: Values copied into every row as
            ``model``, ``display_name``, ``input_format`` and
            ``prompt_strategy``.

    Returns:
        None. Never raises for unreadable or malformed files; a warning is
        logged instead.
    """
    log = logging.getLogger(__name__)
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        log.warning("Skipping unreadable Task 1 summary %s: %s", path, exc)
        return
    if not isinstance(data, dict):
        log.warning("Skipping Task 1 summary %s: top level is not an object", path)
        return

    def make_row(grid_size, k_shot, accuracy):
        return {
            "model": model_id, "display_name": display,
            "input_format": fmt, "prompt_strategy": strat,
            "grid_size": grid_size, "k_shot": k_shot,
            "accuracy": accuracy,
        }

    for grid_key, k_dict in data.items():
        try:
            # Cut at "_" and "x" before converting: stripping the "x" first
            # would turn "5x5" into 55.
            grid_size = int(str(grid_key).split("_")[0].split("x")[0])
        except ValueError:
            continue
        if isinstance(k_dict, dict):
            for k, acc in k_dict.items():
                try:
                    rows.append(make_row(grid_size, int(k), float(acc)))
                except (TypeError, ValueError):
                    # One bad entry must not discard the rest of the file.
                    log.warning(
                        "Skipping malformed entry %r: %r for grid %s in %s",
                        k, acc, grid_key, path,
                    )
        elif isinstance(k_dict, (int, float)):
            rows.append(make_row(grid_size, 0, float(k_dict)))
def _parse_task1_csv(path: Path, rows: list, model_id, display, fmt, strat):
    """Append Task 1 rows derived from one results CSV to *rows*.

    ``k_shot`` comes from a ``k<digits>`` token in the underscore-separated
    file stem (default 0). The grid size comes from the CSV's ``grid_size``
    column when present (one row per distinct value); otherwise from an
    ``<n>x...`` token in the file stem. Accuracy is computed by
    ``_df_accuracy``.

    Args:
        path: The CSV file to read.
        rows: Output list, extended in place.
        model_id, display, fmt, strat: Values copied into every row as
            ``model``, ``display_name``, ``input_format`` and
            ``prompt_strategy``.

    Returns:
        None. Never raises for unreadable or malformed files; a warning is
        logged instead.
    """
    log = logging.getLogger(__name__)
    try:
        df = pd.read_csv(path)
    except (OSError, ValueError) as exc:
        # pandas' EmptyDataError / ParserError derive from ValueError.
        log.warning("Skipping unreadable Task 1 CSV %s: %s", path, exc)
        return

    grid_size = None
    k_shot = 0
    for part in path.stem.split("_"):
        if part.startswith("k") and part[1:].isdigit():
            k_shot = int(part[1:])
        if "x" in part:
            try:
                grid_size = int(part.split("x")[0])
            except ValueError:
                pass  # a token such as "max" merely contains an "x"

    def make_row(size, accuracy):
        return {
            "model": model_id, "display_name": display,
            "input_format": fmt, "prompt_strategy": strat,
            "grid_size": size, "k_shot": k_shot,
            "accuracy": accuracy,
        }

    try:
        if "grid_size" in df.columns:
            for gs, gdf in df.groupby("grid_size"):
                rows.append(make_row(int(gs), _df_accuracy(gdf)))
        elif grid_size is not None:
            rows.append(make_row(grid_size, _df_accuracy(df)))
        else:
            log.warning("Skipping %s: no grid size in its columns or file name", path)
    except Exception as exc:
        # Best-effort loader: one malformed file must not take the whole
        # leaderboard down, but it should leave a trace.
        log.warning("Skipping malformed Task 1 CSV %s: %s", path, exc)
def _df_accuracy(df: pd.DataFrame) -> float:
    """Return the mean of the first recognised correctness column.

    Columns are tried in the order ``is_correct``, ``exact_match``,
    ``correct``, ``accuracy``; NaN is returned when none of them exists.
    """
    candidates = ("is_correct", "exact_match", "correct", "accuracy")
    chosen = next((name for name in candidates if name in df.columns), None)
    if chosen is None:
        return float("nan")
    return float(df[chosen].mean())
| # --------------------------------------------------------------------------- | |
| # Task 2 results | |
| # --------------------------------------------------------------------------- | |
def load_point_reuse_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Scan the Task 2 output directories and collect accuracies.

    For every model / prompt strategy the directory
    ``<output_base>/<model>/point_reuse_q3q0_<strat>/`` is tried first (dots
    and dashes in the model id replaced by underscores); if it is missing,
    ``<output_base>/<model_id>/proximity_comparison_point_reuse_last_first_same_<prompt_type>/``
    is tried, where ``prompt_type`` comes from the strategy's config. Every
    CSV with a ``grid_size`` column contributes one row per grid size and
    question index; when no question-index column exists a single row per
    grid size is emitted with ``question_idx`` -1.

    Args:
        cfg: Parsed configuration; reads ``cfg["models"]`` and the
            ``output_base`` and ``prompt_strategies`` entries of
            ``cfg["point_reuse"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns model, display_name, prompt_strategy,
        grid_size, question_idx, accuracy. Empty (but with those columns)
        when nothing was found.
    """
    columns = [
        "model", "display_name", "prompt_strategy",
        "grid_size", "question_idx", "accuracy",
    ]
    log = logging.getLogger(__name__)
    task = cfg["point_reuse"]
    base = repo_root / task["output_base"]
    rows = []
    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display instead of a KeyError.
        display = (model_meta or {}).get("display_name", model_id)
        for strat, strat_cfg in task["prompt_strategies"].items():
            subdir = (
                base
                / model_id.replace(".", "_").replace("-", "_")
                / f"point_reuse_q3q0_{strat}"
            )
            if not subdir.exists():
                # Also try the pattern used by existing scripts; it needs the
                # strategy's prompt_type, so skip when that is not configured.
                prompt_type = (strat_cfg or {}).get("prompt_type")
                if prompt_type is None:
                    continue
                subdir = (
                    base
                    / model_id
                    / f"proximity_comparison_point_reuse_last_first_same_{prompt_type}"
                )
                if not subdir.exists():
                    continue
            # Sorted so that row order is reproducible across file systems.
            for csv_file in sorted(subdir.glob("*.csv")):
                try:
                    df = pd.read_csv(csv_file)
                except (OSError, ValueError) as exc:
                    log.warning("Skipping unreadable Task 2 CSV %s: %s", csv_file, exc)
                    continue
                if "grid_size" not in df.columns:
                    continue
                q_col = next(
                    (c for c in ("question_idx", "question_index", "q_idx") if c in df.columns),
                    None,
                )
                # Collect per file and commit only on success, so a file that
                # fails midway does not leave partial results behind.
                file_rows = []
                try:
                    for gs, gdf in df.groupby("grid_size"):
                        if q_col:
                            for qi, qdf in gdf.groupby(q_col):
                                file_rows.append({
                                    "model": model_id, "display_name": display,
                                    "prompt_strategy": strat,
                                    "grid_size": int(gs),
                                    "question_idx": int(qi),
                                    "accuracy": _df_accuracy(qdf),
                                })
                        else:
                            file_rows.append({
                                "model": model_id, "display_name": display,
                                "prompt_strategy": strat,
                                "grid_size": int(gs),
                                "question_idx": -1,
                                "accuracy": _df_accuracy(gdf),
                            })
                except Exception as exc:
                    # Best-effort loader: log and move on to the next file.
                    log.warning("Skipping malformed Task 2 CSV %s: %s", csv_file, exc)
                    continue
                rows.extend(file_rows)
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
| # --------------------------------------------------------------------------- | |
| # Task 3 results | |
| # --------------------------------------------------------------------------- | |
def load_compositional_distance_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Scan the Task 3 output directories and collect accuracies.

    For every model / prompt strategy the directory
    ``<output_base>/<model>/orthogonal_<corner_pattern>_<strat>/`` is
    inspected (dots and dashes in the model id replaced by underscores).
    ``summary_stats.json`` is preferred; the CSV files are used when it is
    absent, unreadable, or yields no rows. CSVs need a ``grid_size`` column
    and a ``question_idx`` / ``question_index`` column. For each grid size
    that has questions 0, 1 and 2, the question-2 row receives
    ``delta = acc(Q2) - mean(acc(Q0), acc(Q1))`` rounded to 4 places; all
    other rows carry NaN.

    Args:
        cfg: Parsed configuration; reads ``cfg["models"]`` and the
            ``output_base``, ``corner_pattern`` and ``prompt_strategies``
            entries of ``cfg["compositional_distance"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns model, display_name, prompt_strategy,
        grid_size, question_idx, accuracy, delta. Empty (but with those
        columns) when nothing was found.
    """
    columns = [
        "model", "display_name", "prompt_strategy",
        "grid_size", "question_idx", "accuracy", "delta",
    ]
    log = logging.getLogger(__name__)
    task = cfg["compositional_distance"]
    base = repo_root / task["output_base"]
    rows = []
    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display instead of a KeyError.
        display = (model_meta or {}).get("display_name", model_id)
        model_dir = base / model_id.replace(".", "_").replace("-", "_")
        for strat in task["prompt_strategies"]:
            subdir = model_dir / f"orthogonal_{task['corner_pattern']}_{strat}"
            if not subdir.exists():
                continue

            # Prefer summary_stats.json
            stats_file = subdir / "summary_stats.json"
            if stats_file.exists():
                rows_before = len(rows)
                try:
                    with open(stats_file, encoding="utf-8") as f:
                        data = json.load(f)
                except (OSError, ValueError) as exc:
                    log.warning("Unreadable Task 3 summary %s: %s", stats_file, exc)
                else:
                    _parse_task3_stats(data, rows, model_id, display, strat)
                    if len(rows) > rows_before:
                        continue
                # Reaching here means the summary gave nothing usable, so the
                # CSVs below are the only remaining source.

            # Fall back to the CSV files
            for csv_file in sorted(subdir.glob("*.csv")):
                try:
                    df = pd.read_csv(csv_file)
                except (OSError, ValueError) as exc:
                    log.warning("Skipping unreadable Task 3 CSV %s: %s", csv_file, exc)
                    continue
                if "grid_size" not in df.columns:
                    continue
                q_col = next(
                    (c for c in ("question_idx", "question_index") if c in df.columns),
                    None,
                )
                if q_col is None:
                    continue
                file_rows = []
                try:
                    for gs, gdf in df.groupby("grid_size"):
                        q_rows = {}
                        for qi, qdf in gdf.groupby(q_col):
                            row = {
                                "model": model_id, "display_name": display,
                                "prompt_strategy": strat,
                                "grid_size": int(gs),
                                "question_idx": int(qi),
                                "accuracy": _df_accuracy(qdf),
                                "delta": float("nan"),
                            }
                            q_rows[row["question_idx"]] = row
                            file_rows.append(row)
                        # Compute delta for Q2 vs avg(Q0, Q1). Updating the
                        # row object directly avoids rescanning every row and
                        # cannot touch rows that came from another file.
                        if 0 in q_rows and 1 in q_rows and 2 in q_rows:
                            delta = q_rows[2]["accuracy"] - (
                                q_rows[0]["accuracy"] + q_rows[1]["accuracy"]
                            ) / 2
                            q_rows[2]["delta"] = round(delta, 4)
                except Exception as exc:
                    # Best-effort loader: log and move on to the next file.
                    log.warning("Skipping malformed Task 3 CSV %s: %s", csv_file, exc)
                    continue
                rows.extend(file_rows)
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
def _parse_task3_stats(data: dict, rows: list, model_id, display, strat):
    """Append Task 3 rows from a parsed ``summary_stats.json`` to *rows*.

    Reads ``data["accuracy_by_grid_size"]``, expected to map a grid key to
    ``{question_idx: accuracy}``. Grid keys may be written as ``"5"``,
    ``"5x5"`` or with a ``_suffix``; the leading integer is taken as the
    grid size. Grid entries that are not dicts, and question entries whose
    index or accuracy is not numeric, are skipped. When a grid size has
    questions 0, 1 and 2, its question-2 row receives
    ``delta = acc(Q2) - mean(acc(Q0), acc(Q1))`` rounded to 4 places; all
    other rows carry NaN.

    Args:
        data: The decoded JSON document.
        rows: Output list, extended in place.
        model_id, display, strat: Values copied into every row as ``model``,
            ``display_name`` and ``prompt_strategy``.

    Returns:
        None. Never raises for malformed input; a warning is logged instead.
    """
    log = logging.getLogger(__name__)
    by_gs = data.get("accuracy_by_grid_size", {}) if isinstance(data, dict) else None
    if not isinstance(by_gs, dict):
        log.warning(
            "Task 3 summary for %s/%s has no usable accuracy_by_grid_size mapping",
            model_id, strat,
        )
        return
    for gs_key, gs_data in by_gs.items():
        try:
            # Cut at "_" and "x" before converting: stripping the "x" first
            # would turn "5x5" into 55.
            gs = int(str(gs_key).split("_")[0].split("x")[0])
        except ValueError:
            continue
        if not isinstance(gs_data, dict):
            continue
        q_rows = {}
        for qi_key, acc in gs_data.items():
            try:
                qi = int(qi_key)
                accuracy = float(acc)
            except (ValueError, TypeError):
                continue
            row = {
                "model": model_id, "display_name": display,
                "prompt_strategy": strat,
                "grid_size": gs, "question_idx": qi,
                "accuracy": accuracy, "delta": float("nan"),
            }
            q_rows[qi] = row
            rows.append(row)
        # Update the Q2 row directly rather than rescanning all rows.
        if 0 in q_rows and 1 in q_rows and 2 in q_rows:
            delta = q_rows[2]["accuracy"] - (
                q_rows[0]["accuracy"] + q_rows[1]["accuracy"]
            ) / 2
            q_rows[2]["delta"] = round(delta, 4)
| # --------------------------------------------------------------------------- | |
| # Leaderboard aggregators | |
| # --------------------------------------------------------------------------- | |
def maze_navigation_leaderboard(df: pd.DataFrame, k_shot: int = 0) -> pd.DataFrame:
    """Build the Task 1 leaderboard for a single ``k_shot`` setting.

    One row per model (column ``Model``), one column per
    ``<input_format>_<prompt_strategy>`` pair, each cell holding the mean
    accuracy rounded to 3 places. An empty DataFrame is returned when *df*
    is empty or has no rows for the requested ``k_shot``.
    """
    if df.empty:
        return pd.DataFrame()

    at_k = df.loc[df["k_shot"] == k_shot]
    if at_k.empty:
        return pd.DataFrame()

    table = pd.pivot_table(
        at_k,
        values="accuracy",
        index=["display_name"],
        columns=["input_format", "prompt_strategy"],
        aggfunc="mean",
    )
    # Flatten the two-level column index into "<format>_<strategy>" labels.
    flat_labels = []
    for pair in table.columns:
        flat_labels.append(f"{pair[0]}_{pair[1]}")
    table.columns = flat_labels

    table = table.reset_index()
    table = table.rename(columns={"display_name": "Model"})
    return table.round(3)
def point_reuse_leaderboard(df: pd.DataFrame) -> pd.DataFrame:
    """Build the Task 2 leaderboard.

    One row per model (column ``Model``) with the mean accuracy over all
    rows for question 0 (``Q0 acc``) and question 3 (``Q3 acc``), plus their
    difference (``Q3-Q0 diff``). All values are rounded to 3 places. An
    empty DataFrame is returned when *df* is empty.
    """
    if df.empty:
        return pd.DataFrame()

    def mean_for(question: int, label: str) -> pd.Series:
        picked = df[df["question_idx"] == question]
        per_model = picked.groupby("display_name")["accuracy"].mean()
        return per_model.rename(label)

    board = pd.concat([mean_for(0, "Q0 acc"), mean_for(3, "Q3 acc")], axis=1)
    board = board.reset_index()
    board = board.rename(columns={"display_name": "Model"})
    difference = board["Q3 acc"] - board["Q0 acc"]
    board["Q3-Q0 diff"] = difference.round(3)
    return board.round(3)
def compositional_distance_leaderboard(df: pd.DataFrame) -> pd.DataFrame:
    """Build the Task 3 leaderboard.

    One row per model with the mean accuracy for questions 0, 1 and 2 and
    the gap ``Q2 - mean(Q0, Q1)``, each rounded to 3 places. The gap is NaN
    when any of the three means is missing. An empty DataFrame is returned
    when *df* is empty.
    """
    if df.empty:
        return pd.DataFrame()

    records = []
    for model, model_df in df.groupby("display_name"):
        q0, q1, q2 = (
            model_df.loc[model_df["question_idx"] == question, "accuracy"].mean()
            for question in (0, 1, 2)
        )
        if pd.isna(q0) or pd.isna(q1) or pd.isna(q2):
            delta = float("nan")
        else:
            delta = q2 - (q0 + q1) / 2
        records.append({
            "Model": model,
            "Q0 (A→M)": round(q0, 3),
            "Q1 (D→M)": round(q1, 3),
            "Q2 (B→C)": round(q2, 3),
            "Δ Q2 vs avg(Q0,Q1)": round(delta, 3),
        })
    return pd.DataFrame(records)
| # --------------------------------------------------------------------------- | |
| # Full results loader (called by app.py) | |
| # --------------------------------------------------------------------------- | |
def load_all_results(config_path: str | Path) -> dict[str, pd.DataFrame]:
    """Load the results of all three tasks.

    Args:
        config_path: Path to the experiments YAML file. The repository root
            is taken to be three directory levels above this file.

    Returns:
        Dict with the keys ``maze_navigation``, ``point_reuse`` and
        ``compositional_distance``, each mapping to that task's DataFrame.
    """
    cfg = load_config(config_path)
    # Make the path absolute before climbing: on a short relative path such
    # as "configs/experiments.yaml" the parent chain would otherwise
    # saturate at "." and point at the wrong directory.
    # pipeline/configs/.. → llm-maze-solver
    repo_root = Path(os.path.abspath(config_path)).parent.parent.parent
    return {
        "maze_navigation": load_maze_navigation_results(cfg, repo_root),
        "point_reuse": load_point_reuse_results(cfg, repo_root),
        "compositional_distance": load_compositional_distance_results(cfg, repo_root),
    }