SpatialBench / pipeline /results_loader.py
weijiang99's picture
Upload folder using huggingface_hub
cffeecf verified
"""
results_loader.py
-----------------
Scans experiment output directories and assembles results into pandas
DataFrames ready for display in the Gradio leaderboard.

Output directory conventions (from experiments.yaml):
    Task 1: <output_base>/<model>/<fmt>_input_<strat>/
        → results_{grid_size}x{grid_size}_k{k}.csv OR summary.json
    Task 2: <output_base>/<model>/point_reuse_q3q0_<strat>/
        → proximity_comparison_results.csv
    Task 3: <output_base>/<model>/orthogonal_corners_to_center_<strat>/
        → results.csv OR summary_stats.json
"""
from __future__ import annotations

import json
import logging
import os
from pathlib import Path

import pandas as pd
import yaml
# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------
def load_config(config_path: str | Path) -> dict:
    """Parse the YAML experiment configuration stored at ``config_path``.

    Args:
        config_path: Location of the YAML file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Decode explicitly as UTF-8 so non-ASCII values (e.g. display names) are
    # read the same way on every platform instead of using the locale default.
    with open(config_path, encoding="utf-8") as f:
        return yaml.safe_load(f)
def _model_display(cfg: dict, model_id: str) -> str:
return cfg["models"].get(model_id, {}).get("display_name", model_id)
# ---------------------------------------------------------------------------
# Task 1 results
# ---------------------------------------------------------------------------
def load_maze_navigation_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Collect Task 1 (maze navigation) results from the output directories.

    For every model / input format / prompt strategy combination the directory
    ``<repo_root>/<output_base>/<model>/<fmt>_input_<strat>/`` is inspected,
    where ``<model>`` is the model id with ``.`` and ``-`` replaced by ``_``.
    ``summary.json`` is preferred; the per-grid CSV files are used when the
    summary is absent or contributes no rows.

    Args:
        cfg: Parsed experiment configuration; reads ``cfg["maze_navigation"]``
            (``output_base``, ``input_formats``, ``prompt_strategies``) and
            ``cfg["models"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns ``model, display_name, input_format,
        prompt_strategy, grid_size, k_shot, accuracy``. It is empty (but has
        these columns) when nothing was found.
    """
    columns = [
        "model", "display_name", "input_format", "prompt_strategy",
        "grid_size", "k_shot", "accuracy",
    ]
    task = cfg["maze_navigation"]
    base = repo_root / task["output_base"]
    rows: list[dict] = []

    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display: a model without a display name is
        # shown under its id instead of aborting the whole scan.
        display = (model_meta or {}).get("display_name", model_id)
        model_dir = base / model_id.replace(".", "_").replace("-", "_")
        for fmt in task["input_formats"]:
            for strat in task["prompt_strategies"]:
                subdir = model_dir / f"{fmt}_input_{strat}"
                if not subdir.exists():
                    continue

                rows_before = len(rows)
                summary_file = subdir / "summary.json"
                if summary_file.exists():
                    _parse_task1_summary(summary_file, rows, model_id, display, fmt, strat)

                # No summary, or a summary that yielded nothing usable:
                # fall back to the per-grid CSV files.
                if len(rows) == rows_before:
                    for csv_file in sorted(subdir.glob("*.csv")):
                        _parse_task1_csv(csv_file, rows, model_id, display, fmt, strat)

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
def _parse_task1_summary(path: Path, rows: list, model_id, display, fmt, strat):
try:
with open(path) as f:
data = json.load(f)
# Expected: {grid_size: {k: accuracy, ...}, ...}
for grid_key, k_dict in data.items():
try:
grid_size = int(str(grid_key).replace("x", "").split("_")[0])
except ValueError:
continue
if isinstance(k_dict, dict):
for k, acc in k_dict.items():
rows.append({
"model": model_id, "display_name": display,
"input_format": fmt, "prompt_strategy": strat,
"grid_size": grid_size, "k_shot": int(k),
"accuracy": float(acc),
})
elif isinstance(k_dict, (int, float)):
rows.append({
"model": model_id, "display_name": display,
"input_format": fmt, "prompt_strategy": strat,
"grid_size": grid_size, "k_shot": 0,
"accuracy": float(k_dict),
})
except Exception:
pass
def _parse_task1_csv(path: Path, rows: list, model_id, display, fmt, strat):
try:
df = pd.read_csv(path)
# Detect grid_size and k_shot from filename or columns
grid_size = None
k_shot = 0
name = path.stem
for part in name.split("_"):
if part.startswith("k") and part[1:].isdigit():
k_shot = int(part[1:])
if "x" in part:
try:
g = int(part.split("x")[0])
grid_size = g
except ValueError:
pass
if "grid_size" in df.columns:
for gs, gdf in df.groupby("grid_size"):
acc = _df_accuracy(gdf)
rows.append({
"model": model_id, "display_name": display,
"input_format": fmt, "prompt_strategy": strat,
"grid_size": int(gs), "k_shot": k_shot,
"accuracy": acc,
})
elif grid_size is not None:
rows.append({
"model": model_id, "display_name": display,
"input_format": fmt, "prompt_strategy": strat,
"grid_size": grid_size, "k_shot": k_shot,
"accuracy": _df_accuracy(df),
})
except Exception:
pass
def _df_accuracy(df: pd.DataFrame) -> float:
for col in ("is_correct", "exact_match", "correct", "accuracy"):
if col in df.columns:
return float(df[col].mean())
return float("nan")
# ---------------------------------------------------------------------------
# Task 2 results
# ---------------------------------------------------------------------------
def load_point_reuse_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Collect Task 2 (point reuse) results from the output directories.

    For every model / prompt strategy the directory
    ``<repo_root>/<output_base>/<model>/point_reuse_q3q0_<strat>/`` is used,
    where ``<model>`` is the model id with ``.`` and ``-`` replaced by ``_``.
    If it does not exist, the legacy location
    ``<output_base>/<model id>/proximity_comparison_point_reuse_last_first_same_<prompt_type>/``
    is tried instead. Every ``*.csv`` file with a ``grid_size`` column is
    aggregated per grid size and, when a question-index column
    (``question_idx``, ``question_index`` or ``q_idx``) exists, per question.
    Without such a column ``question_idx`` is reported as -1.

    Files that cannot be read or aggregated are logged and skipped.

    Args:
        cfg: Parsed experiment configuration; reads ``cfg["point_reuse"]``
            (``output_base``, ``prompt_strategies``) and ``cfg["models"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns ``model, display_name, prompt_strategy,
        grid_size, question_idx, accuracy``. It is empty (but has these
        columns) when nothing was found.
    """
    columns = [
        "model", "display_name", "prompt_strategy",
        "grid_size", "question_idx", "accuracy",
    ]
    log = logging.getLogger(__name__)
    task = cfg["point_reuse"]
    base = repo_root / task["output_base"]
    rows: list[dict] = []

    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display: use the id when no name is set.
        display = (model_meta or {}).get("display_name", model_id)
        safe_id = model_id.replace(".", "_").replace("-", "_")
        for strat, strat_cfg in task["prompt_strategies"].items():
            subdir = base / safe_id / f"point_reuse_q3q0_{strat}"
            if not subdir.exists():
                # Also try the pattern used by existing scripts.
                prompt_type = (strat_cfg or {}).get("prompt_type")
                if prompt_type is None:
                    continue
                subdir = (
                    base / model_id
                    / f"proximity_comparison_point_reuse_last_first_same_{prompt_type}"
                )
                if not subdir.exists():
                    continue

            # Sorted so the row order is reproducible across runs/platforms.
            for csv_file in sorted(subdir.glob("*.csv")):
                try:
                    df = pd.read_csv(csv_file)
                    if "grid_size" not in df.columns:
                        continue
                    q_col = next(
                        (c for c in ("question_idx", "question_index", "q_idx")
                         if c in df.columns),
                        None,
                    )
                    parsed = []
                    for gs, gdf in df.groupby("grid_size"):
                        if q_col:
                            for qi, qdf in gdf.groupby(q_col):
                                parsed.append({
                                    "model": model_id, "display_name": display,
                                    "prompt_strategy": strat,
                                    "grid_size": int(gs),
                                    "question_idx": int(qi),
                                    "accuracy": _df_accuracy(qdf),
                                })
                        else:
                            parsed.append({
                                "model": model_id, "display_name": display,
                                "prompt_strategy": strat,
                                "grid_size": int(gs),
                                "question_idx": -1,
                                "accuracy": _df_accuracy(gdf),
                            })
                except Exception as exc:
                    # Deliberately broad (best-effort scan), but not silent.
                    log.warning("Skipping Task 2 CSV %s: %s", csv_file, exc)
                    continue
                # Publish only after the whole file was processed.
                rows.extend(parsed)

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Task 3 results
# ---------------------------------------------------------------------------
def load_compositional_distance_results(cfg: dict, repo_root: Path) -> pd.DataFrame:
    """Collect Task 3 (compositional distance) results from the output directories.

    For every model / prompt strategy the directory
    ``<repo_root>/<output_base>/<model>/orthogonal_<corner_pattern>_<strat>/``
    is inspected, where ``<model>`` is the model id with ``.`` and ``-``
    replaced by ``_``. ``summary_stats.json`` is preferred; the ``*.csv``
    files are used when the summary is absent, unreadable, or contributes no
    rows. CSV files need a ``grid_size`` column and a question-index column
    (``question_idx`` or ``question_index``) to contribute rows.

    ``delta`` is NaN except on the question-2 row of a grid size for which
    questions 0, 1 and 2 are all present; there it is
    ``acc(Q2) - (acc(Q0) + acc(Q1)) / 2`` rounded to 4 decimals.

    Args:
        cfg: Parsed experiment configuration; reads
            ``cfg["compositional_distance"]`` (``output_base``,
            ``corner_pattern``, ``prompt_strategies``) and ``cfg["models"]``.
        repo_root: Directory that ``output_base`` is relative to.

    Returns:
        DataFrame with columns ``model, display_name, prompt_strategy,
        grid_size, question_idx, accuracy, delta``. It is empty (but has
        these columns) when nothing was found.
    """
    columns = [
        "model", "display_name", "prompt_strategy",
        "grid_size", "question_idx", "accuracy", "delta",
    ]
    log = logging.getLogger(__name__)
    task = cfg["compositional_distance"]
    base = repo_root / task["output_base"]
    rows: list[dict] = []

    for model_id, model_meta in cfg["models"].items():
        # Same fallback as _model_display: use the id when no name is set.
        display = (model_meta or {}).get("display_name", model_id)
        safe_id = model_id.replace(".", "_").replace("-", "_")
        # Only the strategy names are needed here, not their settings.
        for strat in task["prompt_strategies"]:
            subdir = base / safe_id / f"orthogonal_{task['corner_pattern']}_{strat}"
            if not subdir.exists():
                continue

            # Prefer summary_stats.json
            stats_file = subdir / "summary_stats.json"
            if stats_file.exists():
                rows_before = len(rows)
                try:
                    with open(stats_file, encoding="utf-8") as f:
                        data = json.load(f)
                    _parse_task3_stats(data, rows, model_id, display, strat)
                except Exception as exc:
                    log.warning("Could not use Task 3 summary %s: %s", stats_file, exc)
                # Skip the CSVs only when the summary actually produced rows;
                # an empty or unusable summary must not hide the CSV results.
                if len(rows) > rows_before:
                    continue

            # Fall back to results.csv
            for csv_file in sorted(subdir.glob("*.csv")):
                try:
                    df = pd.read_csv(csv_file)
                    if "grid_size" not in df.columns:
                        continue
                    q_col = next(
                        (c for c in ("question_idx", "question_index") if c in df.columns),
                        None,
                    )
                    if not q_col:
                        # Rows are only produced per question.
                        continue
                    parsed = []
                    for gs, gdf in df.groupby("grid_size"):
                        by_question = {}
                        for qi, qdf in gdf.groupby(q_col):
                            row = {
                                "model": model_id, "display_name": display,
                                "prompt_strategy": strat,
                                "grid_size": int(gs),
                                "question_idx": int(qi),
                                "accuracy": _df_accuracy(qdf),
                                "delta": float("nan"),
                            }
                            by_question[int(qi)] = row
                            parsed.append(row)
                        # Compute delta for Q2 vs avg(Q0, Q1); the row is
                        # updated through its reference, no rescan needed.
                        if all(q in by_question for q in (0, 1, 2)):
                            delta = by_question[2]["accuracy"] - (
                                by_question[0]["accuracy"] + by_question[1]["accuracy"]
                            ) / 2
                            by_question[2]["delta"] = round(delta, 4)
                except Exception as exc:
                    # Deliberately broad (best-effort scan), but not silent.
                    log.warning("Skipping Task 3 CSV %s: %s", csv_file, exc)
                    continue
                # Publish only after the whole file was processed.
                rows.extend(parsed)

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows)
def _parse_task3_stats(data: dict, rows: list, model_id, display, strat):
"""Parse summary_stats.json for task3."""
try:
by_q = data.get("accuracy_by_question", data.get("per_question", {}))
by_gs = data.get("accuracy_by_grid_size", {})
for gs_key, gs_data in by_gs.items():
try:
gs = int(str(gs_key).replace("x", "").split("_")[0])
except ValueError:
continue
if isinstance(gs_data, dict):
q_accs = {}
for qi_key, acc in gs_data.items():
try:
qi = int(qi_key)
q_accs[qi] = float(acc)
rows.append({
"model": model_id, "display_name": display,
"prompt_strategy": strat,
"grid_size": gs, "question_idx": qi,
"accuracy": float(acc), "delta": float("nan"),
})
except (ValueError, TypeError):
pass
if 0 in q_accs and 1 in q_accs and 2 in q_accs:
delta = q_accs[2] - (q_accs[0] + q_accs[1]) / 2
for r in rows:
if (r["model"] == model_id and r["prompt_strategy"] == strat
and r["grid_size"] == gs and r["question_idx"] == 2):
r["delta"] = round(delta, 4)
except Exception:
pass
# ---------------------------------------------------------------------------
# Leaderboard aggregators
# ---------------------------------------------------------------------------
def maze_navigation_leaderboard(df: pd.DataFrame, k_shot: int = 0) -> pd.DataFrame:
    """Build the Task 1 leaderboard table for one ``k_shot`` setting.

    One row per model display name (column ``Model``) and one column per
    ``<input_format>_<prompt_strategy>`` pair holding the mean accuracy over
    all matching rows, rounded to 3 decimals. An empty DataFrame is returned
    when ``df`` is empty or has no rows for ``k_shot``.
    """
    if df.empty:
        return pd.DataFrame()

    at_k = df.loc[df["k_shot"] == k_shot]
    if at_k.empty:
        return pd.DataFrame()

    table = pd.pivot_table(
        at_k,
        index=["display_name"],
        columns=["input_format", "prompt_strategy"],
        values="accuracy",
        aggfunc="mean",
    )
    # Flatten the (format, strategy) column pairs into single labels.
    table.columns = [f"{pair[0]}_{pair[1]}" for pair in table.columns]
    table = table.reset_index()
    table = table.rename(columns={"display_name": "Model"})
    return table.round(3)
def point_reuse_leaderboard(df: pd.DataFrame) -> pd.DataFrame:
    """Build the Task 2 leaderboard table.

    One row per model display name (column ``Model``) with the mean accuracy
    of question 0 (``Q0 acc``) and question 3 (``Q3 acc``) over all rows, and
    their difference ``Q3-Q0 diff``; values are rounded to 3 decimals.
    An empty DataFrame is returned when ``df`` is empty.
    """
    if df.empty:
        return pd.DataFrame()

    def mean_accuracy(question: int, label: str) -> pd.Series:
        selected = df[df["question_idx"] == question]
        return selected.groupby("display_name")["accuracy"].mean().rename(label)

    table = pd.concat(
        [mean_accuracy(0, "Q0 acc"), mean_accuracy(3, "Q3 acc")],
        axis=1,
    )
    table = table.reset_index().rename(columns={"display_name": "Model"})
    difference = table["Q3 acc"] - table["Q0 acc"]
    table["Q3-Q0 diff"] = difference.round(3)
    return table.round(3)
def compositional_distance_leaderboard(df: pd.DataFrame) -> pd.DataFrame:
    """Build the Task 3 leaderboard table.

    One row per model display name with the mean accuracy of questions 0, 1
    and 2 plus ``Q2 - (Q0 + Q1) / 2``; all values are rounded to 3 decimals.
    The difference is NaN whenever any of the three means is NaN. An empty
    DataFrame is returned when ``df`` is empty.
    """
    if df.empty:
        return pd.DataFrame()

    def question_mean(frame: pd.DataFrame, question: int):
        return frame.loc[frame["question_idx"] == question, "accuracy"].mean()

    records = []
    for name, group in df.groupby("display_name"):
        acc0 = question_mean(group, 0)
        acc1 = question_mean(group, 1)
        acc2 = question_mean(group, 2)
        if pd.isna(acc0) or pd.isna(acc1) or pd.isna(acc2):
            gap = float("nan")
        else:
            gap = acc2 - (acc0 + acc1) / 2
        records.append({
            "Model": name,
            "Q0 (A→M)": round(acc0, 3),
            "Q1 (D→M)": round(acc1, 3),
            "Q2 (B→C)": round(acc2, 3),
            "Δ Q2 vs avg(Q0,Q1)": round(gap, 3),
        })
    return pd.DataFrame(records)
# ---------------------------------------------------------------------------
# Full results loader (called by app.py)
# ---------------------------------------------------------------------------
def load_all_results(config_path: str | Path) -> dict[str, pd.DataFrame]:
    """Load the results of all three tasks.

    The repository root is taken to be three directory levels above the
    configuration file (``<root>/pipeline/configs/<file>``), and the output
    directories named in the configuration are resolved against it.

    Args:
        config_path: Location of the experiments YAML file; may be relative
            to the current working directory.

    Returns:
        Dict with the keys ``"maze_navigation"``, ``"point_reuse"`` and
        ``"compositional_distance"``, each mapping to that task's DataFrame.
    """
    cfg = load_config(config_path)
    # Make the path absolute and collapse ".." before walking up: ``.parent``
    # is purely lexical, so a relative path such as "configs/experiments.yaml"
    # would otherwise bottom out at "." instead of the real repository root.
    config_file = Path(os.path.abspath(config_path))
    repo_root = config_file.parent.parent.parent  # pipeline/configs/.. → llm-maze-solver
    return {
        "maze_navigation": load_maze_navigation_results(cfg, repo_root),
        "point_reuse": load_point_reuse_results(cfg, repo_root),
        "compositional_distance": load_compositional_distance_results(cfg, repo_root),
    }