""" results_loader.py ----------------- Scans experiment output directories and assembles results into pandas DataFrames ready for display in the Gradio leaderboard. Output directory conventions (from experiments.yaml): Task 1: //_input_/ → results_{grid_size}x{grid_size}_k{k}.csv OR summary.json Task 2: //point_reuse_q3q0_/ → proximity_comparison_results.csv Task 3: //orthogonal_corners_to_center_/ → results.csv OR summary_stats.json """ from __future__ import annotations import json import os from pathlib import Path import pandas as pd import yaml # --------------------------------------------------------------------------- # Config helpers # --------------------------------------------------------------------------- def load_config(config_path: str | Path) -> dict: with open(config_path) as f: return yaml.safe_load(f) def _model_display(cfg: dict, model_id: str) -> str: return cfg["models"].get(model_id, {}).get("display_name", model_id) # --------------------------------------------------------------------------- # Task 1 results # --------------------------------------------------------------------------- def load_maze_navigation_results(cfg: dict, repo_root: Path) -> pd.DataFrame: """ Scan Task 1 output dirs and return a DataFrame with columns: model, display_name, input_format, prompt_strategy, grid_size, k_shot, accuracy """ task = cfg["maze_navigation"] base = repo_root / task["output_base"] rows = [] for model_id, model_meta in cfg["models"].items(): display = model_meta["display_name"] for fmt in task["input_formats"]: for strat in task["prompt_strategies"]: subdir = base / model_id.replace(".", "_").replace("-", "_") / f"{fmt}_input_{strat}" if not subdir.exists(): continue # Look for summary JSON first, then CSVs summary_file = subdir / "summary.json" if summary_file.exists(): _parse_task1_summary(summary_file, rows, model_id, display, fmt, strat) else: # Fall back to per-grid CSVs for csv_file in sorted(subdir.glob("*.csv")): _parse_task1_csv(csv_file, rows, model_id, display, fmt, strat) if not rows: return pd.DataFrame(columns=[ "model", "display_name", "input_format", "prompt_strategy", "grid_size", "k_shot", "accuracy" ]) return pd.DataFrame(rows) def _parse_task1_summary(path: Path, rows: list, model_id, display, fmt, strat): try: with open(path) as f: data = json.load(f) # Expected: {grid_size: {k: accuracy, ...}, ...} for grid_key, k_dict in data.items(): try: grid_size = int(str(grid_key).replace("x", "").split("_")[0]) except ValueError: continue if isinstance(k_dict, dict): for k, acc in k_dict.items(): rows.append({ "model": model_id, "display_name": display, "input_format": fmt, "prompt_strategy": strat, "grid_size": grid_size, "k_shot": int(k), "accuracy": float(acc), }) elif isinstance(k_dict, (int, float)): rows.append({ "model": model_id, "display_name": display, "input_format": fmt, "prompt_strategy": strat, "grid_size": grid_size, "k_shot": 0, "accuracy": float(k_dict), }) except Exception: pass def _parse_task1_csv(path: Path, rows: list, model_id, display, fmt, strat): try: df = pd.read_csv(path) # Detect grid_size and k_shot from filename or columns grid_size = None k_shot = 0 name = path.stem for part in name.split("_"): if part.startswith("k") and part[1:].isdigit(): k_shot = int(part[1:]) if "x" in part: try: g = int(part.split("x")[0]) grid_size = g except ValueError: pass if "grid_size" in df.columns: for gs, gdf in df.groupby("grid_size"): acc = _df_accuracy(gdf) rows.append({ "model": model_id, "display_name": display, "input_format": fmt, "prompt_strategy": strat, "grid_size": int(gs), "k_shot": k_shot, "accuracy": acc, }) elif grid_size is not None: rows.append({ "model": model_id, "display_name": display, "input_format": fmt, "prompt_strategy": strat, "grid_size": grid_size, "k_shot": k_shot, "accuracy": _df_accuracy(df), }) except Exception: pass def _df_accuracy(df: pd.DataFrame) -> float: for col in ("is_correct", "exact_match", "correct", "accuracy"): if col in df.columns: return float(df[col].mean()) return float("nan") # --------------------------------------------------------------------------- # Task 2 results # --------------------------------------------------------------------------- def load_point_reuse_results(cfg: dict, repo_root: Path) -> pd.DataFrame: """ Return DataFrame with columns: model, display_name, prompt_strategy, grid_size, question_idx, accuracy """ task = cfg["point_reuse"] base = repo_root / task["output_base"] rows = [] for model_id, model_meta in cfg["models"].items(): display = model_meta["display_name"] for strat, strat_cfg in task["prompt_strategies"].items(): subdir = ( base / model_id.replace(".", "_").replace("-", "_") / f"point_reuse_q3q0_{strat}" ) if not subdir.exists(): # Also try the pattern used by existing scripts subdir = base / model_id / f"proximity_comparison_point_reuse_last_first_same_{strat_cfg['prompt_type']}" if not subdir.exists(): continue csv_files = list(subdir.glob("*.csv")) for csv_file in csv_files: try: df = pd.read_csv(csv_file) if "grid_size" not in df.columns: continue q_col = next( (c for c in ("question_idx", "question_index", "q_idx") if c in df.columns), None, ) for gs, gdf in df.groupby("grid_size"): if q_col: for qi, qdf in gdf.groupby(q_col): rows.append({ "model": model_id, "display_name": display, "prompt_strategy": strat, "grid_size": int(gs), "question_idx": int(qi), "accuracy": _df_accuracy(qdf), }) else: rows.append({ "model": model_id, "display_name": display, "prompt_strategy": strat, "grid_size": int(gs), "question_idx": -1, "accuracy": _df_accuracy(gdf), }) except Exception: pass if not rows: return pd.DataFrame(columns=[ "model", "display_name", "prompt_strategy", "grid_size", "question_idx", "accuracy" ]) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Task 3 results # --------------------------------------------------------------------------- def load_compositional_distance_results(cfg: dict, repo_root: Path) -> pd.DataFrame: """ Return DataFrame with columns: model, display_name, prompt_strategy, grid_size, question_idx, accuracy, delta """ task = cfg["compositional_distance"] base = repo_root / task["output_base"] rows = [] for model_id, model_meta in cfg["models"].items(): display = model_meta["display_name"] for strat, strat_cfg in task["prompt_strategies"].items(): tag = f"orthogonal_{task['corner_pattern']}_{strat}" subdir = ( base / model_id.replace(".", "_").replace("-", "_") / tag ) if not subdir.exists(): continue # Prefer summary_stats.json stats_file = subdir / "summary_stats.json" if stats_file.exists(): try: with open(stats_file) as f: data = json.load(f) _parse_task3_stats(data, rows, model_id, display, strat) continue except Exception: pass # Fall back to results.csv for csv_file in sorted(subdir.glob("*.csv")): try: df = pd.read_csv(csv_file) if "grid_size" not in df.columns: continue q_col = next( (c for c in ("question_idx", "question_index") if c in df.columns), None, ) for gs, gdf in df.groupby("grid_size"): if q_col: q_accs = {} for qi, qdf in gdf.groupby(q_col): acc = _df_accuracy(qdf) q_accs[int(qi)] = acc rows.append({ "model": model_id, "display_name": display, "prompt_strategy": strat, "grid_size": int(gs), "question_idx": int(qi), "accuracy": acc, "delta": float("nan"), }) # Compute delta for Q2 vs avg(Q0, Q1) if 0 in q_accs and 1 in q_accs and 2 in q_accs: delta = q_accs[2] - (q_accs[0] + q_accs[1]) / 2 for r in rows: if (r["model"] == model_id and r["prompt_strategy"] == strat and r["grid_size"] == int(gs) and r["question_idx"] == 2): r["delta"] = round(delta, 4) except Exception: pass if not rows: return pd.DataFrame(columns=[ "model", "display_name", "prompt_strategy", "grid_size", "question_idx", "accuracy", "delta" ]) return pd.DataFrame(rows) def _parse_task3_stats(data: dict, rows: list, model_id, display, strat): """Parse summary_stats.json for task3.""" try: by_q = data.get("accuracy_by_question", data.get("per_question", {})) by_gs = data.get("accuracy_by_grid_size", {}) for gs_key, gs_data in by_gs.items(): try: gs = int(str(gs_key).replace("x", "").split("_")[0]) except ValueError: continue if isinstance(gs_data, dict): q_accs = {} for qi_key, acc in gs_data.items(): try: qi = int(qi_key) q_accs[qi] = float(acc) rows.append({ "model": model_id, "display_name": display, "prompt_strategy": strat, "grid_size": gs, "question_idx": qi, "accuracy": float(acc), "delta": float("nan"), }) except (ValueError, TypeError): pass if 0 in q_accs and 1 in q_accs and 2 in q_accs: delta = q_accs[2] - (q_accs[0] + q_accs[1]) / 2 for r in rows: if (r["model"] == model_id and r["prompt_strategy"] == strat and r["grid_size"] == gs and r["question_idx"] == 2): r["delta"] = round(delta, 4) except Exception: pass # --------------------------------------------------------------------------- # Leaderboard aggregators # --------------------------------------------------------------------------- def maze_navigation_leaderboard(df: pd.DataFrame, k_shot: int = 0) -> pd.DataFrame: """ Pivot Task 1 results into a leaderboard table. Rows = models, columns = (format × strategy), values = accuracy at k_shot. """ if df.empty: return pd.DataFrame() sub = df[df["k_shot"] == k_shot] if sub.empty: return pd.DataFrame() pivot = sub.pivot_table( index=["display_name"], columns=["input_format", "prompt_strategy"], values="accuracy", aggfunc="mean", ) pivot.columns = [f"{fmt}_{strat}" for fmt, strat in pivot.columns] pivot = pivot.reset_index().rename(columns={"display_name": "Model"}) return pivot.round(3) def point_reuse_leaderboard(df: pd.DataFrame) -> pd.DataFrame: """ Task 2 leaderboard: per-model accuracy at Q0 and Q3 across all grid sizes. Highlights Q3 vs Q0 consistency. """ if df.empty: return pd.DataFrame() q0 = df[df["question_idx"] == 0].groupby("display_name")["accuracy"].mean().rename("Q0 acc") q3 = df[df["question_idx"] == 3].groupby("display_name")["accuracy"].mean().rename("Q3 acc") out = pd.concat([q0, q3], axis=1).reset_index().rename(columns={"display_name": "Model"}) out["Q3-Q0 diff"] = (out["Q3 acc"] - out["Q0 acc"]).round(3) return out.round(3) def compositional_distance_leaderboard(df: pd.DataFrame) -> pd.DataFrame: """ Task 3 leaderboard: per-model Q0/Q1/Q2 accuracy + delta (Q2 vs avg Q0/Q1). """ if df.empty: return pd.DataFrame() rows = [] for model, mdf in df.groupby("display_name"): q0 = mdf[mdf["question_idx"] == 0]["accuracy"].mean() q1 = mdf[mdf["question_idx"] == 1]["accuracy"].mean() q2 = mdf[mdf["question_idx"] == 2]["accuracy"].mean() delta = q2 - (q0 + q1) / 2 if not (pd.isna(q0) or pd.isna(q1) or pd.isna(q2)) else float("nan") rows.append({ "Model": model, "Q0 (A→M)": round(q0, 3), "Q1 (D→M)": round(q1, 3), "Q2 (B→C)": round(q2, 3), "Δ Q2 vs avg(Q0,Q1)": round(delta, 3), }) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Full results loader (called by app.py) # --------------------------------------------------------------------------- def load_all_results(config_path: str | Path) -> dict[str, pd.DataFrame]: """Load results for all three tasks. Returns dict of DataFrames.""" cfg = load_config(config_path) repo_root = Path(config_path).parent.parent.parent # pipeline/configs/.. → llm-maze-solver return { "maze_navigation": load_maze_navigation_results(cfg, repo_root), "point_reuse": load_point_reuse_results(cfg, repo_root), "compositional_distance": load_compositional_distance_results(cfg, repo_root), }