Spaces:
Runtime error
Runtime error
Michael Rabinovich
eval_job: add shard mode (--fixtures/--shard-id) for sharded submission eval
b2e9d3a | """In-job entrypoint for the CADGenBench eval on HF Jobs. | |
| Invoked by the leaderboard Space's worker (see | |
| ``AI4Engineering/submit.py``) via:: | |
| hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\ | |
| --flavor a10g-large \\ | |
| --env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\ | |
| --env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\ | |
| --env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\ | |
| --env EVAL_WORKER_COUNT=8 \\ | |
| --secrets HF_TOKEN \\ | |
| python /opt/eval_job.py <submission_id> <zip_url> | |
| Two run modes: | |
| **Whole-submission (default, no ``--fixtures``)** -- the original path. | |
| Synchronous, no fallbacks. Any failure raises and the container exits | |
| non-zero; the Space's poller catches the ERROR stage and flips the | |
| submission row to ``failed``. | |
| 1. Download ``submissions/<id>.zip`` from the submissions dataset | |
| via ``hf_hub_download`` (auth via ``HF_TOKEN``). | |
| 2. Unpack into ``/tmp/run/``. | |
| 3. ``cadgenbench evaluate /tmp/run --workers <n>`` (subprocess). | |
| 4. ``cadgenbench report single /tmp/run -o /tmp/<id>.html`` | |
| (subprocess). | |
| 5. Build ``report.json`` bundling ``run_summary.json`` + every | |
| per-fixture ``result.json`` (mirror of submit.py's | |
| ``_build_report_json``). | |
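| 6. Upload ``reports/<id>.html`` + ``reports/<id>.json`` back to the | |
| submissions dataset via ``HfApi.upload_file``. | |
| 7. Upload one ``iso`` thumbnail per fixture as | |
| ``renders/<id>/<fixture>.png`` for the leaderboard gallery. | |
| 8. Exit 0. | |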
| The Space-side worker then downloads ``reports/<id>.json``, reads | |
| ``run_summary`` out of it, and flips the row to ``completed``. | |
| **Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by | |
| the Space's sharded submit path (UC3) to fan a large submission across | |
| several jobs. Steps 1-2 are identical, then the run dir is pruned to | |
| just this shard's fixtures, ``cadgenbench evaluate`` runs over that | |
| subset, and the resulting per-fixture dirs (``result.json`` + renders) | |
| are uploaded *verbatim* under ``reports/<id>/shards/<shard_id>/``. No | |
| report HTML, ``report.json``, or gallery render is produced per shard: | |
| the Space downloads every shard's fixture dirs, merges them into one | |
| run dir, and builds the single ``run_summary`` + report + gallery from | |
| the merged whole (mirroring the orchestrator's ``_merge_eval``). Exit 0 | |
| on success; any failure exits non-zero and the Space marks that shard | |
| ERROR and retries it. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import zipfile | |
| from pathlib import Path | |
| from typing import Any | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Scratch locations inside the job container: the unpacked submission
# (and, after evaluation, its per-fixture outputs) and the directory the
# HTML report is written to before upload.
RUN_DIR = Path("/tmp") / "run"
REPORT_HTML_DIR = Path("/tmp")

# Wall-clock budgets, in seconds, passed as ``timeout=`` to the
# ``cadgenbench evaluate`` and ``cadgenbench report single`` subprocesses.
EVAL_TIMEOUT_SECONDS = 60 * 30
REPORT_TIMEOUT_SECONDS = 60 * 5

# Top-level folders inside the submissions dataset repo.
REPORTS_DIR_IN_REPO = "reports"
RENDERS_DIR_IN_REPO = "renders"

# Folder name under ``reports/<id>/`` that holds each shard's raw
# per-fixture dirs in shard mode. The Space merges these and removes the
# whole ``shards/`` tree once the merge has succeeded.
SHARDS_DIR_NAME = "shards"

# The one canonical view uploaded per fixture as the leaderboard gallery
# thumbnail. "iso" matches the GT render the gallery pairs it with, so
# every gallery column shares a single fixed camera angle. The evaluator
# writes it to ``<run_dir>/<fixture>/renders/iso.png`` (cadgenbench
# DEFAULT_VIEWS).
GALLERY_THUMB_VIEW = "iso"
def main(argv: list[str] | None = None) -> int:
    """Parse the CLI and run the shard or whole-submission eval pipeline.

    Shard mode is selected by ``--fixtures`` (which then requires
    ``--shard-id``): download + unpack, prune the run dir to the shard's
    fixtures, evaluate, and upload the per-fixture dirs. Without
    ``--fixtures`` the whole-submission path runs: download + unpack,
    evaluate, build the HTML and JSON reports, upload both, then upload
    the gallery thumbnails.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Returns:
        ``0`` on success. Every failure propagates as an exception so the
        process exits non-zero.

    Raises:
        RuntimeError: a required env var is missing, ``EVAL_WORKER_COUNT``
            is not an integer, ``--fixtures`` and ``--shard-id`` are not
            given together, or any pipeline step fails.
    """
    parser = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    parser.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    parser.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    parser.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<id>/shards/<shard-id>/ for the Space to merge. "
            "Omit for the original whole-submission path."
        ),
    )
    parser.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    args = parser.parse_args(argv)
    submission_id: str = args.submission_id
    zip_url: str = args.zip_url

    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")

    # Name the offending variable instead of surfacing a bare
    # "invalid literal for int()" traceback.
    raw_workers = os.environ.get("EVAL_WORKER_COUNT", "8")
    try:
        worker_count = int(raw_workers)
    except ValueError as err:
        raise RuntimeError(
            f"EVAL_WORKER_COUNT must be an integer, got {raw_workers!r}."
        ) from err

    shard_fixtures = _parse_fixtures_arg(args.fixtures)

    # The two shard flags only make sense together. A lone --shard-id used
    # to fall through to the whole-submission path silently, evaluating
    # everything and uploading final reports the dispatcher never asked for.
    if shard_fixtures is None and args.shard_id:
        raise RuntimeError("--fixtures is required when --shard-id is set.")
    if shard_fixtures is not None and not args.shard_id:
        raise RuntimeError("--shard-id is required when --fixtures is set.")

    if shard_fixtures is not None:
        print(
            f"[eval_job] submission_id={submission_id} shard={args.shard_id} "
            f"fixtures={len(shard_fixtures)} workers={worker_count} "
            f"repo={submissions_repo}",
            flush=True,
        )
        _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
        _prune_run_dir(RUN_DIR, shard_fixtures)
        _run_eval(RUN_DIR, worker_count)
        _upload_shard_artifacts(
            submission_id, args.shard_id, RUN_DIR, submissions_repo, token,
        )
        print(
            f"[eval_job] done: {submission_id} shard={args.shard_id}",
            flush=True,
        )
        return 0

    print(
        f"[eval_job] submission_id={submission_id} "
        f"workers={worker_count} repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _run_eval(RUN_DIR, worker_count)
    html_path = REPORT_HTML_DIR / f"{submission_id}.html"
    _run_report(RUN_DIR, html_path)
    report_json = _build_report_json(RUN_DIR)
    _upload_reports(
        submission_id, html_path, report_json, submissions_repo, token,
    )
    _upload_gallery_renders(submission_id, RUN_DIR, submissions_repo, token)
    print(f"[eval_job] done: {submission_id}", flush=True)
    return 0
def _parse_fixtures_arg(raw: str | None) -> list[str] | None:
    """Turn the ``--fixtures`` CSV into an ordered, duplicate-free list.

    Returns ``None`` when the flag was not given at all, which selects
    the whole-submission path. Entries are whitespace-stripped, blanks
    are dropped, and repeats keep only their first occurrence.

    Raises:
        RuntimeError: the flag was given but no fixture name survived
            (empty or whitespace/comma-only value). An empty shard is
            never something the Space should dispatch.
    """
    if raw is None:
        return None

    stripped = (piece.strip() for piece in raw.split(","))
    # dict.fromkeys dedupes while preserving first-seen order.
    unique = list(dict.fromkeys(entry for entry in stripped if entry))
    if unique:
        return unique
    raise RuntimeError("--fixtures was set but resolved to no fixture names.")
def _require_env(name: str) -> str:
    """Fetch environment variable *name*, insisting that it is non-empty.

    Raises:
        RuntimeError: the variable is unset or set to the empty string.
    """
    value = os.environ.get(name, "")
    if value:
        return value
    raise RuntimeError(
        f"Required environment variable {name!r} is unset or empty."
    )
def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Fetch the submission zip from the Hub and unpack it into ``RUN_DIR``.

    Any existing ``RUN_DIR`` is wiped first so the job always starts from
    a clean tree. The archive is pulled with ``hf_hub_download`` (token
    auth) from ``submissions/<submission_id>.zip`` in the
    *submissions_repo* dataset.

    *zip_url* is expected to look like
    ``https://huggingface.co/datasets/<repo>/resolve/main/submissions/<id>.zip``
    but is not read here: the in-repo filename is rebuilt from
    *submission_id* alone, so any URL shape is accepted.
    """
    # Start from an empty run dir; leftovers from a previous run would be
    # picked up by the evaluator.
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    zip_in_repo = f"submissions/{submission_id}.zip"
    print(
        f"[eval_job] downloading {submissions_repo}:{zip_in_repo}",
        flush=True,
    )
    cached_zip = hf_hub_download(
        repo_id=submissions_repo,
        filename=zip_in_repo,
        repo_type="dataset",
        token=token,
    )

    # No zip-slip / symlink re-validation here: per the original author's
    # note the Space gate-checks the zip contents before upload (the same
    # validated shape as submit.py's _extract_zip), so the archive is
    # extracted as-is.
    with zipfile.ZipFile(cached_zip) as archive:
        archive.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)
def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Delete every directory under *run_dir* that is not in *fixtures*.

    Shard mode unpacks the full submission zip but must evaluate only
    its own slice, so the other fixture dirs are removed before
    ``cadgenbench evaluate`` walks the tree. Plain files at the root
    (e.g. ``meta.json``) are never touched.

    Raises:
        RuntimeError: a requested fixture has no directory in *run_dir*,
            meaning the shard names something the submission does not
            contain. Nothing is deleted in that case.
    """
    keep = set(fixtures)
    fixture_dirs = [entry for entry in run_dir.iterdir() if entry.is_dir()]

    # Validate before deleting anything, so a bad shard spec leaves the
    # unpacked tree intact.
    absent = keep - {entry.name for entry in fixture_dirs}
    if absent:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(sorted(absent))}"
        )

    stale = [entry for entry in fixture_dirs if entry.name not in keep]
    for entry in stale:
        shutil.rmtree(entry)

    print(
        f"[eval_job] pruned run dir to {len(keep)} shard fixture(s) "
        f"(removed {len(stale)})",
        flush=True,
    )
def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Push this shard's evaluated run dir to the Hub for the Space to merge.

    The whole of *run_dir* is uploaded verbatim with a single
    ``upload_folder`` call to ``reports/<id>/shards/<shard_id>/``. After
    pruning and evaluation that is each ``<fixture>/`` dir with its
    ``result.json``, ``renders/`` and any overlay PNGs. The Space
    downloads every shard's tree, copies the fixture dirs into one
    merged run dir, and builds the aggregate ``run_summary``, report and
    gallery from that. The per-shard ``run_summary.json`` written by
    ``cadgenbench evaluate`` is uploaded too; the merge recomputes the
    summary over the union and ignores the partials.
    """
    destination = "/".join(
        (REPORTS_DIR_IN_REPO, submission_id, SHARDS_DIR_NAME, shard_id)
    )
    hub = HfApi(token=token)
    hub.upload_folder(
        folder_path=str(run_dir),
        path_in_repo=destination,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add eval shard {shard_id} for {submission_id}",
    )
    print(
        f"[eval_job] uploaded shard {shard_id} -> {destination}",
        flush=True,
    )
def _run_eval(run_dir: Path, workers: int) -> None:
    """Run ``cadgenbench evaluate`` on *run_dir* in a child process.

    The CLI is launched with the current interpreter
    (``python -m cadgenbench.cli``), a copy of this process's
    environment, and ``--workers`` set to *workers*.

    Raises:
        RuntimeError: the child exited with a non-zero status.
        subprocess.TimeoutExpired: the child ran longer than
            ``EVAL_TIMEOUT_SECONDS``.
    """
    command = [sys.executable, "-m", "cadgenbench.cli", "evaluate"]
    command.append(str(run_dir))
    command.extend(["--workers", str(workers)])
    print(f"[eval_job] {' '.join(command)}", flush=True)

    # check=False: the exit status is inspected by hand so the failure is
    # reported as a RuntimeError with a short message.
    completed = subprocess.run(
        command,
        timeout=EVAL_TIMEOUT_SECONDS,
        env=dict(os.environ),
        check=False,
    )
    exit_code = completed.returncode
    if exit_code == 0:
        return
    raise RuntimeError(f"cadgenbench evaluate exited {exit_code}")
def _run_report(run_dir: Path, html_out: Path) -> None:
    """Run ``cadgenbench report single`` on *run_dir*, writing *html_out*.

    Success requires both a zero exit status and the HTML file actually
    existing afterwards.

    Raises:
        RuntimeError: the child exited non-zero, or exited zero without
            producing *html_out*.
        subprocess.TimeoutExpired: the child ran longer than
            ``REPORT_TIMEOUT_SECONDS``.
    """
    command = [sys.executable, "-m", "cadgenbench.cli", "report", "single"]
    command += [str(run_dir), "-o", str(html_out)]
    print(f"[eval_job] {' '.join(command)}", flush=True)

    completed = subprocess.run(
        command,
        timeout=REPORT_TIMEOUT_SECONDS,
        env=dict(os.environ),
        check=False,
    )
    # A clean exit is not enough: the report file itself must be there.
    if completed.returncode == 0 and html_out.is_file():
        return
    raise RuntimeError(
        f"cadgenbench report single exited {completed.returncode} "
        f"(html exists={html_out.is_file()})"
    )
def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Assemble the ``report.json`` payload from an evaluated *run_dir*.

    The payload has two keys: ``run_summary`` (the parsed
    ``run_summary.json``) and ``per_fixture_results`` (fixture dir name
    mapped to its parsed ``result.json``, in sorted order). Directories
    without a ``result.json`` are skipped. This is meant to be the same
    shape as submit.py's ``_build_report_json``; the Space-side worker
    reads ``run_summary`` out of it to flip the submission row.

    Raises:
        RuntimeError: ``run_summary.json`` is missing from *run_dir*.
    """

    def _load(json_file: Path) -> Any:
        return json.loads(json_file.read_text(encoding="utf-8"))

    summary_file = run_dir / "run_summary.json"
    if not summary_file.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )
    run_summary = _load(summary_file)

    fixture_dirs = sorted(entry for entry in run_dir.iterdir() if entry.is_dir())
    results: dict[str, dict[str, Any]] = {
        entry.name: _load(entry / "result.json")
        for entry in fixture_dirs
        if (entry / "result.json").is_file()
    }
    return {"run_summary": run_summary, "per_fixture_results": results}
def _upload_reports(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Publish the HTML and JSON reports for *submission_id* to the Hub.

    Two ``upload_file`` calls, HTML first: the file at *html_path* goes
    to ``reports/<id>.html`` and *report_json*, serialised as indented
    UTF-8 JSON, goes to ``reports/<id>.json``.
    """
    hub = HfApi(token=token)
    report_stem = f"{REPORTS_DIR_IN_REPO}/{submission_id}"
    # Repo coordinates shared by both uploads.
    target = {"repo_id": submissions_repo, "repo_type": "dataset"}

    hub.upload_file(
        path_or_fileobj=str(html_path),
        path_in_repo=f"{report_stem}.html",
        commit_message=f"add HTML report for {submission_id}",
        **target,
    )

    # Serialised only after the HTML upload, keeping the original
    # ordering of side effects.
    json_bytes = json.dumps(
        report_json, ensure_ascii=False, indent=2,
    ).encode("utf-8")
    hub.upload_file(
        path_or_fileobj=json_bytes,
        path_in_repo=f"{report_stem}.json",
        commit_message=f"add JSON report for {submission_id}",
        **target,
    )
    print(
        f"[eval_job] uploaded reports/{submission_id}.{{html,json}}",
        flush=True,
    )
def _upload_gallery_renders(
    submission_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Upload one ``iso`` thumbnail per fixture for the leaderboard gallery.

    Every ``<run_dir>/<fixture>/renders/iso.png`` is published as
    ``renders/<id>/<fixture>.png`` in the submissions dataset, all in a
    single commit. These are the standalone PNGs the gallery's
    ``renderFor()`` points at; the full multi-view renders stay
    base64-embedded in ``reports/<id>.html`` for the self-contained
    report.

    A fixture with no ``iso.png`` (missing output, or a render that never
    ran) is skipped rather than treated as an error: the gallery reads
    the per-fixture status from the row and draws the dashed "invalid
    generation" cell. If no fixture has a thumbnail, nothing is uploaded.
    """
    thumb_filename = f"{GALLERY_THUMB_VIEW}.png"
    staged: list[tuple[Path, str]] = []
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        iso_png = fixture_dir / "renders" / thumb_filename
        if iso_png.is_file():
            staged.append((iso_png, fixture_dir.name))

    if not staged:
        print(
            f"[eval_job] no gallery renders to upload for {submission_id}",
            flush=True,
        )
        return

    # Imported here so this block is self-contained; the module already
    # depends on huggingface_hub.
    from huggingface_hub import CommitOperationAdd

    # One commit for the whole batch. The previous per-file upload_file
    # loop created one commit per fixture, which is slow on large
    # submissions and contradicted the documented "one commit" contract.
    operations = [
        CommitOperationAdd(
            path_in_repo=(
                f"{RENDERS_DIR_IN_REPO}/{submission_id}/{fixture_name}.png"
            ),
            path_or_fileobj=str(iso_png),
        )
        for iso_png, fixture_name in staged
    ]
    api = HfApi(token=token)
    api.create_commit(
        repo_id=submissions_repo,
        repo_type="dataset",
        operations=operations,
        commit_message=(
            f"add {len(staged)} gallery render(s) for {submission_id}"
        ),
    )
    print(
        f"[eval_job] uploaded {len(staged)} gallery render(s) under "
        f"{RENDERS_DIR_IN_REPO}/{submission_id}/",
        flush=True,
    )
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())