"""In-job entrypoint for the CADGenBench eval on HF Jobs.

Invoked by the leaderboard Space's worker (see
``AI4Engineering/submit.py``) via::

    hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\
        --flavor a10g-large \\
        --env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\
        --env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\
        --env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\
        --env EVAL_WORKER_COUNT=8 \\
        --secrets HF_TOKEN \\
        python /opt/eval_job.py <submission_id> <zip_url>

Two run modes:

**Whole-submission (default, no ``--fixtures``)** -- the original path.
Synchronous, no fallbacks. Any failure raises and the container exits
non-zero; the Space's poller catches the ERROR stage and flips the
submission row to ``failed``.

    1. Download ``submissions/<id>.zip`` from the submissions dataset
       via ``hf_hub_download`` (auth via ``HF_TOKEN``).
    2. Unpack into ``/tmp/run/``.
    3. ``cadgenbench evaluate /tmp/run --workers <n>`` (subprocess).
    4. Upload every fixture's renders to the render bucket under
       ``renders/<id>/<fixture>/`` and warm the CDN for them
       (best-effort).
    5. ``cadgenbench report single /tmp/run -o /tmp/<id>.html``
       (subprocess).
    6. Build ``report.json`` bundling ``run_summary.json`` + every
       per-fixture ``result.json`` (mirror of submit.py's
       ``_build_report_json``).
    7. Publish ``reports/<id>.html`` + ``reports/<id>.json`` back to the
       submissions dataset in a single commit via
       ``HfApi.create_commit``.
    8. Exit 0.

The Space-side worker then downloads ``reports/<id>.json``, reads
``run_summary`` out of it, and flips the row to ``completed``.

**Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by
the Space's sharded submit path (UC3) to fan a large submission across
several jobs. Steps 1-2 are identical, then the run dir is pruned to
just this shard's fixtures, ``cadgenbench evaluate`` runs over that
subset, the shard's renders are uploaded to the render bucket, and the
resulting per-fixture dirs (``result.json`` + renders) are staged
*verbatim*. If ``CADGENBENCH_SHARD_BUCKET`` is set, the shard syncs
them into that HF Storage Bucket via the bucket API; otherwise it
uploads under ``reports/<id>/shards/<shard_id>/`` in the submissions
dataset. No report HTML, ``report.json``, or gallery render is
produced per shard: the Space reads every shard's fixture dirs, merges
them into one run dir, and builds the single ``run_summary`` + report +
gallery from the merged whole (mirroring the orchestrator's
``_merge_eval``). Exit 0 on success; any failure exits non-zero and the
Space marks that shard ERROR and retries it.
"""
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import zipfile |
| from pathlib import Path |
| from typing import Any |
|
|
| from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download |
|
|
|
|
# Local scratch directory the submission zip is unpacked into and evaluated in.
RUN_DIR = Path("/tmp/run")
# Directory the report HTML (``<id>.html``) is written to before it is published.
REPORT_HTML_DIR = Path("/tmp")


# Wall-clock limits (seconds) passed as ``timeout=`` to the evaluate and
# report subprocesses respectively.
EVAL_TIMEOUT_SECONDS = 30 * 60
REPORT_TIMEOUT_SECONDS = 5 * 60


# Top-level folder names used when building upload destinations.
REPORTS_DIR_IN_REPO = "reports"
RENDERS_DIR_IN_REPO = "renders"
# Names of the env vars that switch shard uploads to bucket mode and set the
# path prefix used inside that bucket.
SHARD_BUCKET_ENV = "CADGENBENCH_SHARD_BUCKET"
SHARD_BUCKET_PREFIX_ENV = "CADGENBENCH_SHARD_BUCKET_PREFIX"


# Bucket that receives candidate renders; overridable via
# ``CADGENBENCH_RENDER_BUCKET``. Read once, at import time.
RENDER_BUCKET = os.environ.get(
    "CADGENBENCH_RENDER_BUCKET", "HuggingAI4Engineering/cadgenbench-eval-staging",
).strip()
# Hub base URL (trailing slash removed) used to build resolve links.
HF_ENDPOINT = os.environ.get("HF_ENDPOINT", "https://huggingface.co").rstrip("/")


# Values handed to ``cadgenbench report single`` as ``--gt-base-url`` and
# ``--input-base-url``.
# NOTE(review): these look like site-relative routes served by the Space --
# confirm against the Space's proxy handlers.
GT_PROXY_BASE_URL = "/gt"
INPUT_PROXY_BASE_URL = "/task-input"
|
|
|
def _render_base_url(submission_id: str) -> str:
    """Return the bucket resolve URL under which a submission's renders live.

    The result has the form
    ``<endpoint>/buckets/<bucket>/resolve/renders/<submission_id>`` with no
    trailing slash; the report appends ``/<fixture>/<file>`` to it.
    """
    segments = (
        HF_ENDPOINT,
        "buckets",
        RENDER_BUCKET,
        "resolve",
        RENDERS_DIR_IN_REPO,
        submission_id,
    )
    return "/".join(segments)
|
|
|
|
def _submission_zip_url(submission_id: str, submissions_repo: str) -> str:
    """Return the Hub resolve URL of ``submissions/<id>.zip``.

    This is the link the report uses for its download button. It is built
    from the dataset repo id and the submission id on the ``main`` revision,
    i.e. ``<endpoint>/datasets/<repo>/resolve/main/submissions/<id>.zip``.
    """
    dataset_root = f"{HF_ENDPOINT}/datasets/{submissions_repo}"
    zip_in_repo = f"submissions/{submission_id}.zip"
    return f"{dataset_root}/resolve/main/{zip_in_repo}"
|
|
|
|
def _upload_renders_to_bucket(
    run_dir: Path, submission_id: str, token: str,
) -> list[str]:
    """Push every fixture's render images to the render bucket in one batch.

    For each sub-directory of *run_dir* (visited in sorted order) this
    collects the ``.png`` / ``.webp`` files inside its ``renders/`` folder
    plus an ``interface_overlay.png`` sitting directly in the fixture dir,
    and maps each to ``renders/<submission_id>/<fixture>/<filename>``.

    All collected files go through a single ``batch_bucket_files`` call.
    Nothing is uploaded (and no API object is created) when no file matched.

    Returns:
        The destination object paths that were uploaded, in collection
        order, so the caller can warm the CDN for them. Empty if there was
        nothing to upload.
    """
    image_suffixes = (".png", ".webp")
    bucket_root = f"{RENDERS_DIR_IN_REPO}/{submission_id}"

    def _uploads_for(fixture_dir: Path):
        # Yield (local path, bucket object path) pairs for one fixture.
        target = f"{bucket_root}/{fixture_dir.name}"
        renders_dir = fixture_dir / "renders"
        if renders_dir.is_dir():
            for candidate in sorted(renders_dir.iterdir()):
                if candidate.suffix.lower() in image_suffixes:
                    yield str(candidate), f"{target}/{candidate.name}"
        overlay = fixture_dir / "interface_overlay.png"
        if overlay.is_file():
            yield str(overlay), f"{target}/{overlay.name}"

    fixture_dirs = sorted(entry for entry in run_dir.iterdir() if entry.is_dir())
    uploads: list[tuple[str, str]] = [
        pair for fixture_dir in fixture_dirs for pair in _uploads_for(fixture_dir)
    ]
    if not uploads:
        return []

    client = HfApi(token=token)
    client.batch_bucket_files(RENDER_BUCKET, add=uploads, token=token)
    print(
        f"[eval_job] uploaded {len(uploads)} render(s) -> "
        f"hf://buckets/{RENDER_BUCKET}/{RENDERS_DIR_IN_REPO}/{submission_id}",
        flush=True,
    )
    return [object_path for _, object_path in uploads]
|
|
|
|
def _warm_render_cdn(object_paths: list[str]) -> None:
    """Prime the CDN by fetching each freshly-uploaded render once.

    A bucket serves a render via a 302 to a signed Xet CDN URL, and the very
    first fetch of a brand-new object pays the chunk-reconstruction cost, which
    is the lag a viewer sees opening a just-published report. Fetching each
    object here (in parallel, anonymously, best-effort) warms the edge cache so
    the first human hits a warm object instead.

    Warming is an optimisation, never a publish blocker: a failed fetch is
    counted and reported in the log line but never raised.

    Args:
        object_paths: Bucket object paths (relative to the bucket root) to
            fetch. An empty list is a no-op and prints nothing.
    """
    import urllib.parse
    import urllib.request
    from concurrent.futures import ThreadPoolExecutor

    if not object_paths:
        return

    def _warm(path: str) -> bool:
        # Percent-encode the object path: a raw space or non-ASCII character
        # makes the request invalid, so such an object could never be warmed.
        quoted = urllib.parse.quote(path)
        url = f"{HF_ENDPOINT}/buckets/{RENDER_BUCKET}/resolve/{quoted}"
        try:
            with urllib.request.urlopen(url, timeout=30) as resp:
                resp.read()
        except Exception:
            # Deliberately broad: nothing that goes wrong while warming may
            # fail the job. The failure is surfaced via the count below.
            return False
        return True

    with ThreadPoolExecutor(max_workers=16) as pool:
        outcomes = list(pool.map(_warm, object_paths))

    warmed = sum(outcomes)
    failed = len(outcomes) - warmed
    suffix = f" ({failed} failed, ignored)" if failed else ""
    print(f"[eval_job] warmed CDN for {warmed} render(s){suffix}", flush=True)
|
|
| |
| |
| |
# Folder name that groups the per-shard uploads beneath a submission's prefix
# (``.../<submission_id>/shards/<shard_id>``).
SHARDS_DIR_NAME = "shards"
|
|
def main() -> int:
    """Parse the CLI, then run either the whole-submission or the shard path.

    Reads ``HF_TOKEN`` and ``HF_SUBMISSIONS_REPO`` (both required) and
    ``EVAL_WORKER_COUNT`` (default ``8``) from the environment.

    * Without ``--fixtures``: download + unpack, evaluate, upload renders and
      warm the CDN, build the HTML report and ``report.json``, publish both.
    * With ``--fixtures`` (``--shard-id`` then mandatory): download + unpack,
      prune the run dir to the listed fixtures, evaluate, upload renders and
      warm the CDN, then upload the shard's fixture dirs.

    Returns:
        ``0`` on success. Every failure propagates as an exception.

    Raises:
        RuntimeError: a required env var is missing, ``--fixtures`` resolves
            to nothing, ``--shard-id`` is missing in shard mode, or one of
            the pipeline steps reports a failure.
    """
    cli = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    cli.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    cli.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    cli.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<id>/shards/<shard-id>/ for the Space to merge. "
            "Omit for the original whole-submission path."
        ),
    )
    cli.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    opts = cli.parse_args()

    submission_id: str = opts.submission_id
    zip_url: str = opts.zip_url
    shard_id = opts.shard_id

    # Environment is validated before any work starts so a misconfigured job
    # fails immediately.
    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")
    worker_count = int(os.environ.get("EVAL_WORKER_COUNT", "8"))

    shard_fixtures = _parse_fixtures_arg(opts.fixtures)

    if shard_fixtures is None:
        # ---- Whole-submission path -------------------------------------
        print(
            f"[eval_job] submission_id={submission_id} "
            f"workers={worker_count} repo={submissions_repo}",
            flush=True,
        )
        _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
        _run_eval(RUN_DIR, worker_count)

        # Renders go to the bucket before the report is built, since the
        # report references them by bucket URL.
        uploaded = _upload_renders_to_bucket(RUN_DIR, submission_id, token)
        _warm_render_cdn(uploaded)

        html_path = REPORT_HTML_DIR / f"{submission_id}.html"
        _run_report(
            RUN_DIR,
            html_path,
            render_base_url=_render_base_url(submission_id),
            download_url=_submission_zip_url(submission_id, submissions_repo),
        )
        report_json = _build_report_json(RUN_DIR)
        _publish_reports_and_gallery(
            submission_id, html_path, report_json, submissions_repo, token,
        )
        print(f"[eval_job] done: {submission_id}", flush=True)
        return 0

    # ---- Shard path ----------------------------------------------------
    if not shard_id:
        raise RuntimeError("--shard-id is required when --fixtures is set.")
    print(
        f"[eval_job] submission_id={submission_id} shard={shard_id} "
        f"fixtures={len(shard_fixtures)} workers={worker_count} "
        f"repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _prune_run_dir(RUN_DIR, shard_fixtures)
    _run_eval(RUN_DIR, worker_count)

    uploaded = _upload_renders_to_bucket(RUN_DIR, submission_id, token)
    _warm_render_cdn(uploaded)
    _upload_shard_artifacts(
        submission_id, shard_id, RUN_DIR, submissions_repo, token,
    )
    print(
        f"[eval_job] done: {submission_id} shard={shard_id}",
        flush=True,
    )
    return 0
|
|
|
|
| def _parse_fixtures_arg(raw: str | None) -> list[str] | None: |
| """Parse the ``--fixtures`` CSV into a deduped list, or ``None``. |
| |
| ``None`` (flag absent) selects the whole-submission path. A present |
| but empty/whitespace value is a usage error: a shard with no |
| fixtures is never something the Space should dispatch. |
| """ |
| if raw is None: |
| return None |
| names: list[str] = [] |
| seen: set[str] = set() |
| for part in raw.split(","): |
| name = part.strip() |
| if not name or name in seen: |
| continue |
| seen.add(name) |
| names.append(name) |
| if not names: |
| raise RuntimeError("--fixtures was set but resolved to no fixture names.") |
| return names |
|
|
|
|
def _require_env(name: str) -> str:
    """Fetch a mandatory environment variable.

    Returns:
        The value of *name* from the process environment.

    Raises:
        RuntimeError: the variable is not set, or is set to an empty string.
    """
    value = os.environ.get(name, "")
    if value:
        return value
    raise RuntimeError(
        f"Required environment variable {name!r} is unset or empty."
    )
|
|
|
|
def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Fetch the submission archive and unpack it into a fresh ``RUN_DIR``.

    Any existing ``RUN_DIR`` is deleted and recreated first. The archive is
    then pulled with ``hf_hub_download`` from the *submissions_repo* dataset
    at ``submissions/<submission_id>.zip`` and extracted into ``RUN_DIR``.

    *zip_url* is accepted for interface compatibility but is not used to
    locate the file: the in-repo path is always derived from
    *submission_id*. The URL is expected to look like
    ``https://huggingface.co/datasets/<repo>/resolve/main/submissions/<id>.zip``.
    """
    in_repo_path = f"submissions/{submission_id}.zip"

    # Start from an empty directory so leftovers from an earlier run cannot
    # leak into this evaluation.
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    print(
        f"[eval_job] downloading {submissions_repo}:{in_repo_path}",
        flush=True,
    )
    cached_zip = hf_hub_download(
        repo_id=submissions_repo,
        filename=in_repo_path,
        repo_type="dataset",
        token=token,
    )

    with zipfile.ZipFile(cached_zip) as archive:
        archive.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)
|
|
|
|
def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Delete every sub-directory of *run_dir* whose name is not in *fixtures*.

    Shard mode unpacks the whole archive but must evaluate only its own
    slice, so the other fixture directories are removed before the evaluator
    walks the tree. Plain files directly under *run_dir* (e.g.
    ``meta.json``) are never touched.

    Raises:
        RuntimeError: a requested fixture has no directory under *run_dir*.
            This is checked before anything is deleted, so a failing call
            leaves the tree unchanged.
    """
    keep = set(fixtures)
    fixture_dirs = [entry for entry in run_dir.iterdir() if entry.is_dir()]

    absent = sorted(keep.difference(entry.name for entry in fixture_dirs))
    if absent:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(absent)}"
        )

    extras = [entry for entry in fixture_dirs if entry.name not in keep]
    for entry in extras:
        shutil.rmtree(entry)
    print(
        f"[eval_job] pruned run dir to {len(keep)} shard fixture(s) "
        f"(removed {len(extras)})",
        flush=True,
    )
|
|
|
|
def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Persist this shard's evaluated run directory for the Space to merge.

    The whole (already pruned) *run_dir* is sent as-is. The destination
    depends on the ``CADGENBENCH_SHARD_BUCKET`` env var:

    * **Set** (bucket mode): ``sync_bucket`` copies the directory to
      ``hf://buckets/<bucket>/<prefix>/<submission_id>/shards/<shard_id>``.
      A leading ``hf://buckets/`` and trailing slashes on the bucket id are
      tolerated; ``<prefix>`` comes from ``CADGENBENCH_SHARD_BUCKET_PREFIX``
      (default ``submissions``) with surrounding slashes removed.
    * **Unset or blank** (legacy mode): ``upload_folder`` commits the
      directory to the *submissions_repo* dataset under
      ``reports/<submission_id>/shards/<shard_id>``.

    NOTE(review): a prefix env value that is empty (or only slashes) leaves
    an empty path segment (``//``) in the bucket destination -- confirm the
    reader side treats that the same way.
    """
    configured_bucket = os.environ.get(SHARD_BUCKET_ENV, "").strip()

    if not configured_bucket:
        # Legacy mode: one dataset-repo commit holding the whole run dir.
        path_in_repo = (
            f"{REPORTS_DIR_IN_REPO}/{submission_id}/{SHARDS_DIR_NAME}/{shard_id}"
        )
        HfApi(token=token).upload_folder(
            folder_path=str(run_dir),
            path_in_repo=path_in_repo,
            repo_id=submissions_repo,
            repo_type="dataset",
            commit_message=f"add eval shard {shard_id} for {submission_id}",
        )
        print(
            f"[eval_job] uploaded shard {shard_id} -> {path_in_repo}",
            flush=True,
        )
        return

    # Bucket mode: accept either a bare bucket id or a full hf:// URI.
    scheme = "hf://buckets/"
    bucket_name = configured_bucket
    if bucket_name.startswith(scheme):
        bucket_name = bucket_name[len(scheme):]
    bucket_name = bucket_name.rstrip("/")

    prefix = os.environ.get(SHARD_BUCKET_PREFIX_ENV, "submissions").strip("/")
    shard_root = f"{prefix}/{submission_id}/{SHARDS_DIR_NAME}/{shard_id}"
    dest = f"hf://buckets/{bucket_name}/{shard_root}"

    HfApi(token=token).sync_bucket(source=str(run_dir), dest=dest, token=token)
    print(
        f"[eval_job] synced shard {shard_id} -> {dest}",
        flush=True,
    )
|
|
|
|
def _run_eval(run_dir: Path, workers: int) -> None:
    """Run ``cadgenbench evaluate`` on *run_dir* in a child process.

    The evaluator is launched with the current interpreter
    (``python -m cadgenbench.cli evaluate <run_dir> --workers <n>``), a copy
    of the current environment and a ``EVAL_TIMEOUT_SECONDS`` time limit.
    The command line is echoed to stdout first.

    Raises:
        RuntimeError: the child exited with a non-zero status.
        subprocess.TimeoutExpired: the child exceeded the time limit.
    """
    argv = [
        sys.executable, "-m", "cadgenbench.cli", "evaluate", str(run_dir),
        "--workers", str(workers),
    ]
    print(f"[eval_job] {' '.join(argv)}", flush=True)
    exit_code = subprocess.run(
        argv,
        timeout=EVAL_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    ).returncode
    if exit_code == 0:
        return
    raise RuntimeError(
        f"cadgenbench evaluate exited {exit_code}"
    )
|
|
|
|
| def _run_report( |
| run_dir: Path, |
| html_out: Path, |
| *, |
| render_base_url: str | None = None, |
| download_url: str | None = None, |
| ) -> None: |
| """Invoke ``cadgenbench report single`` for *run_dir*; raise on non-zero. |
| |
| Passes ``--render-base-url`` so candidate renders are referenced from the |
| public bucket rather than base64-inlined into the hosted HTML. |
| """ |
| cmd = [ |
| sys.executable, "-m", "cadgenbench.cli", "report", "single", |
| str(run_dir), "-o", str(html_out), |
| ] |
| if render_base_url: |
| cmd += [ |
| "--render-base-url", render_base_url, |
| |
| |
| |
| "--gt-base-url", GT_PROXY_BASE_URL, |
| "--input-base-url", INPUT_PROXY_BASE_URL, |
| ] |
| if download_url: |
| cmd += ["--download-url", download_url] |
| print(f"[eval_job] {' '.join(cmd)}", flush=True) |
| proc = subprocess.run( |
| cmd, |
| timeout=REPORT_TIMEOUT_SECONDS, |
| env=os.environ.copy(), |
| check=False, |
| ) |
| if proc.returncode != 0 or not html_out.is_file(): |
| raise RuntimeError( |
| f"cadgenbench report single exited {proc.returncode} " |
| f"(html exists={html_out.is_file()})" |
| ) |
|
|
|
|
def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Assemble the ``report.json`` payload from an evaluated run directory.

    Returns:
        ``{"run_summary": ..., "per_fixture_results": {...}}`` where
        ``run_summary`` is the parsed ``run_summary.json`` and
        ``per_fixture_results`` maps each sub-directory name to its parsed
        ``result.json``. Sub-directories without a ``result.json`` are
        skipped; entries are inserted in sorted directory order.

    Raises:
        RuntimeError: ``run_summary.json`` is missing from *run_dir*.
    """
    summary_file = run_dir / "run_summary.json"
    if not summary_file.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )

    def _load(path: Path) -> Any:
        return json.loads(path.read_text(encoding="utf-8"))

    run_summary = _load(summary_file)
    fixture_dirs = sorted(entry for entry in run_dir.iterdir() if entry.is_dir())
    results: dict[str, dict[str, Any]] = {
        fixture_dir.name: _load(fixture_dir / "result.json")
        for fixture_dir in fixture_dirs
        if (fixture_dir / "result.json").is_file()
    }
    return {"run_summary": run_summary, "per_fixture_results": results}
|
|
|
|
def _publish_reports_and_gallery(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Commit the report HTML and JSON to the submissions dataset together.

    Both files land in a single ``create_commit`` call:
    ``reports/<submission_id>.html`` (read from *html_path*) and
    ``reports/<submission_id>.json`` (*report_json* serialised as indented
    UTF-8 JSON with non-ASCII characters kept as-is).

    Renders are **not** part of this commit; only the two report files are
    added. They are uploaded separately to the render bucket, and the report
    HTML references them by bucket URL.
    """
    reports_prefix = f"{REPORTS_DIR_IN_REPO}/{submission_id}"

    html_op = CommitOperationAdd(
        path_in_repo=f"{reports_prefix}.html",
        path_or_fileobj=str(html_path),
    )
    payload = json.dumps(report_json, ensure_ascii=False, indent=2).encode("utf-8")
    json_op = CommitOperationAdd(
        path_in_repo=f"{reports_prefix}.json",
        path_or_fileobj=payload,
    )

    HfApi(token=token).create_commit(
        repo_id=submissions_repo,
        repo_type="dataset",
        operations=[html_op, json_op],
        commit_message=f"publish report for {submission_id}",
    )
    print(
        f"[eval_job] published reports/{submission_id}.{{html,json}}",
        flush=True,
    )
|
|
|
|
# Script entry point: the process exit status is whatever ``main`` returns.
if __name__ == "__main__":
    raise SystemExit(main())
|
|