cadgenbench-eval-gpu / eval_job.py
Michael Rabinovich
eval_job: pass submission-zip download URL to the report
1c053c8
"""In-job entrypoint for the CADGenBench eval on HF Jobs.
Invoked by the leaderboard Space's worker (see
``AI4Engineering/submit.py``) via::
hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\
--flavor a10g-large \\
--env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\
--env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\
--env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\
--env EVAL_WORKER_COUNT=8 \\
--secrets HF_TOKEN \\
python /opt/eval_job.py <submission_id> <zip_url>
Two run modes:
**Whole-submission (default, no ``--fixtures``)** -- the original path.
Synchronous, no fallbacks. Any failure raises and the container exits
non-zero; the Space's poller catches the ERROR stage and flips the
submission row to ``failed``.
1. Download ``submissions/<id>.zip`` from the submissions dataset
via ``hf_hub_download`` (auth via ``HF_TOKEN``).
2. Unpack into ``/tmp/run/``.
3. ``cadgenbench evaluate /tmp/run --workers <n>`` (subprocess).
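4. ``cadgenbench report single /tmp/run -o /tmp/<id>.html``
(subprocess), run after the renders have been uploaded to the public
render bucket so the HTML references them by URL.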
5. Build ``report.json`` bundling ``run_summary.json`` + every
per-fixture ``result.json`` (mirror of submit.py's
``_build_report_json``).
6. Upload ``reports/<id>.html`` + ``reports/<id>.json`` back to the
submissions dataset in a single commit via ``HfApi.create_commit``.
7. Exit 0.
The Space-side worker then downloads ``reports/<id>.json``, reads
``run_summary`` out of it, and flips the row to ``completed``.
**Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by
the Space's sharded submit path (UC3) to fan a large submission across
several jobs. Steps 1-2 are identical, then the run dir is pruned to
just this shard's fixtures, ``cadgenbench evaluate`` runs over that
subset, and the resulting per-fixture dirs (``result.json`` + renders)
are staged *verbatim*. If ``CADGENBENCH_SHARD_BUCKET`` is set, the shard
syncs them into that HF Storage Bucket via the bucket API; otherwise it
uploads under ``reports/<id>/shards/<shard_id>/`` in the submissions
dataset. No
report HTML, ``report.json``, or gallery render is produced per shard:
the Space reads every shard's fixture dirs, merges them into one run dir,
and builds the single ``run_summary`` + report + gallery from the merged
whole (mirroring the orchestrator's ``_merge_eval``). Exit 0 on success;
any failure exits non-zero and the Space marks that shard ERROR and
retries it.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path
from typing import Any
from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download
# Local directory the submission zip is unpacked into and evaluated from.
RUN_DIR = Path("/tmp/run")
# Directory the generated ``<submission_id>.html`` report is written to.
REPORT_HTML_DIR = Path("/tmp")
# Subprocess timeout for ``cadgenbench evaluate`` (30 minutes).
EVAL_TIMEOUT_SECONDS = 30 * 60
# Subprocess timeout for ``cadgenbench report single`` (5 minutes).
REPORT_TIMEOUT_SECONDS = 5 * 60
# Top-level prefix for report artifacts (and legacy-mode shard uploads) in the
# submissions dataset.
REPORTS_DIR_IN_REPO = "reports"
# Top-level prefix for render objects in the render bucket.
RENDERS_DIR_IN_REPO = "renders"
# Names of the env vars that switch shard uploads to an HF Storage Bucket and
# set the key prefix inside it (the prefix defaults to ``submissions`` when the
# second variable is unset).
SHARD_BUCKET_ENV = "CADGENBENCH_SHARD_BUCKET"
SHARD_BUCKET_PREFIX_ENV = "CADGENBENCH_SHARD_BUCKET_PREFIX"
# Public HF Storage Bucket the eval job uploads gallery/report renders to (the
# job is the sole render uploader; the Space never handles render bytes). The
# hosted report + gallery reference these by anonymous bucket URL. Submission
# renders only; GT renders stay in the private GT dataset.
# Read once at import time; surrounding whitespace in the env value is dropped.
RENDER_BUCKET = os.environ.get(
"CADGENBENCH_RENDER_BUCKET", "HuggingAI4Engineering/cadgenbench-eval-staging",
).strip()
# Hub base URL with any trailing slash removed, so paths can be appended with a
# single ``/``. Read once at import time.
HF_ENDPOINT = os.environ.get("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
# Space-relative proxy roots the hosted report references for the *private*
# assets it can't link from the public bucket. The report is served by the
# Space at ``/reports/<id>.html``, so these absolute-path URLs resolve against
# the Space origin and the token-holding proxy streams the bytes. Kept in sync
# with the routes registered in the leaderboard Space's ``app.py``.
GT_PROXY_BASE_URL = "/gt"
INPUT_PROXY_BASE_URL = "/task-input"
def _render_base_url(submission_id: str) -> str:
    """Return the public bucket URL under which *submission_id*'s renders live.

    The result has the shape
    ``<endpoint>/buckets/<bucket>/resolve/renders/<id>`` with no trailing
    slash; the report is expected to append ``/<fixture>/<file>`` to it.
    """
    bucket_root = f"{HF_ENDPOINT}/buckets/{RENDER_BUCKET}/resolve"
    return f"{bucket_root}/{RENDERS_DIR_IN_REPO}/{submission_id}"
def _submission_zip_url(submission_id: str, submissions_repo: str) -> str:
    """Return the Hub resolve URL of ``submissions/<id>.zip``.

    This is the link the report uses for its download button. It is built
    to have the same shape as the blob URL the submit handler is said to
    record as ``submission_blob_url``, so both point at the same artifact.

    Args:
        submission_id: Submission slug; becomes the zip's base name.
        submissions_repo: ``<owner>/<name>`` of the submissions dataset.
    """
    dataset_root = f"{HF_ENDPOINT}/datasets/{submissions_repo}"
    return f"{dataset_root}/resolve/main/submissions/{submission_id}.zip"
def _upload_renders_to_bucket(
    run_dir: Path, submission_id: str, token: str,
) -> list[str]:
    """Push each fixture's render images to the render bucket in one batch.

    For every sub-directory of *run_dir* this collects the ``.png`` /
    ``.webp`` files in its ``renders/`` folder plus, when present, the
    ``interface_overlay.png`` sitting at the fixture root, and maps them to
    ``renders/<submission_id>/<fixture>/<file>`` in the bucket. Everything
    is sent with a single ``batch_bucket_files`` call.

    Args:
        run_dir: Evaluated run directory (one sub-directory per fixture).
        submission_id: Submission slug used in the destination prefix.
        token: Hub token used for the upload.

    Returns:
        The destination object paths that were uploaded, in upload order
        (empty when there was nothing to upload), so the caller can warm
        the CDN for them.
    """
    image_suffixes = {".png", ".webp"}
    uploads: list[tuple[str, str]] = []

    fixture_dirs = sorted(entry for entry in run_dir.iterdir() if entry.is_dir())
    for fixture in fixture_dirs:
        prefix = f"{RENDERS_DIR_IN_REPO}/{submission_id}/{fixture.name}"

        renders = fixture / "renders"
        if renders.is_dir():
            uploads.extend(
                (str(image), f"{prefix}/{image.name}")
                for image in sorted(renders.iterdir())
                if image.suffix.lower() in image_suffixes
            )

        # The interface overlay lives at the fixture root rather than under
        # renders/. It is shipped under the same per-fixture prefix so the
        # hosted report can link to it by URL instead of inlining it.
        overlay = fixture / "interface_overlay.png"
        if overlay.is_file():
            uploads.append((str(overlay), f"{prefix}/{overlay.name}"))

    if not uploads:
        return []

    HfApi(token=token).batch_bucket_files(RENDER_BUCKET, add=uploads, token=token)
    print(
        f"[eval_job] uploaded {len(uploads)} render(s) -> "
        f"hf://buckets/{RENDER_BUCKET}/{RENDERS_DIR_IN_REPO}/{submission_id}",
        flush=True,
    )
    return [remote for _local, remote in uploads]
def _bucket_object_url(endpoint: str, bucket: str, object_path: str) -> str:
    """Build the percent-encoded resolve URL for *object_path* in *bucket*.

    Only the object path is encoded (``/`` is kept as the separator), so
    names containing spaces or non-ASCII characters yield a URL that
    ``urllib`` accepts. Plain ASCII slug paths come out unchanged.
    """
    from urllib.parse import quote

    encoded_path = quote(object_path, safe="/")
    return f"{endpoint}/buckets/{bucket}/resolve/{encoded_path}"


def _warm_render_cdn(object_paths: list[str]) -> None:
    """Prime the CDN by fetching each freshly-uploaded render once.

    Rationale carried over from the original implementation notes: the
    first fetch of a brand-new bucket object is slower than later ones, and
    that lag is what a viewer sees when opening a just-published report.
    Fetching every object here (in parallel, without credentials) lets the
    first human hit an already-served object instead.

    Warming is best-effort: a failed fetch never raises, it is only counted
    and mentioned in the final log line.

    Args:
        object_paths: Bucket object paths (as returned by
            ``_upload_renders_to_bucket``). An empty list is a no-op.
    """
    import urllib.request
    from concurrent.futures import ThreadPoolExecutor

    if not object_paths:
        return

    def _warm(path: str) -> bool:
        """Fetch one object and discard its bytes; return whether it worked."""
        url = _bucket_object_url(HF_ENDPOINT, RENDER_BUCKET, path)
        try:
            with urllib.request.urlopen(url, timeout=30) as resp:
                # Drain in chunks so a large render is never held in memory.
                while resp.read(65536):
                    pass
        except Exception:
            # Deliberately broad: warming is an optimisation, never a
            # publish blocker, so any failure is just recorded.
            return False
        return True

    with ThreadPoolExecutor(max_workers=16) as pool:
        outcomes = list(pool.map(_warm, object_paths))

    failed = outcomes.count(False)
    failure_note = f" ({failed} failed)" if failed else ""
    print(
        f"[eval_job] warmed CDN for {len(outcomes) - failed} render(s)"
        f"{failure_note}",
        flush=True,
    )
# Sub-prefix under which each shard uploads its raw per-fixture dirs in shard
# mode: ``reports/<id>/shards/<shard_id>/`` in the submissions dataset (legacy
# mode) or ``<prefix>/<id>/shards/<shard_id>`` in the shard bucket (bucket
# mode). The Space merges these and deletes the whole ``shards/`` tree after a
# successful merge.
SHARDS_DIR_NAME = "shards"
def main() -> int:
    """Parse the CLI and run either the shard or the whole-submission path.

    Without ``--fixtures`` the whole submission is evaluated, its renders
    are uploaded and warmed, and the HTML + JSON report is published. With
    ``--fixtures`` (which then requires ``--shard-id``) the run dir is
    pruned to that subset, evaluated, its renders uploaded and warmed, and
    the per-fixture dirs are uploaded for the Space to merge.

    Returns:
        ``0`` on success. Nothing is caught here: an exception from any
        step propagates to the caller.

    Raises:
        RuntimeError: If a required env var is missing, ``--fixtures``
            resolves to no names, or ``--fixtures`` is given without
            ``--shard-id``.
    """
    cli = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    cli.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    cli.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    cli.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<id>/shards/<shard-id>/ for the Space to merge. "
            "Omit for the original whole-submission path."
        ),
    )
    cli.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    opts = cli.parse_args()

    submission_id: str = opts.submission_id
    zip_url: str = opts.zip_url
    shard_id = opts.shard_id

    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")
    worker_count = int(os.environ.get("EVAL_WORKER_COUNT", "8"))
    shard_fixtures = _parse_fixtures_arg(opts.fixtures)

    if shard_fixtures is None:
        # Whole-submission path.
        print(
            f"[eval_job] submission_id={submission_id} "
            f"workers={worker_count} repo={submissions_repo}",
            flush=True,
        )
        _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
        _run_eval(RUN_DIR, worker_count)

        # Renders go to the public bucket (and get warmed) before the report
        # is built, so the HTML links to them by URL rather than embedding
        # the image bytes.
        uploaded = _upload_renders_to_bucket(RUN_DIR, submission_id, token)
        _warm_render_cdn(uploaded)

        html_path = REPORT_HTML_DIR / f"{submission_id}.html"
        _run_report(
            RUN_DIR,
            html_path,
            render_base_url=_render_base_url(submission_id),
            download_url=_submission_zip_url(submission_id, submissions_repo),
        )
        report_json = _build_report_json(RUN_DIR)
        _publish_reports_and_gallery(
            submission_id, html_path, report_json, submissions_repo, token,
        )
        print(f"[eval_job] done: {submission_id}", flush=True)
        return 0

    # Shard path.
    if not shard_id:
        raise RuntimeError("--shard-id is required when --fixtures is set.")
    print(
        f"[eval_job] submission_id={submission_id} shard={shard_id} "
        f"fixtures={len(shard_fixtures)} workers={worker_count} "
        f"repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _prune_run_dir(RUN_DIR, shard_fixtures)
    _run_eval(RUN_DIR, worker_count)

    # This shard job is the only uploader of its fixtures' renders to the
    # permanent bucket prefix; the Space-side merge only assembles the report.
    uploaded = _upload_renders_to_bucket(RUN_DIR, submission_id, token)
    _warm_render_cdn(uploaded)

    _upload_shard_artifacts(
        submission_id, shard_id, RUN_DIR, submissions_repo, token,
    )
    print(
        f"[eval_job] done: {submission_id} shard={shard_id}",
        flush=True,
    )
    return 0
def _parse_fixtures_arg(raw: str | None) -> list[str] | None:
"""Parse the ``--fixtures`` CSV into a deduped list, or ``None``.
``None`` (flag absent) selects the whole-submission path. A present
but empty/whitespace value is a usage error: a shard with no
fixtures is never something the Space should dispatch.
"""
if raw is None:
return None
names: list[str] = []
seen: set[str] = set()
for part in raw.split(","):
name = part.strip()
if not name or name in seen:
continue
seen.add(name)
names.append(name)
if not names:
raise RuntimeError("--fixtures was set but resolved to no fixture names.")
return names
def _require_env(name: str) -> str:
    """Fetch the environment variable *name*, insisting that it is non-empty.

    Raises:
        RuntimeError: If the variable is unset or set to the empty string.
    """
    found = os.environ.get(name)
    if found:
        return found
    raise RuntimeError(
        f"Required environment variable {name!r} is unset or empty."
    )
def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Fetch ``submissions/<id>.zip`` and extract it into a fresh ``RUN_DIR``.

    Any existing ``RUN_DIR`` is removed first. The archive is pulled with
    ``hf_hub_download`` from the *submissions_repo* dataset using *token*.

    Args:
        submission_id: Submission slug; the in-repo filename is derived
            from it as ``submissions/<submission_id>.zip``.
        zip_url: Accepted for interface compatibility but not read here;
            the download location comes from *submission_id* and
            *submissions_repo* alone.
        submissions_repo: ``<owner>/<name>`` of the submissions dataset.
        token: Hub token used for the download.
    """
    # Always start from an empty run dir.
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    zip_path_in_repo = f"submissions/{submission_id}.zip"
    print(
        f"[eval_job] downloading {submissions_repo}:{zip_path_in_repo}",
        flush=True,
    )
    archive = hf_hub_download(
        repo_id=submissions_repo,
        filename=zip_path_in_repo,
        repo_type="dataset",
        token=token,
    )

    # No zip-slip / symlink re-validation is done here: per the original
    # note, the Space already gate-checks the archive contents before upload.
    with zipfile.ZipFile(archive) as bundle:
        bundle.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)
def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Delete every sub-directory of *run_dir* whose name is not in *fixtures*.

    Shard mode unpacks the whole zip but evaluates only this shard's slice,
    so the other fixture directories are removed before evaluation. Plain
    files at the root of *run_dir* are left alone.

    Args:
        run_dir: Unpacked submission directory.
        fixtures: Names of the fixture directories to keep.

    Raises:
        RuntimeError: If any requested fixture has no directory under
            *run_dir*. Nothing is deleted in that case.
    """
    keep = set(fixtures)
    subdirs = [entry for entry in run_dir.iterdir() if entry.is_dir()]

    absent = keep.difference(entry.name for entry in subdirs)
    if absent:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(sorted(absent))}"
        )

    doomed = [entry for entry in subdirs if entry.name not in keep]
    for entry in doomed:
        shutil.rmtree(entry)

    print(
        f"[eval_job] pruned run dir to {len(keep)} shard fixture(s) "
        f"(removed {len(doomed)})",
        flush=True,
    )
def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Publish this shard's evaluated *run_dir* so the Space can merge it.

    The (already pruned) directory is sent as-is. Where it goes depends on
    the ``CADGENBENCH_SHARD_BUCKET`` environment variable:

    * set and non-blank: the directory is synced with ``sync_bucket`` to
      ``hf://buckets/<bucket>/<prefix>/<id>/shards/<shard_id>``, where
      ``<prefix>`` comes from ``CADGENBENCH_SHARD_BUCKET_PREFIX`` (default
      ``submissions``). An ``hf://buckets/`` scheme on the bucket value and
      trailing slashes are tolerated.
    * otherwise (legacy mode): the directory is uploaded with
      ``upload_folder`` under ``reports/<id>/shards/<shard_id>`` in the
      *submissions_repo* dataset.

    Whatever ``cadgenbench evaluate`` wrote into *run_dir* rides along
    unchanged; the original notes say the Space recomputes the aggregate
    summary over the union of shards.

    Args:
        submission_id: Submission slug.
        shard_id: Shard label naming this shard's upload prefix.
        run_dir: Directory to publish.
        submissions_repo: Dataset used in legacy mode.
        token: Hub token used for the upload.
    """
    api = HfApi(token=token)
    bucket_id = os.environ.get(SHARD_BUCKET_ENV, "").strip()

    if not bucket_id:
        # Legacy mode: one dataset-repo upload under reports/<id>/shards/.
        path_in_repo = (
            f"{REPORTS_DIR_IN_REPO}/{submission_id}/{SHARDS_DIR_NAME}/{shard_id}"
        )
        api.upload_folder(
            folder_path=str(run_dir),
            path_in_repo=path_in_repo,
            repo_id=submissions_repo,
            repo_type="dataset",
            commit_message=f"add eval shard {shard_id} for {submission_id}",
        )
        print(
            f"[eval_job] uploaded shard {shard_id} -> {path_in_repo}",
            flush=True,
        )
        return

    # Bucket mode: accept either a bare bucket id or a full hf:// URL.
    scheme = "hf://buckets/"
    if bucket_id.startswith(scheme):
        bucket_id = bucket_id[len(scheme):]
    bucket_id = bucket_id.rstrip("/")
    prefix = os.environ.get(SHARD_BUCKET_PREFIX_ENV, "submissions").strip("/")
    dest = (
        f"hf://buckets/{bucket_id}/{prefix}/{submission_id}/"
        f"{SHARDS_DIR_NAME}/{shard_id}"
    )
    api.sync_bucket(source=str(run_dir), dest=dest, token=token)
    print(
        f"[eval_job] synced shard {shard_id} -> {dest}",
        flush=True,
    )
def _run_eval(run_dir: Path, workers: int) -> None:
    """Run ``cadgenbench evaluate`` on *run_dir* in a child process.

    The child is started with the current interpreter and a copy of the
    current environment, and is limited to ``EVAL_TIMEOUT_SECONDS``.

    Args:
        run_dir: Directory to evaluate.
        workers: Value passed to ``--workers``.

    Raises:
        RuntimeError: If the child exits with a non-zero status.
    """
    argv = [sys.executable, "-m", "cadgenbench.cli", "evaluate"]
    argv += [str(run_dir), "--workers", str(workers)]
    print(f"[eval_job] {' '.join(argv)}", flush=True)

    exit_code = subprocess.run(
        argv,
        timeout=EVAL_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    ).returncode
    if exit_code:
        raise RuntimeError(
            f"cadgenbench evaluate exited {exit_code}"
        )
def _run_report(
    run_dir: Path,
    html_out: Path,
    *,
    render_base_url: str | None = None,
    download_url: str | None = None,
) -> None:
    """Run ``cadgenbench report single`` for *run_dir*, writing *html_out*.

    Args:
        run_dir: Evaluated run directory.
        html_out: Path the HTML report must be written to.
        render_base_url: When truthy, passed as ``--render-base-url`` so
            candidate renders are linked from the public bucket rather
            than inlined. The GT and input proxy base URLs are passed
            along with it.
        download_url: When truthy, passed as ``--download-url``.

    Raises:
        RuntimeError: If the child exits non-zero or *html_out* does not
            exist afterwards.
    """
    argv = [sys.executable, "-m", "cadgenbench.cli", "report", "single"]
    argv.extend((str(run_dir), "-o", str(html_out)))

    if render_base_url:
        # GT and inputs are private, so they are linked through the Space
        # proxy rather than the public bucket; sending all three together
        # keeps the hosted report made of links instead of inlined data.
        argv.extend((
            "--render-base-url", render_base_url,
            "--gt-base-url", GT_PROXY_BASE_URL,
            "--input-base-url", INPUT_PROXY_BASE_URL,
        ))
    if download_url:
        argv.extend(("--download-url", download_url))

    print(f"[eval_job] {' '.join(argv)}", flush=True)
    completed = subprocess.run(
        argv,
        timeout=REPORT_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    )

    html_written = html_out.is_file()
    if completed.returncode != 0 or not html_written:
        raise RuntimeError(
            f"cadgenbench report single exited {completed.returncode} "
            f"(html exists={html_written})"
        )
def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Collect ``run_summary.json`` and each fixture's ``result.json``.

    Args:
        run_dir: Evaluated run directory.

    Returns:
        ``{"run_summary": <parsed run_summary.json>,
        "per_fixture_results": {<fixture dir name>: <parsed result.json>}}``.
        Fixture directories without a ``result.json`` are omitted; entries
        appear in sorted directory order.

    Raises:
        RuntimeError: If ``run_summary.json`` is missing from *run_dir*.
    """
    summary_file = run_dir / "run_summary.json"
    if not summary_file.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )
    run_summary = json.loads(summary_file.read_text(encoding="utf-8"))

    result_files = (
        (fixture.name, fixture / "result.json")
        for fixture in sorted(run_dir.iterdir())
        if fixture.is_dir()
    )
    per_fixture: dict[str, dict[str, Any]] = {
        name: json.loads(result_file.read_text(encoding="utf-8"))
        for name, result_file in result_files
        if result_file.is_file()
    }
    return {"run_summary": run_summary, "per_fixture_results": per_fixture}
def _publish_reports_and_gallery(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Commit the report HTML and JSON to the submissions dataset together.

    Both files land under ``reports/`` as ``<id>.html`` and ``<id>.json``
    in a single ``create_commit`` call. No render images are part of this
    commit: this function only adds the two report files, and the HTML is
    expected to reference renders by bucket URL. Keeping binaries out of the
    dataset repo is the stated reason for that split.

    Args:
        submission_id: Submission slug used for the file names.
        html_path: Local path of the generated HTML report.
        report_json: Report payload; serialised as indented UTF-8 JSON.
        submissions_repo: ``<owner>/<name>`` of the submissions dataset.
        token: Hub token used for the commit.
    """
    target_stem = f"{REPORTS_DIR_IN_REPO}/{submission_id}"

    html_op = CommitOperationAdd(
        path_in_repo=f"{target_stem}.html",
        path_or_fileobj=str(html_path),
    )
    report_bytes = json.dumps(
        report_json, ensure_ascii=False, indent=2,
    ).encode("utf-8")
    json_op = CommitOperationAdd(
        path_in_repo=f"{target_stem}.json",
        path_or_fileobj=report_bytes,
    )

    HfApi(token=token).create_commit(
        repo_id=submissions_repo,
        repo_type="dataset",
        operations=[html_op, json_op],
        commit_message=f"publish report for {submission_id}",
    )
    print(
        f"[eval_job] published reports/{submission_id}.{{html,json}}",
        flush=True,
    )
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    raise SystemExit(main())