"""In-job entrypoint for the CADGenBench eval on HF Jobs.

Invoked by the leaderboard Space's worker (see
``AI4Engineering/submit.py``) via::

    hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\
        --flavor a10g-large \\
        --env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\
        --env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\
        --env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\
        --env EVAL_WORKER_COUNT=8 \\
        --secrets HF_TOKEN \\
        python /opt/eval_job.py <submission_id> <zip_url>

Two run modes:

**Whole-submission (default, no ``--fixtures``)** -- the original path.
Synchronous, no fallbacks. Any failure raises and the container exits
non-zero; the Space's poller catches the ERROR stage and flips the
submission row to ``failed``.

1. Download ``submissions/<submission_id>.zip`` from the submissions
   dataset via ``hf_hub_download`` (auth via ``HF_TOKEN``).
2. Unpack into ``/tmp/run/``.
3. ``cadgenbench evaluate /tmp/run --workers <EVAL_WORKER_COUNT>``
   (subprocess).
4. ``cadgenbench report single /tmp/run -o /tmp/<submission_id>.html``
   (subprocess).
5. Build ``report.json`` bundling ``run_summary.json`` + every
   per-fixture ``result.json`` (mirror of submit.py's
   ``_build_report_json``).
6. Upload ``reports/<submission_id>.html`` +
   ``reports/<submission_id>.json`` back to the submissions dataset via
   ``HfApi.upload_file``.
7. Upload one ``renders/<submission_id>/<fixture>.png`` gallery
   thumbnail per fixture that produced an ``iso`` render.
8. Exit 0. The Space-side worker then downloads
   ``reports/<submission_id>.json``, reads ``run_summary`` out of it,
   and flips the row to ``completed``.

**Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by the
Space's sharded submit path (UC3) to fan a large submission across
several jobs. Steps 1-2 are identical, then the run dir is pruned to
just this shard's fixtures, ``cadgenbench evaluate`` runs over that
subset, and the resulting per-fixture dirs (``result.json`` + renders)
are uploaded *verbatim* under
``reports/<submission_id>/shards/<shard_id>/``. No report HTML,
``report.json``, or gallery render is produced per shard: the Space
downloads every shard's fixture dirs, merges them into one run dir, and
builds the single ``run_summary`` + report + gallery from the merged
whole (mirroring the orchestrator's ``_merge_eval``). Exit 0 on success;
any failure exits non-zero and the Space marks that shard ERROR and
retries it.
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import zipfile
from pathlib import Path
from typing import Any

from huggingface_hub import HfApi, hf_hub_download

RUN_DIR = Path("/tmp/run")
REPORT_HTML_DIR = Path("/tmp")

EVAL_TIMEOUT_SECONDS = 30 * 60
REPORT_TIMEOUT_SECONDS = 5 * 60

REPORTS_DIR_IN_REPO = "reports"
RENDERS_DIR_IN_REPO = "renders"

# Sub-prefix under ``reports/<submission_id>/`` where each shard uploads
# its raw per-fixture dirs in shard mode. The Space merges these and
# deletes the whole ``shards/`` tree after a successful merge.
SHARDS_DIR_NAME = "shards"

# Single canonical view uploaded per fixture for the leaderboard
# gallery thumbnail. "iso" matches the GT render the gallery pairs it
# with, so the gallery columns stay a comparable matrix at one fixed
# camera angle. The evaluator writes this at
# ``<run_dir>/<fixture>/renders/iso.png`` (cadgenbench DEFAULT_VIEWS).
GALLERY_THUMB_VIEW = "iso"


def main() -> int:
    """Parse the CLI, run the selected eval mode, and return exit code 0.

    Without ``--fixtures`` the whole-submission path runs (evaluate,
    report, upload reports + gallery thumbnails). With ``--fixtures``
    the shard path runs (prune, evaluate, upload the raw fixture dirs).

    Raises:
        RuntimeError: on a missing required env var, on ``--fixtures``
            without ``--shard-id``, or on any failing pipeline step.
            Errors are deliberately not caught so the container exits
            non-zero.
    """
    parser = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    parser.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    parser.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<submission_id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    parser.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<submission_id>/shards/<shard_id>/ for the Space to "
            "merge. "
            "Omit for the original whole-submission path."
        ),
    )
    parser.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    args = parser.parse_args()

    submission_id: str = args.submission_id
    zip_url: str = args.zip_url

    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")
    worker_count = int(os.environ.get("EVAL_WORKER_COUNT", "8"))

    shard_fixtures = _parse_fixtures_arg(args.fixtures)

    if shard_fixtures is not None:
        # Shard mode: evaluate only this shard's slice and upload the raw
        # per-fixture dirs; the Space builds the report from the merge.
        if not args.shard_id:
            raise RuntimeError("--shard-id is required when --fixtures is set.")
        print(
            f"[eval_job] submission_id={submission_id} shard={args.shard_id} "
            f"fixtures={len(shard_fixtures)} workers={worker_count} "
            f"repo={submissions_repo}",
            flush=True,
        )
        _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
        _prune_run_dir(RUN_DIR, shard_fixtures)
        _run_eval(RUN_DIR, worker_count)
        _upload_shard_artifacts(
            submission_id,
            args.shard_id,
            RUN_DIR,
            submissions_repo,
            token,
        )
        print(
            f"[eval_job] done: {submission_id} shard={args.shard_id}",
            flush=True,
        )
        return 0

    # Whole-submission mode.
    print(
        f"[eval_job] submission_id={submission_id} "
        f"workers={worker_count} repo={submissions_repo}",
        flush=True,
    )

    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _run_eval(RUN_DIR, worker_count)
    html_path = REPORT_HTML_DIR / f"{submission_id}.html"
    _run_report(RUN_DIR, html_path)
    report_json = _build_report_json(RUN_DIR)
    _upload_reports(
        submission_id,
        html_path,
        report_json,
        submissions_repo,
        token,
    )
    _upload_gallery_renders(submission_id, RUN_DIR, submissions_repo, token)

    print(f"[eval_job] done: {submission_id}", flush=True)
    return 0


def _parse_fixtures_arg(raw: str | None) -> list[str] | None:
    """Parse the ``--fixtures`` CSV into a deduped list, or ``None``.

    ``None`` (flag absent) selects the whole-submission path. A present
    but empty/whitespace value is a usage error: a shard with no
    fixtures is never something the Space should dispatch.

    Names are stripped of surrounding whitespace; blanks and repeats are
    dropped while first-seen order is preserved.

    Raises:
        RuntimeError: if *raw* is set but yields no fixture names.
    """
    if raw is None:
        return None
    names: list[str] = []
    seen: set[str] = set()
    for part in raw.split(","):
        name = part.strip()
        if not name or name in seen:
            continue
        seen.add(name)
        names.append(name)
    if not names:
        raise RuntimeError("--fixtures was set but resolved to no fixture names.")
    return names


def _require_env(name: str) -> str:
    """Return env var *name* or raise with a clear message.

    Raises:
        RuntimeError: if the variable is unset or set to an empty string.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Required environment variable {name!r} is unset or empty."
        )
    return value


def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Download the submission zip and unpack into ``RUN_DIR``.

    The file is pulled via ``hf_hub_download`` so token auth is handled
    and the file lands in the Hub cache. *zip_url* is expected to look
    like
    ``https://huggingface.co/datasets/<repo>/resolve/main/submissions/<submission_id>.zip``,
    but it is not parsed here: the in-repo filename is always re-derived
    from *submission_id* as ``submissions/<submission_id>.zip``, so any
    URL shape is accepted. The parameter is kept for the caller's
    interface.

    Any existing ``RUN_DIR`` is removed first so the run starts clean.
    """
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    in_repo_path = f"submissions/{submission_id}.zip"
    print(
        f"[eval_job] downloading {submissions_repo}:{in_repo_path}",
        flush=True,
    )
    local_zip = hf_hub_download(
        repo_id=submissions_repo,
        filename=in_repo_path,
        repo_type="dataset",
        token=token,
    )

    # Defensive: matches the validated shape from submit.py's
    # _extract_zip, but the Space already gate-checked the zip
    # contents pre-upload so we extract directly without re-
    # validating zip-slip / symlinks here.
    with zipfile.ZipFile(local_zip) as zf:
        zf.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)


def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Drop every fixture dir under *run_dir* not in *fixtures*.

    Shard mode unpacks the whole zip (the candidate STEPs for every
    fixture) but should only evaluate this shard's slice, so we delete
    the other fixture dirs before ``cadgenbench evaluate`` walks the
    tree. Non-fixture files at the root (e.g. ``meta.json``) are left
    untouched.

    Raises:
        RuntimeError: if a requested fixture is absent from the zip,
            which would mean the Space sharded a name the submission
            didn't contain (a contract violation worth a loud, retried
            failure). Nothing is deleted in that case.
    """
    wanted = set(fixtures)
    present = {p.name for p in run_dir.iterdir() if p.is_dir()}
    missing = wanted - present
    if missing:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(sorted(missing))}"
        )
    removed = 0
    for child in run_dir.iterdir():
        if child.is_dir() and child.name not in wanted:
            shutil.rmtree(child)
            removed += 1
    print(
        f"[eval_job] pruned run dir to {len(wanted)} shard fixture(s) "
        f"(removed {removed})",
        flush=True,
    )


def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Upload this shard's evaluated per-fixture dirs for the Space to merge.

    Pushes the pruned ``run_dir`` (each ``<fixture>/`` with its
    ``result.json`` + ``renders/`` + any overlay PNGs) verbatim to
    ``reports/<submission_id>/shards/<shard_id>/`` in one commit. The
    Space downloads every shard's tree, copies the fixture dirs into a
    single merged run dir, and builds the aggregate ``run_summary`` +
    report + gallery from the whole. The per-shard ``run_summary.json``
    written by ``cadgenbench evaluate`` rides along harmlessly; the
    merge recomputes it over the union and ignores the partials.
    """
    api = HfApi(token=token)
    path_in_repo = (
        f"{REPORTS_DIR_IN_REPO}/{submission_id}/{SHARDS_DIR_NAME}/{shard_id}"
    )
    api.upload_folder(
        folder_path=str(run_dir),
        path_in_repo=path_in_repo,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add eval shard {shard_id} for {submission_id}",
    )
    print(
        f"[eval_job] uploaded shard {shard_id} -> {path_in_repo}",
        flush=True,
    )


def _run_eval(run_dir: Path, workers: int) -> None:
    """Invoke ``cadgenbench evaluate`` over *run_dir*; raise on non-zero.

    Runs ``python -m cadgenbench.cli evaluate`` with the current
    interpreter and environment, bounded by ``EVAL_TIMEOUT_SECONDS``
    (``subprocess.TimeoutExpired`` propagates on timeout).

    Raises:
        RuntimeError: if the subprocess exits with a non-zero code.
    """
    cmd = [
        sys.executable,
        "-m",
        "cadgenbench.cli",
        "evaluate",
        str(run_dir),
        "--workers",
        str(workers),
    ]
    print(f"[eval_job] {' '.join(cmd)}", flush=True)
    proc = subprocess.run(
        cmd,
        timeout=EVAL_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"cadgenbench evaluate exited {proc.returncode}"
        )


def _run_report(run_dir: Path, html_out: Path) -> None:
    """Invoke ``cadgenbench report single`` for *run_dir*; raise on non-zero.

    Bounded by ``REPORT_TIMEOUT_SECONDS``. A zero exit code is not
    trusted on its own: the HTML file must also exist afterwards.

    Raises:
        RuntimeError: if the subprocess exits non-zero or *html_out*
            was not written.
    """
    cmd = [
        sys.executable,
        "-m",
        "cadgenbench.cli",
        "report",
        "single",
        str(run_dir),
        "-o",
        str(html_out),
    ]
    print(f"[eval_job] {' '.join(cmd)}", flush=True)
    proc = subprocess.run(
        cmd,
        timeout=REPORT_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    )
    if proc.returncode != 0 or not html_out.is_file():
        raise RuntimeError(
            f"cadgenbench report single exited {proc.returncode} "
            f"(html exists={html_out.is_file()})"
        )


def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Bundle ``run_summary.json`` + every per-fixture ``result.json``.

    Identical shape to submit.py's ``_build_report_json``: the
    Space-side worker reads ``report.json`` after the Job completes and
    pulls ``run_summary`` out of it to flip the row.

    Returns:
        ``{"run_summary": ..., "per_fixture_results": {fixture: ...}}``.
        Fixture dirs without a ``result.json`` are omitted.

    Raises:
        RuntimeError: if ``run_summary.json`` is missing under *run_dir*.
    """
    summary_path = run_dir / "run_summary.json"
    if not summary_path.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )
    summary = json.loads(summary_path.read_text(encoding="utf-8"))

    per_fixture: dict[str, dict[str, Any]] = {}
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        rp = fixture_dir / "result.json"
        if rp.is_file():
            per_fixture[fixture_dir.name] = json.loads(
                rp.read_text(encoding="utf-8")
            )

    return {"run_summary": summary, "per_fixture_results": per_fixture}


def _upload_reports(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Upload the HTML and JSON reports to the Hub.

    Writes ``reports/<submission_id>.html`` (from *html_path*) and
    ``reports/<submission_id>.json`` (*report_json* serialized as
    indented UTF-8 JSON) to the submissions dataset, one
    ``upload_file`` call each.
    """
    api = HfApi(token=token)
    api.upload_file(
        path_or_fileobj=str(html_path),
        path_in_repo=f"{REPORTS_DIR_IN_REPO}/{submission_id}.html",
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add HTML report for {submission_id}",
    )
    api.upload_file(
        path_or_fileobj=json.dumps(
            report_json,
            ensure_ascii=False,
            indent=2,
        ).encode("utf-8"),
        path_in_repo=f"{REPORTS_DIR_IN_REPO}/{submission_id}.json",
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add JSON report for {submission_id}",
    )
    print(
        f"[eval_job] uploaded reports/{submission_id}.{{html,json}}",
        flush=True,
    )


def _upload_gallery_renders(
    submission_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Upload one ``iso`` thumbnail per fixture for the leaderboard gallery.

    Stages every ``<run_dir>/<fixture>/renders/iso.png`` as
    ``renders/<submission_id>/<fixture>.png`` in the submissions dataset
    (one commit). These are the standalone PNGs the gallery's
    ``renderFor()`` points at; the full multi-view renders stay
    base64-embedded in ``reports/<submission_id>.html`` for the
    self-contained report.

    A fixture with no ``iso.png`` (missing output, or a render that
    never ran) is simply skipped; the gallery reads the per-fixture
    status from the row and draws the dashed "invalid generation" cell,
    so a thumbnail's absence is not an error.
    """
    staged: list[tuple[Path, str]] = []
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        iso_png = fixture_dir / "renders" / f"{GALLERY_THUMB_VIEW}.png"
        if iso_png.is_file():
            staged.append((iso_png, fixture_dir.name))

    if not staged:
        print(
            f"[eval_job] no gallery renders to upload for {submission_id}",
            flush=True,
        )
        return

    api = HfApi(token=token)
    # Copy the thumbnails into a flat staging dir named the way they
    # should appear in the repo, then push the dir with a single
    # ``upload_folder`` call so the whole gallery lands in one commit
    # instead of one commit per fixture.
    with tempfile.TemporaryDirectory(prefix="gallery_renders_") as staging:
        staging_dir = Path(staging)
        for iso_png, fixture_name in staged:
            shutil.copyfile(iso_png, staging_dir / f"{fixture_name}.png")
        api.upload_folder(
            folder_path=str(staging_dir),
            path_in_repo=f"{RENDERS_DIR_IN_REPO}/{submission_id}",
            repo_id=submissions_repo,
            repo_type="dataset",
            commit_message=f"add gallery renders for {submission_id}",
        )
    print(
        f"[eval_job] uploaded {len(staged)} gallery render(s) under "
        f"{RENDERS_DIR_IN_REPO}/{submission_id}/",
        flush=True,
    )


if __name__ == "__main__":
    sys.exit(main())