"""In-job entrypoint for the CADGenBench eval on HF Jobs. Invoked by the leaderboard Space's worker (see ``AI4Engineering/submit.py``) via:: hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\ --flavor a10g-large \\ --env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\ --env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\ --env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\ --env EVAL_WORKER_COUNT=8 \\ --secrets HF_TOKEN \\ python /opt/eval_job.py Two run modes: **Whole-submission (default, no ``--fixtures``)** -- the original path. Synchronous, no fallbacks. Any failure raises and the container exits non-zero; the Space's poller catches the ERROR stage and flips the submission row to ``failed``. 1. Download ``submissions/.zip`` from the submissions dataset via ``hf_hub_download`` (auth via ``HF_TOKEN``). 2. Unpack into ``/tmp/run/``. 3. ``cadgenbench evaluate /tmp/run --workers `` (subprocess). 4. ``cadgenbench report single /tmp/run -o /tmp/.html`` (subprocess). 5. Build ``report.json`` bundling ``run_summary.json`` + every per-fixture ``result.json`` (mirror of submit.py's ``_build_report_json``). 6. Upload ``reports/.html`` + ``reports/.json`` back to the submissions dataset via ``HfApi.upload_file``. 7. Exit 0. The Space-side worker then downloads ``reports/.json``, reads ``run_summary`` out of it, and flips the row to ``completed``. **Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by the Space's sharded submit path (UC3) to fan a large submission across several jobs. Steps 1-2 are identical, then the run dir is pruned to just this shard's fixtures, ``cadgenbench evaluate`` runs over that subset, and the resulting per-fixture dirs (``result.json`` + renders) are staged *verbatim*. 
If ``CADGENBENCH_SHARD_BUCKET`` is set, the shard syncs them into that HF Storage Bucket via the bucket API; otherwise it uploads under ``reports//shards//`` in the submissions dataset. No report HTML, ``report.json``, or gallery render is produced per shard: the Space reads every shard's fixture dirs, merges them into one run dir, and builds the single ``run_summary`` + report + gallery from the merged whole (mirroring the orchestrator's ``_merge_eval``). Exit 0 on success; any failure exits non-zero and the Space marks that shard ERROR and retries it. """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import zipfile from pathlib import Path from typing import Any from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download RUN_DIR = Path("/tmp/run") REPORT_HTML_DIR = Path("/tmp") EVAL_TIMEOUT_SECONDS = 30 * 60 REPORT_TIMEOUT_SECONDS = 5 * 60 REPORTS_DIR_IN_REPO = "reports" RENDERS_DIR_IN_REPO = "renders" SHARD_BUCKET_ENV = "CADGENBENCH_SHARD_BUCKET" SHARD_BUCKET_PREFIX_ENV = "CADGENBENCH_SHARD_BUCKET_PREFIX" # Public HF Storage Bucket the eval job uploads gallery/report renders to (the # job is the sole render uploader; the Space never handles render bytes). The # hosted report + gallery reference these by anonymous bucket URL. Submission # renders only; GT renders stay in the private GT dataset. RENDER_BUCKET = os.environ.get( "CADGENBENCH_RENDER_BUCKET", "HuggingAI4Engineering/cadgenbench-eval-staging", ).strip() HF_ENDPOINT = os.environ.get("HF_ENDPOINT", "https://huggingface.co").rstrip("/") # Space-relative proxy roots the hosted report references for the *private* # assets it can't link from the public bucket. The report is served by the # Space at ``/reports/.html``, so these absolute-path URLs resolve against # the Space origin and the token-holding proxy streams the bytes. Kept in sync # with the routes registered in the leaderboard Space's ``app.py``. 
GT_PROXY_BASE_URL = "/gt"
INPUT_PROXY_BASE_URL = "/task-input"


def _render_base_url(submission_id: str) -> str:
    """Return the public bucket URL prefix for one submission's renders.

    The result looks like ``.../resolve/renders/<submission_id>``; the report
    appends ``/<fixture>/<file>`` to it.
    """
    return (
        f"{HF_ENDPOINT}/buckets/{RENDER_BUCKET}"
        f"/resolve/{RENDERS_DIR_IN_REPO}/{submission_id}"
    )


def _submission_zip_url(submission_id: str, submissions_repo: str) -> str:
    """Hub resolve URL of ``submissions/<submission_id>.zip``.

    Used as the report's download link. Same canonical blob URL the submit
    handler records as ``submission_blob_url`` and the gallery links, so the
    report's download button points at the identical artifact.
    """
    return (
        f"{HF_ENDPOINT}/datasets/{submissions_repo}"
        f"/resolve/main/submissions/{submission_id}.zip"
    )


def _upload_renders_to_bucket(
    run_dir: Path,
    submission_id: str,
    token: str,
) -> list[str]:
    """Upload every fixture's renders to the public render bucket.

    Files land under ``renders/<submission_id>/<fixture>/`` in
    ``RENDER_BUCKET``. One ``batch_bucket_files`` call covers the whole
    submission (cheaper than a per-file fan-out).

    Returns the bucket object paths that were uploaded, so the caller can
    warm the CDN for them; an empty list when there was nothing to upload
    (in which case no bucket call is made).
    """
    render_suffixes = {".png", ".webp"}
    add: list[tuple[str, str]] = []
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        dest_prefix = f"{RENDERS_DIR_IN_REPO}/{submission_id}/{fixture_dir.name}"
        renders_dir = fixture_dir / "renders"
        if renders_dir.is_dir():
            for render_path in sorted(renders_dir.iterdir()):
                # Only regular image files are uploadable; a directory that
                # merely carries an image suffix must not be queued.
                if not render_path.is_file():
                    continue
                if render_path.suffix.lower() not in render_suffixes:
                    continue
                add.append((str(render_path), f"{dest_prefix}/{render_path.name}"))
        # The interface overlay is a per-fixture report artifact that lives at
        # the fixture-dir root (not under renders/). It must ride to the bucket
        # alongside the turntables so the hosted report can reference it by URL
        # instead of base64-inlining it; without this the report would have a
        # broken overlay link. Uploaded under the same per-fixture prefix.
        overlay = fixture_dir / "interface_overlay.png"
        if overlay.is_file():
            add.append((str(overlay), f"{dest_prefix}/{overlay.name}"))
    if not add:
        return []
    HfApi(token=token).batch_bucket_files(RENDER_BUCKET, add=add, token=token)
    print(
        f"[eval_job] uploaded {len(add)} render(s) -> "
        f"hf://buckets/{RENDER_BUCKET}/{RENDERS_DIR_IN_REPO}/{submission_id}",
        flush=True,
    )
    return [dest for _, dest in add]


def _warm_render_cdn(object_paths: list[str]) -> None:
    """Prime the CDN by fetching each freshly-uploaded render once.

    A bucket serves a render via a 302 to a signed Xet CDN URL, and the very
    first fetch of a brand-new object pays the chunk-reconstruction cost,
    which is the lag a viewer sees opening a just-published report. Fetching
    each object here (in parallel, anonymously, best-effort) warms the edge
    cache so the first human hits a warm object instead.

    Failures never propagate: warming is an optimisation, never a publish
    blocker. They are counted and reported in the job log so a systematic
    problem (wrong bucket, private objects) is still visible.
    """
    if not object_paths:
        return

    import urllib.request
    from concurrent.futures import ThreadPoolExecutor

    def _warm(path: str) -> bool:
        """Fetch one object; return whether the fetch succeeded."""
        url = f"{HF_ENDPOINT}/buckets/{RENDER_BUCKET}/resolve/{path}"
        try:
            with urllib.request.urlopen(url, timeout=30) as resp:
                resp.read()
        except Exception:
            # Deliberately broad: any network/HTTP problem is non-fatal here.
            return False
        return True

    with ThreadPoolExecutor(max_workers=16) as pool:
        outcomes = list(pool.map(_warm, object_paths))
    warmed = sum(outcomes)
    failed = len(outcomes) - warmed
    print(f"[eval_job] warmed CDN for {warmed} render(s)", flush=True)
    if failed:
        print(
            f"[eval_job] CDN warm-up failed for {failed} render(s) (ignored)",
            flush=True,
        )


# Sub-prefix under ``reports/<submission_id>/`` where each shard uploads its
# raw per-fixture dirs in shard mode. The Space merges these and deletes the
# whole ``shards/`` tree after a successful merge.
SHARDS_DIR_NAME = "shards"


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the job entrypoint."""
    parser = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    parser.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    parser.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<submission_id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    parser.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<submission_id>/shards/<shard_id>/ for the Space to "
            "merge. Omit for the original whole-submission path."
        ),
    )
    parser.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    return parser


def _resolve_worker_count(default: int = 8) -> int:
    """Read ``EVAL_WORKER_COUNT`` from the environment.

    An unset or empty/whitespace value selects *default*, matching how
    ``_require_env`` treats empty as unset. A value that is not an integer
    raises ``RuntimeError`` naming the variable, instead of the bare
    ``ValueError`` that ``int()`` would produce.
    """
    raw = os.environ.get("EVAL_WORKER_COUNT", "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise RuntimeError(
            f"EVAL_WORKER_COUNT must be an integer, got {raw!r}."
        ) from err


def _run_shard(
    submission_id: str,
    zip_url: str,
    shard_id: str,
    shard_fixtures: list[str],
    worker_count: int,
    submissions_repo: str,
    token: str,
) -> None:
    """Evaluate one shard's fixtures and stage the results for the merge."""
    print(
        f"[eval_job] submission_id={submission_id} shard={shard_id} "
        f"fixtures={len(shard_fixtures)} workers={worker_count} "
        f"repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _prune_run_dir(RUN_DIR, shard_fixtures)
    _run_eval(RUN_DIR, worker_count)
    # The shard job is the sole uploader of its fixtures' renders to the
    # permanent bucket prefix; the Space merge only assembles the report.
    _warm_render_cdn(_upload_renders_to_bucket(RUN_DIR, submission_id, token))
    _upload_shard_artifacts(
        submission_id,
        shard_id,
        RUN_DIR,
        submissions_repo,
        token,
    )
    print(
        f"[eval_job] done: {submission_id} shard={shard_id}",
        flush=True,
    )


def _run_whole_submission(
    submission_id: str,
    zip_url: str,
    worker_count: int,
    submissions_repo: str,
    token: str,
) -> None:
    """Evaluate the full submission and publish its report HTML + JSON."""
    print(
        f"[eval_job] submission_id={submission_id} "
        f"workers={worker_count} repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _run_eval(RUN_DIR, worker_count)
    # Upload renders to the public bucket and warm the CDN, then build the
    # report referencing them by URL (so the heavy WebP/PNG bytes never land in
    # the HTML and the first viewer hits an already-warm edge cache).
    _warm_render_cdn(_upload_renders_to_bucket(RUN_DIR, submission_id, token))
    html_path = REPORT_HTML_DIR / f"{submission_id}.html"
    _run_report(
        RUN_DIR,
        html_path,
        render_base_url=_render_base_url(submission_id),
        download_url=_submission_zip_url(submission_id, submissions_repo),
    )
    report_json = _build_report_json(RUN_DIR)
    _publish_reports_and_gallery(
        submission_id,
        html_path,
        report_json,
        submissions_repo,
        token,
    )
    print(f"[eval_job] done: {submission_id}", flush=True)


def main() -> int:
    """Parse arguments, run the selected mode, and return the exit status.

    Returns 0 on success. Every failure is raised (``RuntimeError`` for
    contract/usage problems, or whatever the underlying call raises), so the
    interpreter exits non-zero and the Space sees the job as failed.
    """
    args = _build_arg_parser().parse_args()
    submission_id: str = args.submission_id
    zip_url: str = args.zip_url

    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")
    worker_count = _resolve_worker_count()

    shard_fixtures = _parse_fixtures_arg(args.fixtures)
    if shard_fixtures is None:
        _run_whole_submission(
            submission_id, zip_url, worker_count, submissions_repo, token,
        )
        return 0

    if not args.shard_id:
        raise RuntimeError("--shard-id is required when --fixtures is set.")
    _run_shard(
        submission_id,
        zip_url,
        args.shard_id,
        shard_fixtures,
        worker_count,
        submissions_repo,
        token,
    )
    return 0


def _parse_fixtures_arg(raw: str | None) -> list[str] | None:
    """Parse the ``--fixtures`` CSV into a deduped list, or ``None``.

    ``None`` (flag absent) selects the whole-submission path. Names are
    whitespace-stripped, blanks are dropped, and duplicates keep their first
    position. A present but empty/whitespace value is a usage error and
    raises ``RuntimeError``: a shard with no fixtures is never something the
    Space should dispatch.
    """
    if raw is None:
        return None
    stripped = (part.strip() for part in raw.split(","))
    # dict.fromkeys de-duplicates while preserving first-seen order.
    names = list(dict.fromkeys(name for name in stripped if name))
    if not names:
        raise RuntimeError("--fixtures was set but resolved to no fixture names.")
    return names


def _require_env(name: str) -> str:
    """Return env var *name*; raise ``RuntimeError`` if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Required environment variable {name!r} is unset or empty."
        )
    return value


def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Download the submission zip and unpack it into a fresh ``RUN_DIR``.

    The in-repo path is always ``submissions/<submission_id>.zip``, derived
    from *submission_id*, and is pulled via ``hf_hub_download`` so token auth
    is handled and the file lands in the Hub cache. *zip_url* is expected to
    look like
    ``https://huggingface.co/datasets/<repo>/resolve/main/submissions/<id>.zip``
    but is **not** consulted: it is accepted only to keep the job's
    command-line contract with the Space, so any URL shape is tolerated.

    Any existing ``RUN_DIR`` is removed first.
    """
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    in_repo_path = f"submissions/{submission_id}.zip"
    print(
        f"[eval_job] downloading {submissions_repo}:{in_repo_path}",
        flush=True,
    )
    local_zip = hf_hub_download(
        repo_id=submissions_repo,
        filename=in_repo_path,
        repo_type="dataset",
        token=token,
    )
    # The Space already gate-checked the zip contents pre-upload (the
    # validated shape from submit.py's _extract_zip), so we extract directly
    # without re-validating zip-slip / symlinks here. ``extractall`` itself
    # strips absolute paths and ``..`` components from member names.
    with zipfile.ZipFile(local_zip) as zf:
        zf.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)


def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Drop every fixture dir under *run_dir* not in *fixtures*.

    Shard mode unpacks the whole zip (the candidate STEPs for every fixture)
    but should only evaluate this shard's slice, so we delete the other
    fixture dirs before ``cadgenbench evaluate`` walks the tree. Non-fixture
    files at the root (e.g. ``meta.json``) are left untouched.

    Raises ``RuntimeError`` if a requested fixture is absent from the zip,
    which would mean the Space sharded a name the submission didn't contain
    (a contract violation worth a loud, retried failure). The check runs
    before anything is deleted, so a failed call leaves *run_dir* intact.
    """
    wanted = set(fixtures)
    # Snapshot the listing once so deletion never races the directory scan.
    fixture_dirs = [p for p in run_dir.iterdir() if p.is_dir()]
    missing = wanted - {p.name for p in fixture_dirs}
    if missing:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(sorted(missing))}"
        )
    surplus = [p for p in fixture_dirs if p.name not in wanted]
    for unwanted_dir in surplus:
        shutil.rmtree(unwanted_dir)
    print(
        f"[eval_job] pruned run dir to {len(wanted)} shard fixture(s) "
        f"(removed {len(surplus)})",
        flush=True,
    )


def _shard_bucket_dest(
    bucket_id: str,
    prefix: str,
    submission_id: str,
    shard_id: str,
) -> str:
    """Build the ``hf://buckets/...`` destination for one shard's artifacts.

    *bucket_id* may be given bare (``org/bucket``) or as a full
    ``hf://buckets/org/bucket`` URI, with or without a trailing slash.
    *prefix* is an optional key prefix inside the bucket; when it is empty
    (or only slashes) the segment is omitted rather than leaving an empty
    ``//`` component in the path.
    """
    scheme = "hf://buckets/"
    bucket = bucket_id.strip()
    if bucket.startswith(scheme):
        bucket = bucket[len(scheme):]
    bucket = bucket.rstrip("/")
    segments = [bucket, prefix.strip().strip("/"), submission_id, SHARDS_DIR_NAME, shard_id]
    return scheme + "/".join(segment for segment in segments if segment)


def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Upload this shard's evaluated per-fixture dirs for the Space to merge.

    Persists the pruned ``run_dir`` (each ``<fixture>/`` with its
    ``result.json`` + ``renders/`` + any overlay PNGs) verbatim. In bucket
    mode (``CADGENBENCH_SHARD_BUCKET`` set), this syncs the dir into the HF
    Storage Bucket via the bucket API (no volume mount), under the prefix
    from ``CADGENBENCH_SHARD_BUCKET_PREFIX`` (default ``submissions``); in
    legacy mode, it is one dataset-repo commit under
    ``reports/<submission_id>/shards/<shard_id>/``. The Space reads every
    shard's tree, copies the fixture dirs into a single merged run dir, and
    builds the aggregate ``run_summary`` + report + gallery from the whole.

    The per-shard ``run_summary.json`` written by ``cadgenbench evaluate``
    rides along harmlessly; the merge recomputes it over the union and
    ignores the partials.
    """
    api = HfApi(token=token)

    bucket_id = os.environ.get(SHARD_BUCKET_ENV, "").strip()
    if bucket_id:
        dest = _shard_bucket_dest(
            bucket_id,
            os.environ.get(SHARD_BUCKET_PREFIX_ENV, "submissions"),
            submission_id,
            shard_id,
        )
        api.sync_bucket(source=str(run_dir), dest=dest, token=token)
        print(
            f"[eval_job] synced shard {shard_id} -> {dest}",
            flush=True,
        )
        return

    path_in_repo = f"{REPORTS_DIR_IN_REPO}/{submission_id}/{SHARDS_DIR_NAME}/{shard_id}"
    api.upload_folder(
        folder_path=str(run_dir),
        path_in_repo=path_in_repo,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add eval shard {shard_id} for {submission_id}",
    )
    print(
        f"[eval_job] uploaded shard {shard_id} -> {path_in_repo}",
        flush=True,
    )


def _run_cli(cmd: list[str], timeout: int) -> int:
    """Log and run *cmd* with the job's environment; return its exit code.

    ``subprocess.TimeoutExpired`` propagates when *timeout* seconds elapse,
    which fails the job like any other error.
    """
    print(f"[eval_job] {' '.join(cmd)}", flush=True)
    proc = subprocess.run(
        cmd,
        timeout=timeout,
        env=os.environ.copy(),
        check=False,
    )
    return proc.returncode


def _run_eval(run_dir: Path, workers: int) -> None:
    """Invoke ``cadgenbench evaluate`` over *run_dir*; raise on non-zero.

    Raises ``RuntimeError`` on a non-zero exit and
    ``subprocess.TimeoutExpired`` after ``EVAL_TIMEOUT_SECONDS``.
    """
    cmd = [
        sys.executable, "-m", "cadgenbench.cli",
        "evaluate", str(run_dir),
        "--workers", str(workers),
    ]
    returncode = _run_cli(cmd, EVAL_TIMEOUT_SECONDS)
    if returncode != 0:
        raise RuntimeError(
            f"cadgenbench evaluate exited {returncode}"
        )


def _run_report(
    run_dir: Path,
    html_out: Path,
    *,
    render_base_url: str | None = None,
    download_url: str | None = None,
) -> None:
    """Invoke ``cadgenbench report single`` for *run_dir*; raise on failure.

    Passes ``--render-base-url`` (when given) so candidate renders are
    referenced from the public bucket rather than base64-inlined into the
    hosted HTML, and ``--download-url`` (when given) for the report's
    download link.

    Raises ``RuntimeError`` if the command exits non-zero or did not produce
    *html_out*, and ``subprocess.TimeoutExpired`` after
    ``REPORT_TIMEOUT_SECONDS``.
    """
    cmd = [
        sys.executable, "-m", "cadgenbench.cli",
        "report", "single", str(run_dir),
        "-o", str(html_out),
    ]
    if render_base_url:
        cmd += [
            "--render-base-url", render_base_url,
            # GT + inputs are private, so they link through the Space proxy
            # rather than the public bucket. Passed alongside the render base
            # so the whole hosted report is lazy-loaded links, not base64.
            "--gt-base-url", GT_PROXY_BASE_URL,
            "--input-base-url", INPUT_PROXY_BASE_URL,
        ]
    if download_url:
        cmd += ["--download-url", download_url]
    returncode = _run_cli(cmd, REPORT_TIMEOUT_SECONDS)
    if returncode != 0 or not html_out.is_file():
        raise RuntimeError(
            f"cadgenbench report single exited {returncode} "
            f"(html exists={html_out.is_file()})"
        )


def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Bundle ``run_summary.json`` + every per-fixture ``result.json``.

    Identical shape to submit.py's ``_build_report_json``: the Space-side
    worker reads ``report.json`` after the Job completes and pulls
    ``run_summary`` out of it to flip the row.

    Returns ``{"run_summary": ..., "per_fixture_results": {fixture: ...}}``.
    Fixture dirs without a ``result.json`` are skipped. Raises
    ``RuntimeError`` when ``run_summary.json`` is missing.
    """
    summary_path = run_dir / "run_summary.json"
    if not summary_path.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )
    summary = json.loads(summary_path.read_text(encoding="utf-8"))
    per_fixture: dict[str, dict[str, Any]] = {}
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        result_path = fixture_dir / "result.json"
        if result_path.is_file():
            per_fixture[fixture_dir.name] = json.loads(
                result_path.read_text(encoding="utf-8")
            )
    return {"run_summary": summary, "per_fixture_results": per_fixture}


def _publish_reports_and_gallery(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Publish the report HTML + JSON to the submissions dataset in one commit.

    The two files land at ``reports/<submission_id>.html`` and
    ``reports/<submission_id>.json``.

    Renders are **not** committed here: :func:`_upload_renders_to_bucket` has
    already pushed them to the public render bucket under
    ``renders/<submission_id>/``, and the report HTML references them by
    bucket URL. Keeping the binary renders out of the dataset repo avoids
    bloating its git history and the commit-queue 429s the per-file fan-out
    used to cause.
    """
    operations: list[CommitOperationAdd] = [
        CommitOperationAdd(
            path_in_repo=f"{REPORTS_DIR_IN_REPO}/{submission_id}.html",
            path_or_fileobj=str(html_path),
        ),
        CommitOperationAdd(
            path_in_repo=f"{REPORTS_DIR_IN_REPO}/{submission_id}.json",
            path_or_fileobj=json.dumps(
                report_json, ensure_ascii=False, indent=2,
            ).encode("utf-8"),
        ),
    ]

    api = HfApi(token=token)
    api.create_commit(
        repo_id=submissions_repo,
        repo_type="dataset",
        operations=operations,
        commit_message=f"publish report for {submission_id}",
    )
    print(
        f"[eval_job] published reports/{submission_id}.{{html,json}}",
        flush=True,
    )


if __name__ == "__main__":
    sys.exit(main())