"""Build viz data for a single sweep + register it under a benchmark.

A **benchmark** (e.g. Data-Agent Bench, DABstep Bench) is a curated collection of
tasks plus metadata about where the tasks came from. A **sweep** is one run of
N models × M harnesses × k attempts over the benchmark's tasks.

This script does three things per invocation:

1. Generate `<out>/summary.json` — heatmap + per-attempt metadata for one sweep.
   Tasks in the summary are enriched with `gold_answer`, `question`,
   `reward_mode`, normalized `difficulty_level`, via the benchmark's PROFILE.
2. Generate `<out>/traces.json` — one big dict of per-attempt trajectories,
   synced separately to a bucket.
3. Update `<out>/../benchmarks.json` — the site-wide index that the viewer reads
   to render the benchmark toggle + info icon.

Output layout (`viz_server/site/` by default):

    site/
    ├── benchmarks.json                ← registry: benchmark → [sweeps]
    ├── viewer.html
    ├── v1/{summary,traces}.json       ← sweep under data-agent-bench
    └── dabstep/{summary,traces}.json  ← sweep under dabstep-bench

Usage:
    python viz_server/build_data.py \\
        --name dabstep \\
        --suite rl/harbor/tasks/dabstep-workdir \\
        --benchmark dabstep-bench \\
        --out viz_server/site/dabstep
"""
from __future__ import annotations

import argparse
import json
import re
import sys
import tomllib
from pathlib import Path

# This lives at <repo>/viz_server/; the data-builder it reuses lives in <repo>/rl.
_RL = Path(__file__).resolve().parents[1] / "rl"
sys.path.insert(0, str(_RL))

from scripts.eval_sweep_viz import (  # noqa: E402
    build_data, HTML_TEMPLATE, SWEEP_BASE, VERIFIED_SUITE, JOBS_DIR,
)

SITE_DIR = Path(__file__).resolve().parent / "site"

# Heavy per-attempt fields moved out of summary into traces.json.
_HEAVY = ("trajectory", "grader_text", "opencode_log", "trial_log_tail") # ───────────────────────────────────────────────────────────────────────────── # Benchmark profiles — declarative metadata extractors per task layout # ───────────────────────────────────────────────────────────────────────────── BENCHMARKS = { "data-agent-bench": { "label": "Data-Agent Bench", "description": "Verified data-analysis tasks over Kaggle datasets. " "Each task is a (question, gold-answer) pair tested with a " "mode-aware grader (exact / numeric / flexible / LLM-judge).", "source": { "harbor": "AdithyaSK/data_agent_rl_environment_eval-harbor", "hf_dataset": "AdithyaSK/data_agent_rl_environment_eval", }, # Profile: how to extract task metadata from this benchmark's task.toml layout. "profile": { "difficulty_level": ("metadata.toml", "difficulty_level"), # already int 0-4 "gold_answer": ("metadata.toml", "gold_answer"), "kaggle": ("metadata.toml", "kaggle_dataset_name"), "reward_mode": ("metadata.toml", "reward_mode_initial"), "question": ("task.toml", "description"), }, }, "dabstep-bench": { "label": "DABstep Bench", "description": "Data Agent Benchmark for Multi-step Reasoning — 450 financial-" "analytics factoid tasks over a shared Adyen-payments corpus. " "Released by Adyen + Hugging Face (arXiv:2506.23719).", "source": { "harbor": "AdithyaSK/dabstep-harbor", "hf_dataset": "adyen/DABstep", "paper": "https://arxiv.org/abs/2506.23719", "leaderboard":"https://huggingface.co/spaces/adyen/DABstep", }, "profile": { # DABstep stores `difficulty = "easy"|"hard"`; we also injected # `difficulty_level` (0|4) during the /workdir rewrite, prefer that. "difficulty_level": ("metadata.toml", "difficulty_level"), "difficulty_map": {"easy": 0, "hard": 4}, "difficulty_str": ("metadata.toml", "difficulty"), # Gold lives in solution/solve.sh — extract via regex below. "gold_answer_file": "solution/solve.sh", "question_file": "instruction.md", # Static for DABstep — uses its own fuzzy scorer. 
"reward_mode_const": "dabstep-fuzzy", }, }, } # ───────────────────────────────────────────────────────────────────────────── # Metadata enrichment # ───────────────────────────────────────────────────────────────────────────── _DABSTEP_GOLD_RE = re.compile( r"cat\s*<<['\"]?ANSWER_EOF['\"]?\s*>\s*/workdir/answer\.txt\s*\n(.*?)\nANSWER_EOF", re.DOTALL, ) # Pulls the value out of `solution/solve.sh` style: # cat <<'ANSWER_EOF' > /workdir/answer.txt # # ANSWER_EOF def _read_solve_gold(path: Path) -> str: """Extract the gold answer string from a solve.sh heredoc.""" if not path.exists(): return "" m = _DABSTEP_GOLD_RE.search(path.read_text()) return m.group(1).strip() if m else "" def _read_instruction(path: Path, max_chars: int = 4000) -> str: """Read instruction.md and trim — UI shows it in a side panel, so keep it sane.""" if not path.exists(): return "" txt = path.read_text().strip() return txt if len(txt) <= max_chars else txt[:max_chars] + "\n…[truncated]" def enrich_tasks(data: dict, suite: Path, benchmark_key: str) -> None: """Mutate `data['tasks']` to add gold/question/difficulty per the benchmark profile. Idempotent — only fills fields that are empty / wrong. The base `build_data` output already has `difficulty_level`, `question`, `answer`, `kaggle`, `reward_mode` populated from task.toml's [metadata]; this is for benchmarks whose task layout doesn't carry that metadata in [metadata] directly. """ profile = BENCHMARKS.get(benchmark_key, {}).get("profile", {}) if not profile: return # no enrichment configured for this benchmark; keep base output as-is tasks_dir = suite for task in data.get("tasks", []): task_dir = tasks_dir / task["id"] # Difficulty: prefer existing int; otherwise map from string field via profile. 
if not task.get("difficulty_level"): tt = task_dir / "task.toml" if tt.exists(): tomld = tomllib.loads(tt.read_text()) meta = tomld.get("metadata", {}) if "difficulty_level" in meta: task["difficulty_level"] = int(meta["difficulty_level"]) elif "difficulty" in meta and "difficulty_map" in profile: task["difficulty_level"] = profile["difficulty_map"].get( meta["difficulty"], 0 ) # Gold answer: parse solution/solve.sh if profile points there if not task.get("answer") and profile.get("gold_answer_file"): task["answer"] = _read_solve_gold(task_dir / profile["gold_answer_file"]) # Question: read instruction.md if profile points there if not task.get("question") and profile.get("question_file"): task["question"] = _read_instruction(task_dir / profile["question_file"]) # Reward mode: static label if profile says so if not task.get("reward_mode") and profile.get("reward_mode_const"): task["reward_mode"] = profile["reward_mode_const"] # Kaggle: leave blank if benchmark has no Kaggle dependency (UI shows "—") # ───────────────────────────────────────────────────────────────────────────── # Split heavy → traces.json # ───────────────────────────────────────────────────────────────────────────── def split(data: dict, out: Path) -> dict: """Strip heavy fields into traces.json (keyed by tid); return slim summary.""" traces: dict[str, dict] = {} tid = 0 for task in data.get("tasks", []): for cell in task.get("cells", {}).values(): for att in cell.get("attempts", []): traces[str(tid)] = {k: att.pop(k, None) for k in _HEAVY} att["tid"] = tid tid += 1 (out / "traces.json").write_text(json.dumps(traces, separators=(",", ":"), default=str)) return data # ───────────────────────────────────────────────────────────────────────────── # benchmarks.json registry — merged into existing on every run # ───────────────────────────────────────────────────────────────────────────── def update_benchmarks_registry(site_dir: Path, benchmark_key: str, sweep_key: str) -> None: """Ensure 
benchmarks.json exists at site_dir/, has the benchmark, and registers the sweep.""" reg_path = site_dir / "benchmarks.json" reg: dict = {} if reg_path.exists(): try: reg = json.loads(reg_path.read_text()) except json.JSONDecodeError: reg = {} bench = BENCHMARKS.get(benchmark_key) if not bench: # Unknown benchmark — write a minimal entry so the toggle still works. bench = {"label": benchmark_key, "description": "", "source": {}} entry = reg.setdefault(benchmark_key, { "label": bench["label"], "description": bench["description"], "source": bench["source"], "sweeps": [], }) # Always refresh labels/descriptions from current BENCHMARKS dict (single source of truth). entry["label"] = bench["label"] entry["description"] = bench["description"] entry["source"] = bench["source"] if sweep_key not in entry["sweeps"]: entry["sweeps"].append(sweep_key) entry["sweeps"].sort() reg_path.write_text(json.dumps(reg, indent=2)) # ───────────────────────────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────────────────────────── def main() -> None: ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--name", required=True, help="Sweep folder under data/eval_sweep/, also used as the sweep key.") ap.add_argument("--benchmark", required=True, choices=sorted(BENCHMARKS.keys()), help="Which benchmark this sweep belongs to.") ap.add_argument("--suite", required=True, help="Path to the Harbor task suite (where task.toml/instruction.md/etc live).") ap.add_argument("--sweep-dir", default=None, help="Override the sweep dir (default data/eval_sweep/).") ap.add_argument("--jobs-dir", default=str(JOBS_DIR)) ap.add_argument("--k-max", type=int, default=4, dest="k_max") ap.add_argument("--out", default=None, help="Output dir (default viz_server/site/).") args = ap.parse_args() sweep_dir = Path(args.sweep_dir) if args.sweep_dir else (SWEEP_BASE / args.name) out = 
Path(args.out) if args.out else (SITE_DIR / args.name) out.mkdir(parents=True, exist_ok=True) suite = Path(args.suite).resolve() # The app reads sweeps by their site// name, not by --name. Use that as # the canonical sweep key so the registry stays consistent with the layout. sweep_key = out.name print(f"[build] benchmark={args.benchmark} sweep_key={sweep_key} (raw name={args.name})") print(f" sweep_dir={sweep_dir}") print(f" suite={suite}") data = build_data(sweep_dir.resolve(), Path(args.jobs_dir).resolve(), suite, args.k_max) enrich_tasks(data, suite, args.benchmark) summary = split(data, out) (out / "summary.json").write_text(json.dumps(summary, separators=(",", ":"), default=str)) # Update site-wide benchmarks.json registry (key by site dir, not --name). update_benchmarks_registry(out.parent, args.benchmark, sweep_key) sz = (out / "summary.json").stat().st_size / 1024 / 1024 tz = (out / "traces.json").stat().st_size / 1024 / 1024 print(f" summary.json: {sz:.1f} MB") print(f" traces.json: {tz:.1f} MB") print(f" benchmarks.json updated → {out.parent / 'benchmarks.json'}") if __name__ == "__main__": main()