DataAgent-evals / build_data.py
AdithyaSK's picture
AdithyaSK HF Staff
add build_data.py — data-build helper (run per-sweep to produce site/<sweep>/summary.json + traces.json)
485d935
Raw
History Blame Contribute Delete
13.9 kB
"""Build viz data for a single sweep + register it under a benchmark.

A **benchmark** (e.g. Data-Agent Bench, DABstep Bench) is a curated collection of
tasks plus metadata about where the tasks came from. A **sweep** is one run of N
models × M harnesses × k attempts over the benchmark's tasks.

This script does three things per invocation:

  1. Generate `<out>/summary.json` — heatmap + per-attempt metadata for one sweep.
     Tasks in the summary are enriched with `gold_answer`, `question`,
     `reward_mode`, normalized `difficulty_level`, via the benchmark's PROFILE.
  2. Generate `<out>/traces.json` — one big dict of per-attempt trajectories,
     synced separately to a bucket.
  3. Update `<out>/../benchmarks.json` — the site-wide index that the viewer
     reads to render the benchmark toggle + info icon.

Output layout (`viz_server/site/` by default):

    site/
    ├── benchmarks.json                ← registry: benchmark → [sweeps]
    ├── viewer.html
    ├── v1/{summary,traces}.json       ← sweep under data-agent-bench
    └── dabstep/{summary,traces}.json  ← sweep under dabstep-bench

Usage:

    python viz_server/build_data.py \\
        --name dabstep \\
        --suite rl/harbor/tasks/dabstep-workdir \\
        --benchmark dabstep-bench \\
        --out viz_server/site/dabstep
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import tomllib
from pathlib import Path
# This lives at <repo>/viz_server/; the data-builder it reuses lives in <repo>/rl.
# `parents[1]` of the resolved script path is the directory above the one that
# contains this file, so `_RL` points at the sibling `rl/` directory.
_RL = Path(__file__).resolve().parents[1] / "rl"
# Prepend to sys.path so the `scripts.eval_sweep_viz` import below resolves
# from that directory; this must run before the import (hence the E402 waiver).
sys.path.insert(0, str(_RL))
from scripts.eval_sweep_viz import (  # noqa: E402
    build_data, HTML_TEMPLATE, SWEEP_BASE, VERIFIED_SUITE, JOBS_DIR,
)
# NOTE(review): HTML_TEMPLATE and VERIFIED_SUITE are not referenced in the
# visible part of this file — confirm whether they are still needed.

# Default root for generated output: the `site/` directory next to this script.
SITE_DIR = Path(__file__).resolve().parent / "site"
# Heavy per-attempt fields moved out of summary into traces.json.
_HEAVY = ("trajectory", "grader_text", "opencode_log", "trial_log_tail")
# ─────────────────────────────────────────────────────────────────────────────
# Benchmark profiles β€” declarative metadata extractors per task layout
# ─────────────────────────────────────────────────────────────────────────────
# Registry of known benchmarks, keyed by the value accepted for `--benchmark`.
#
# Each entry carries:
#   label / description / source  - display metadata copied verbatim into
#                                   benchmarks.json for the viewer.
#   profile                       - hints for filling per-task fields that the
#                                   base data build left empty.
#
# NOTE(review): within this file only the profile keys `difficulty_map`,
# `gold_answer_file`, `question_file` and `reward_mode_const` are read (by
# `enrich_tasks`). The `(file, key)` tuple entries look purely declarative -
# confirm whether anything else consumes them.
BENCHMARKS = {
    "data-agent-bench": {
        "label": "Data-Agent Bench",
        "description": "Verified data-analysis tasks over Kaggle datasets. "
                       "Each task is a (question, gold-answer) pair tested with a "
                       "mode-aware grader (exact / numeric / flexible / LLM-judge).",
        "source": {
            "harbor": "AdithyaSK/data_agent_rl_environment_eval-harbor",
            "hf_dataset": "AdithyaSK/data_agent_rl_environment_eval",
        },
        # Profile: how to extract task metadata from this benchmark's task.toml layout.
        "profile": {
            "difficulty_level": ("metadata.toml", "difficulty_level"),  # already int 0-4
            "gold_answer": ("metadata.toml", "gold_answer"),
            "kaggle": ("metadata.toml", "kaggle_dataset_name"),
            "reward_mode": ("metadata.toml", "reward_mode_initial"),
            "question": ("task.toml", "description"),
        },
    },
    "dabstep-bench": {
        "label": "DABstep Bench",
        "description": "Data Agent Benchmark for Multi-step Reasoning — 450 financial-"
                       "analytics factoid tasks over a shared Adyen-payments corpus. "
                       "Released by Adyen + Hugging Face (arXiv:2506.23719).",
        "source": {
            "harbor": "AdithyaSK/dabstep-harbor",
            "hf_dataset": "adyen/DABstep",
            "paper": "https://arxiv.org/abs/2506.23719",
            "leaderboard": "https://huggingface.co/spaces/adyen/DABstep",
        },
        "profile": {
            # DABstep stores `difficulty = "easy"|"hard"`; we also injected
            # `difficulty_level` (0|4) during the /workdir rewrite, prefer that.
            "difficulty_level": ("metadata.toml", "difficulty_level"),
            "difficulty_map": {"easy": 0, "hard": 4},
            "difficulty_str": ("metadata.toml", "difficulty"),
            # Gold lives in solution/solve.sh — extracted via a regex on the heredoc.
            "gold_answer_file": "solution/solve.sh",
            "question_file": "instruction.md",
            # Static for DABstep — uses its own fuzzy scorer.
            "reward_mode_const": "dabstep-fuzzy",
        },
    },
}
# ─────────────────────────────────────────────────────────────────────────────
# Metadata enrichment
# ─────────────────────────────────────────────────────────────────────────────
# Pulls the value out of `solution/solve.sh` style:
#     cat <<'ANSWER_EOF' > /workdir/answer.txt
#     <gold>
#     ANSWER_EOF
# The quotes around the heredoc marker are optional. Group 1 lazily captures
# everything (newlines included, via DOTALL) between the `cat` line and the
# first following line that starts with ANSWER_EOF.
_DABSTEP_GOLD_RE = re.compile(
    r"cat\s*<<['\"]?ANSWER_EOF['\"]?\s*>\s*/workdir/answer\.txt\s*\n(.*?)\nANSWER_EOF",
    re.DOTALL,
)


def _read_solve_gold(path: Path) -> str:
    """Extract the gold answer string from a solve.sh heredoc.

    Args:
        path: Location of the `solve.sh` script.

    Returns:
        The heredoc payload with surrounding whitespace stripped, or "" when
        `path` is not a regular file or contains no matching heredoc.
    """
    # is_file() (rather than exists()) also rejects directories, which would
    # otherwise make read_text() raise.
    if not path.is_file():
        return ""
    # Decode explicitly as UTF-8 so non-ASCII answers do not depend on the
    # locale's default encoding.
    match = _DABSTEP_GOLD_RE.search(path.read_text(encoding="utf-8"))
    if match is None:
        return ""
    return match.group(1).strip()
def _read_instruction(path: Path, max_chars: int = 4000) -> str:
    """Read instruction.md and trim it; the UI shows it in a side panel, so keep it sane.

    Args:
        path: Location of the instruction file.
        max_chars: Maximum number of characters of (stripped) text to keep
            before the truncation marker is appended.

    Returns:
        The stripped file content, cut to `max_chars` characters plus a
        "[truncated]" marker line when longer, or "" when `path` is not a
        regular file.
    """
    # is_file() (rather than exists()) also rejects directories, which would
    # otherwise make read_text() raise.
    if not path.is_file():
        return ""
    # Decode explicitly as UTF-8 so the result does not depend on the locale's
    # default encoding.
    text = path.read_text(encoding="utf-8").strip()
    if len(text) <= max_chars:
        return text
    return text[:max_chars] + "\n…[truncated]"
def enrich_tasks(data: dict, suite: Path, benchmark_key: str) -> None:
    """Mutate `data['tasks']` to add gold/question/difficulty per the benchmark profile.

    Only fields that are missing or falsy are filled, so running this twice
    gives the same result as running it once. The base `build_data` output is
    expected to already carry `difficulty_level`, `question`, `answer`,
    `kaggle` and `reward_mode` from task.toml's [metadata]; this function is
    for benchmarks whose task layout keeps some of that elsewhere.

    Args:
        data: Sweep data whose `tasks` list (dicts with an `id` key) is
            updated in place.
        suite: Task-suite directory; each task is looked up at `suite/<id>`.
        benchmark_key: Key into `BENCHMARKS` selecting the profile to apply.
            Unknown keys and empty profiles leave `data` untouched.
    """
    profile = BENCHMARKS.get(benchmark_key, {}).get("profile", {})
    if not profile:
        return  # no enrichment configured for this benchmark; keep base output as-is

    for task in data.get("tasks", []):
        task_dir = suite / task["id"]

        # Difficulty: prefer existing int; otherwise map from string field via profile.
        # NOTE(review): level 0 is falsy, so level-0 tasks are re-read from
        # task.toml on every run — presumably harmless; confirm that is intended.
        if not task.get("difficulty_level"):
            toml_path = task_dir / "task.toml"
            if toml_path.is_file():
                # TOML is UTF-8 by specification: parse from a binary handle
                # instead of text decoded with the locale's default encoding.
                with toml_path.open("rb") as fh:
                    meta = tomllib.load(fh).get("metadata", {})
                if "difficulty_level" in meta:
                    task["difficulty_level"] = int(meta["difficulty_level"])
                elif "difficulty" in meta and "difficulty_map" in profile:
                    # Unmapped difficulty strings fall back to level 0.
                    task["difficulty_level"] = profile["difficulty_map"].get(
                        meta["difficulty"], 0
                    )

        # Gold answer: parse solution/solve.sh if profile points there
        if not task.get("answer") and profile.get("gold_answer_file"):
            task["answer"] = _read_solve_gold(task_dir / profile["gold_answer_file"])

        # Question: read instruction.md if profile points there
        if not task.get("question") and profile.get("question_file"):
            task["question"] = _read_instruction(task_dir / profile["question_file"])

        # Reward mode: static label if profile says so
        if not task.get("reward_mode") and profile.get("reward_mode_const"):
            task["reward_mode"] = profile["reward_mode_const"]

        # Kaggle: leave blank if benchmark has no Kaggle dependency (UI shows "—")
# ─────────────────────────────────────────────────────────────────────────────
# Split heavy β†’ traces.json
# ─────────────────────────────────────────────────────────────────────────────
def split(data: dict, out: Path) -> dict:
    """Move the heavy per-attempt fields into `<out>/traces.json`.

    Every attempt found under `data["tasks"][*]["cells"][*]["attempts"]` is
    numbered in traversal order. Its `_HEAVY` fields are popped (absent ones
    are recorded as None) and stored under the stringified number, and the
    number is written back onto the attempt as `tid`.

    Args:
        data: Sweep data; its attempt dicts are modified in place.
        out: Existing directory that receives `traces.json`.

    Returns:
        The same `data` object, now without the heavy fields.
    """
    # Lazily walk tasks -> cells -> attempts in their stored order.
    attempts = (
        attempt
        for task in data.get("tasks", [])
        for cell in task.get("cells", {}).values()
        for attempt in cell.get("attempts", [])
    )

    heavy_by_tid: dict[str, dict] = {}
    for trace_id, attempt in enumerate(attempts):
        payload = {}
        for field in _HEAVY:
            payload[field] = attempt.pop(field, None)
        heavy_by_tid[str(trace_id)] = payload
        attempt["tid"] = trace_id

    # Compact separators keep the (potentially large) file small; default=str
    # stringifies anything json cannot serialize natively.
    serialized = json.dumps(heavy_by_tid, separators=(",", ":"), default=str)
    (out / "traces.json").write_text(serialized)
    return data
# ─────────────────────────────────────────────────────────────────────────────
# benchmarks.json registry β€” merged into existing on every run
# ─────────────────────────────────────────────────────────────────────────────
def update_benchmarks_registry(site_dir: Path, benchmark_key: str, sweep_key: str) -> None:
    """Ensure benchmarks.json exists at site_dir/, has the benchmark, and registers the sweep.

    The existing registry (if any) is loaded and merged into: other
    benchmarks are preserved, this benchmark's label/description/source are
    refreshed from `BENCHMARKS`, and `sweep_key` is added to its sorted
    `sweeps` list if not already present.

    A registry file that is not valid JSON, or whose top level is not an
    object, is rebuilt from scratch (a warning is printed to stderr). Entries
    that are not objects or lack a `sweeps` list are repaired rather than
    raising.

    Args:
        site_dir: Directory containing (or to contain) `benchmarks.json`.
        benchmark_key: Benchmark identifier; unknown keys get a minimal entry.
        sweep_key: Sweep identifier to register under the benchmark.
    """
    reg_path = site_dir / "benchmarks.json"
    reg: dict = {}
    if reg_path.exists():
        try:
            loaded = json.loads(reg_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            # Best effort: start over, but say so instead of silently dropping
            # every previously registered benchmark.
            print(f"[warn] {reg_path} is not valid JSON ({err}); rebuilding registry",
                  file=sys.stderr)
        else:
            if isinstance(loaded, dict):
                reg = loaded
            else:
                print(f"[warn] {reg_path} does not contain a JSON object; rebuilding registry",
                      file=sys.stderr)

    bench = BENCHMARKS.get(benchmark_key)
    if not bench:
        # Unknown benchmark — write a minimal entry so the toggle still works.
        bench = {"label": benchmark_key, "description": "", "source": {}}

    entry = reg.get(benchmark_key)
    if not isinstance(entry, dict):
        entry = reg[benchmark_key] = {}

    # Always refresh labels/descriptions from current BENCHMARKS dict (single source of truth).
    entry["label"] = bench["label"]
    entry["description"] = bench["description"]
    entry["source"] = bench["source"]

    # Tolerate hand-edited entries that lost (or mangled) their sweeps list.
    sweeps = entry.get("sweeps")
    if not isinstance(sweeps, list):
        sweeps = entry["sweeps"] = []
    if sweep_key not in sweeps:
        sweeps.append(sweep_key)
    sweeps.sort()

    reg_path.write_text(json.dumps(reg, indent=2), encoding="utf-8")
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point: build one sweep's site data and register it.

    Parses the CLI arguments, runs `build_data` for the sweep, enriches the
    tasks with the benchmark profile, writes `traces.json` and `summary.json`
    into the output directory, and updates `benchmarks.json` in that
    directory's parent.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--name", required=True,
                    help="Sweep folder under data/eval_sweep/; also the default output "
                         "dir name (the output dir name is the sweep key).")
    ap.add_argument("--benchmark", required=True, choices=sorted(BENCHMARKS),
                    help="Which benchmark this sweep belongs to.")
    ap.add_argument("--suite", required=True,
                    help="Path to the Harbor task suite (where task.toml/instruction.md/etc live).")
    ap.add_argument("--sweep-dir", default=None,
                    help="Override the sweep dir (default data/eval_sweep/<name>).")
    ap.add_argument("--jobs-dir", default=str(JOBS_DIR))
    ap.add_argument("--k-max", type=int, default=4, dest="k_max")
    ap.add_argument("--out", default=None,
                    help="Output dir (default viz_server/site/<name>).")
    args = ap.parse_args()

    sweep_dir = Path(args.sweep_dir) if args.sweep_dir else (SWEEP_BASE / args.name)
    out = Path(args.out) if args.out else (SITE_DIR / args.name)
    out.mkdir(parents=True, exist_ok=True)
    suite = Path(args.suite).resolve()

    # The app reads sweeps by their site/<dir>/ name, not by --name. Use that as
    # the canonical sweep key so the registry stays consistent with the layout.
    sweep_key = out.name

    print(f"[build] benchmark={args.benchmark} sweep_key={sweep_key} (raw name={args.name})")
    print(f" sweep_dir={sweep_dir}")
    print(f" suite={suite}")

    data = build_data(sweep_dir.resolve(), Path(args.jobs_dir).resolve(),
                      suite, args.k_max)
    enrich_tasks(data, suite, args.benchmark)

    # split() writes traces.json and returns the slimmed-down data for summary.json.
    summary = split(data, out)
    (out / "summary.json").write_text(json.dumps(summary, separators=(",", ":"), default=str))

    # Update site-wide benchmarks.json registry (key by site dir, not --name).
    update_benchmarks_registry(out.parent, args.benchmark, sweep_key)

    # Report the generated file sizes in MB.
    sz = (out / "summary.json").stat().st_size / 1024 / 1024
    tz = (out / "traces.json").stat().st_size / 1024 / 1024
    print(f" summary.json: {sz:.1f} MB")
    print(f" traces.json: {tz:.1f} MB")
    print(f" benchmarks.json updated → {out.parent / 'benchmarks.json'}")


if __name__ == "__main__":
    main()