cadgenbench-eval-gpu / eval_job.py
Michael Rabinovich
eval_job: add shard mode (--fixtures/--shard-id) for sharded submission eval
b2e9d3a
raw
history blame
16.1 kB
"""In-job entrypoint for the CADGenBench eval on HF Jobs.
Invoked by the leaderboard Space's worker (see
``AI4Engineering/submit.py``) via::
hf jobs run --image hf.co/spaces/HuggingAI4Engineering/cadgenbench-eval-gpu \\
--flavor a10g-large \\
--env CADGENBENCH_DATA_REPO=HuggingAI4Engineering/cadgenbench-data \\
--env CADGENBENCH_DATA_GT_REPO=HuggingAI4Engineering/cadgenbench-data-gt \\
--env HF_SUBMISSIONS_REPO=HuggingAI4Engineering/cadgenbench-submissions \\
--env EVAL_WORKER_COUNT=8 \\
--secrets HF_TOKEN \\
python /opt/eval_job.py <submission_id> <zip_url>
Two run modes:
**Whole-submission (default, no ``--fixtures``)** -- the original path.
Synchronous, no fallbacks. Any failure raises and the container exits
non-zero; the Space's poller catches the ERROR stage and flips the
submission row to ``failed``.
1. Download ``submissions/<id>.zip`` from the submissions dataset
via ``hf_hub_download`` (auth via ``HF_TOKEN``).
2. Unpack into ``/tmp/run/``.
3. ``cadgenbench evaluate /tmp/run --workers <n>`` (subprocess).
4. ``cadgenbench report single /tmp/run -o /tmp/<id>.html``
(subprocess).
5. Build ``report.json`` bundling ``run_summary.json`` + every
per-fixture ``result.json`` (mirror of submit.py's
``_build_report_json``).
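6. Upload ``reports/<id>.html`` + ``reports/<id>.json`` back to the
submissions dataset via ``HfApi.upload_file``.
7. Upload one ``iso`` thumbnail per fixture as
``renders/<id>/<fixture>.png`` for the leaderboard gallery.
8. Exit 0.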
The Space-side worker then downloads ``reports/<id>.json``, reads
``run_summary`` out of it, and flips the row to ``completed``.
**Shard (``--fixtures f1,f2,... --shard-id shard_000``)** -- used by
the Space's sharded submit path (UC3) to fan a large submission across
several jobs. Steps 1-2 are identical, then the run dir is pruned to
just this shard's fixtures, ``cadgenbench evaluate`` runs over that
subset, and the resulting per-fixture dirs (``result.json`` + renders)
are uploaded *verbatim* under ``reports/<id>/shards/<shard_id>/``. No
report HTML, ``report.json``, or gallery render is produced per shard:
the Space downloads every shard's fixture dirs, merges them into one
run dir, and builds the single ``run_summary`` + report + gallery from
the merged whole (mirroring the orchestrator's ``_merge_eval``). Exit 0
on success; any failure exits non-zero and the Space marks that shard
ERROR and retries it.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path
from typing import Any
from huggingface_hub import HfApi, hf_hub_download
# Local scratch directory: the submission zip is unpacked here and the
# ``cadgenbench evaluate`` / ``cadgenbench report`` subprocesses are
# pointed at it.
RUN_DIR = Path("/tmp/run")
# Local directory the generated ``<id>.html`` report is written to before
# it is uploaded.
REPORT_HTML_DIR = Path("/tmp")
# Wall-clock limits, in seconds, passed as ``timeout=`` to the evaluate
# and report subprocesses respectively.
EVAL_TIMEOUT_SECONDS = 30 * 60
REPORT_TIMEOUT_SECONDS = 5 * 60
# Top-level path prefixes inside the submissions dataset repo for report
# artifacts and gallery thumbnails.
REPORTS_DIR_IN_REPO = "reports"
RENDERS_DIR_IN_REPO = "renders"
# Sub-prefix under ``reports/<id>/`` where each shard uploads its raw
# per-fixture dirs in shard mode. The Space merges these and deletes the
# whole ``shards/`` tree after a successful merge.
SHARDS_DIR_NAME = "shards"
# Single canonical view uploaded per fixture for the leaderboard
# gallery thumbnail. "iso" matches the GT render the gallery pairs it
# with, so the gallery columns stay a comparable matrix at one fixed
# camera angle. The evaluator writes this at
# ``<run_dir>/<fixture>/renders/iso.png`` (cadgenbench DEFAULT_VIEWS).
GALLERY_THUMB_VIEW = "iso"
def main() -> int:
    """Parse the CLI and run either the shard or the whole-submission pipeline.

    Shard mode is selected by ``--fixtures`` (which then requires
    ``--shard-id``): the run dir is prepared, pruned to the shard's
    fixtures, evaluated, and the per-fixture dirs are uploaded. Without
    ``--fixtures`` the whole submission is evaluated, an HTML report and
    ``report.json`` are built and uploaded, and the gallery thumbnails are
    pushed.

    Configuration read from the environment:

    * ``HF_TOKEN`` and ``HF_SUBMISSIONS_REPO`` -- required.
    * ``EVAL_WORKER_COUNT`` -- optional integer, defaults to ``8``; an
      empty value is treated the same as unset.

    Returns:
        ``0`` on success. Failures are not caught here; they propagate as
        exceptions so the process exits non-zero.

    Raises:
        RuntimeError: on inconsistent ``--fixtures`` / ``--shard-id``
            usage, a missing required environment variable, a
            non-integer ``EVAL_WORKER_COUNT``, or any pipeline step
            failure reported by the helpers.
    """
    parser = argparse.ArgumentParser(
        description="Run the CADGenBench eval pipeline on an HF Job.",
    )
    parser.add_argument(
        "submission_id",
        help="Filesystem-safe slug minted by the Space's submit handler.",
    )
    parser.add_argument(
        "zip_url",
        help=(
            "Canonical Hub blob URL of submissions/<id>.zip "
            "(submission_blob_url from the row)."
        ),
    )
    parser.add_argument(
        "--fixtures",
        default=None,
        help=(
            "Comma-separated fixture subset for shard mode. When set, the "
            "run dir is pruned to just these fixtures, evaluated, and the "
            "per-fixture dirs are uploaded under "
            "reports/<id>/shards/<shard-id>/ for the Space to merge. "
            "Omit for the original whole-submission path."
        ),
    )
    parser.add_argument(
        "--shard-id",
        default=None,
        help=(
            "Shard label (e.g. shard_000) naming this shard's upload prefix. "
            "Required when --fixtures is set."
        ),
    )
    args = parser.parse_args()
    submission_id: str = args.submission_id
    zip_url: str = args.zip_url

    # A shard label without a fixture list would otherwise fall through to
    # the whole-submission path and publish a full report from what was
    # meant to be a shard job. Reject it before touching env or network.
    if args.fixtures is None and args.shard_id:
        raise RuntimeError(
            "--shard-id was set without --fixtures; shard mode needs both."
        )

    token = _require_env("HF_TOKEN")
    submissions_repo = _require_env("HF_SUBMISSIONS_REPO")

    # Treat an empty EVAL_WORKER_COUNT like an unset one (same convention
    # as _require_env) and turn a malformed value into a readable error.
    raw_workers = os.environ.get("EVAL_WORKER_COUNT") or "8"
    try:
        worker_count = int(raw_workers)
    except ValueError as err:
        raise RuntimeError(
            f"EVAL_WORKER_COUNT must be an integer, got {raw_workers!r}."
        ) from err

    shard_fixtures = _parse_fixtures_arg(args.fixtures)
    if shard_fixtures is not None:
        if not args.shard_id:
            raise RuntimeError("--shard-id is required when --fixtures is set.")
        print(
            f"[eval_job] submission_id={submission_id} shard={args.shard_id} "
            f"fixtures={len(shard_fixtures)} workers={worker_count} "
            f"repo={submissions_repo}",
            flush=True,
        )
        _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
        _prune_run_dir(RUN_DIR, shard_fixtures)
        _run_eval(RUN_DIR, worker_count)
        _upload_shard_artifacts(
            submission_id, args.shard_id, RUN_DIR, submissions_repo, token,
        )
        print(
            f"[eval_job] done: {submission_id} shard={args.shard_id}",
            flush=True,
        )
        return 0

    print(
        f"[eval_job] submission_id={submission_id} "
        f"workers={worker_count} repo={submissions_repo}",
        flush=True,
    )
    _prepare_run_dir(submission_id, zip_url, submissions_repo, token)
    _run_eval(RUN_DIR, worker_count)
    html_path = REPORT_HTML_DIR / f"{submission_id}.html"
    _run_report(RUN_DIR, html_path)
    report_json = _build_report_json(RUN_DIR)
    _upload_reports(
        submission_id, html_path, report_json, submissions_repo, token,
    )
    _upload_gallery_renders(submission_id, RUN_DIR, submissions_repo, token)
    print(f"[eval_job] done: {submission_id}", flush=True)
    return 0
def _parse_fixtures_arg(raw: str | None) -> list[str] | None:
    """Turn the ``--fixtures`` CSV into an ordered, duplicate-free list.

    Returns ``None`` when *raw* is ``None`` (flag not given), which
    selects the whole-submission path. Otherwise each comma-separated
    piece is stripped of surrounding whitespace, blanks are dropped, and
    repeats are collapsed to their first occurrence.

    Raises:
        RuntimeError: if the flag was given but no fixture name survives
            (empty or whitespace/comma-only value).
    """
    if raw is None:
        return None
    stripped = (piece.strip() for piece in raw.split(","))
    # dict.fromkeys keeps first-seen order while discarding repeats.
    unique = list(dict.fromkeys(token for token in stripped if token))
    if unique:
        return unique
    raise RuntimeError("--fixtures was set but resolved to no fixture names.")
def _require_env(name: str) -> str:
    """Fetch the mandatory environment variable *name*.

    Raises:
        RuntimeError: if the variable is missing or set to an empty string.
    """
    found = os.environ.get(name, "")
    if found:
        return found
    raise RuntimeError(
        f"Required environment variable {name!r} is unset or empty."
    )
def _prepare_run_dir(
    submission_id: str,
    zip_url: str,
    submissions_repo: str,
    token: str,
) -> None:
    """Fetch the submission archive and extract it into a fresh ``RUN_DIR``.

    Any existing ``RUN_DIR`` is removed first. The archive is pulled with
    ``hf_hub_download`` from the *submissions_repo* dataset, authenticated
    with *token*. The in-repo filename is always
    ``submissions/<submission_id>.zip``; *zip_url* is accepted as part of
    the CLI contract (it is expected to end in that same path) but is not
    read by this function.
    """
    # Always start from an empty run dir so earlier contents cannot leak in.
    if RUN_DIR.exists():
        shutil.rmtree(RUN_DIR)
    RUN_DIR.mkdir(parents=True)

    repo_filename = f"submissions/{submission_id}.zip"
    print(
        f"[eval_job] downloading {submissions_repo}:{repo_filename}",
        flush=True,
    )
    archive_path = hf_hub_download(
        repo_id=submissions_repo,
        filename=repo_filename,
        repo_type="dataset",
        token=token,
    )

    # No zip-slip / symlink re-validation happens here: the Space is
    # expected to have gate-checked the archive contents before upload
    # (see submit.py's _extract_zip), so members are extracted as-is.
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(RUN_DIR)
    print(f"[eval_job] unpacked into {RUN_DIR}", flush=True)
def _prune_run_dir(run_dir: Path, fixtures: list[str]) -> None:
    """Delete every sub-directory of *run_dir* that is not a shard fixture.

    Shard mode extracts the full submission but must evaluate only its
    own slice, so all other fixture directories are removed before the
    evaluator walks the tree. Plain files directly under *run_dir* (e.g.
    ``meta.json``) are never touched.

    Raises:
        RuntimeError: if any name in *fixtures* has no matching directory
            under *run_dir*. The check runs before anything is deleted.
    """
    keep = set(fixtures)
    subdirs = [entry for entry in run_dir.iterdir() if entry.is_dir()]

    absent = keep.difference(entry.name for entry in subdirs)
    if absent:
        raise RuntimeError(
            f"Shard fixtures missing from submission zip: "
            f"{', '.join(sorted(absent))}"
        )

    doomed = [entry for entry in subdirs if entry.name not in keep]
    for entry in doomed:
        shutil.rmtree(entry)
    print(
        f"[eval_job] pruned run dir to {len(keep)} shard fixture(s) "
        f"(removed {len(doomed)})",
        flush=True,
    )
def _upload_shard_artifacts(
    submission_id: str,
    shard_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Push the shard's evaluated run dir to the Hub for the Space to merge.

    The whole of *run_dir* (already pruned to this shard's fixtures by the
    caller) is uploaded unchanged with a single ``upload_folder`` call to
    ``reports/<submission_id>/shards/<shard_id>/`` in the
    *submissions_repo* dataset. Everything inside *run_dir* is included,
    so any ``run_summary.json`` the evaluator wrote for this shard is
    uploaded too; the merge on the Space side is expected to recompute the
    summary over all shards rather than use these partial ones.
    """
    destination = "/".join(
        (REPORTS_DIR_IN_REPO, submission_id, SHARDS_DIR_NAME, shard_id)
    )
    hub = HfApi(token=token)
    hub.upload_folder(
        folder_path=str(run_dir),
        path_in_repo=destination,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"add eval shard {shard_id} for {submission_id}",
    )
    print(
        f"[eval_job] uploaded shard {shard_id} -> {destination}",
        flush=True,
    )
def _run_eval(run_dir: Path, workers: int) -> None:
    """Run ``cadgenbench evaluate`` on *run_dir* in a child process.

    The CLI is launched with the current interpreter and a copy of the
    current environment, bounded by ``EVAL_TIMEOUT_SECONDS``.

    Raises:
        RuntimeError: if the child exits with a non-zero status.
        subprocess.TimeoutExpired: if the timeout elapses (not caught here).
    """
    argv = [sys.executable, "-m", "cadgenbench.cli", "evaluate"]
    argv += [str(run_dir), "--workers", str(workers)]
    print(f"[eval_job] {' '.join(argv)}", flush=True)
    exit_code = subprocess.run(
        argv,
        timeout=EVAL_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    ).returncode
    if exit_code == 0:
        return
    raise RuntimeError(f"cadgenbench evaluate exited {exit_code}")
def _run_report(run_dir: Path, html_out: Path) -> None:
    """Run ``cadgenbench report single`` on *run_dir*, writing *html_out*.

    The CLI is launched with the current interpreter and a copy of the
    current environment, bounded by ``REPORT_TIMEOUT_SECONDS``.

    Raises:
        RuntimeError: if the child exits non-zero, or exits zero without
            having produced *html_out*.
        subprocess.TimeoutExpired: if the timeout elapses (not caught here).
    """
    argv = [sys.executable, "-m", "cadgenbench.cli", "report", "single"]
    argv += [str(run_dir), "-o", str(html_out)]
    print(f"[eval_job] {' '.join(argv)}", flush=True)
    exit_code = subprocess.run(
        argv,
        timeout=REPORT_TIMEOUT_SECONDS,
        env=os.environ.copy(),
        check=False,
    ).returncode
    # Success needs both a clean exit and the report file on disk.
    if exit_code == 0 and html_out.is_file():
        return
    raise RuntimeError(
        f"cadgenbench report single exited {exit_code} "
        f"(html exists={html_out.is_file()})"
    )
def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Assemble the ``report.json`` payload from an evaluated run dir.

    Returns a dict with two keys: ``run_summary`` (the parsed
    ``<run_dir>/run_summary.json``) and ``per_fixture_results`` (a mapping
    of sub-directory name to its parsed ``result.json``, in sorted
    directory order). Sub-directories without a ``result.json`` are left
    out. The shape is meant to mirror submit.py's ``_build_report_json``.

    Raises:
        RuntimeError: if ``run_summary.json`` is missing from *run_dir*.
    """
    summary_file = run_dir / "run_summary.json"
    if not summary_file.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (eval issue?)"
        )
    with summary_file.open(encoding="utf-8") as handle:
        run_summary = json.load(handle)

    results: dict[str, dict[str, Any]] = {}
    for entry in sorted(run_dir.iterdir()):
        if not entry.is_dir():
            continue
        result_file = entry / "result.json"
        if not result_file.is_file():
            continue
        with result_file.open(encoding="utf-8") as handle:
            results[entry.name] = json.load(handle)

    return {"run_summary": run_summary, "per_fixture_results": results}
def _upload_reports(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
    submissions_repo: str,
    token: str,
) -> None:
    """Publish the HTML report and the JSON bundle for *submission_id*.

    Two ``upload_file`` calls are made against the *submissions_repo*
    dataset, HTML first: the file at *html_path* goes to
    ``reports/<id>.html``, and *report_json* (serialized as indented
    UTF-8 JSON, non-ASCII kept as-is) goes to ``reports/<id>.json``.
    """
    hub = HfApi(token=token)
    # Keyword arguments shared by both uploads.
    target = {"repo_id": submissions_repo, "repo_type": "dataset"}
    report_stem = f"{REPORTS_DIR_IN_REPO}/{submission_id}"

    hub.upload_file(
        path_or_fileobj=str(html_path),
        path_in_repo=f"{report_stem}.html",
        commit_message=f"add HTML report for {submission_id}",
        **target,
    )

    json_bytes = json.dumps(report_json, ensure_ascii=False, indent=2).encode(
        "utf-8"
    )
    hub.upload_file(
        path_or_fileobj=json_bytes,
        path_in_repo=f"{report_stem}.json",
        commit_message=f"add JSON report for {submission_id}",
        **target,
    )
    print(
        f"[eval_job] uploaded reports/{submission_id}.{{html,json}}",
        flush=True,
    )
def _upload_gallery_renders(
    submission_id: str,
    run_dir: Path,
    submissions_repo: str,
    token: str,
) -> None:
    """Upload one ``iso`` thumbnail per fixture for the leaderboard gallery.

    Every ``<run_dir>/<fixture>/renders/iso.png`` that exists is staged as
    ``renders/<id>/<fixture>.png`` and all staged files are pushed to the
    *submissions_repo* dataset in a single commit, so the gallery never
    sees a half-populated set and a large submission does not generate
    one Hub commit per fixture.

    A fixture directory without an ``iso.png`` is skipped; a missing
    thumbnail is not an error (the gallery is expected to fall back to
    the per-fixture status from the row). If nothing is staged, no commit
    is made.
    """
    # Function-scope import: CommitOperationAdd lives in the same
    # huggingface_hub package this module already depends on.
    from huggingface_hub import CommitOperationAdd

    repo_prefix = f"{RENDERS_DIR_IN_REPO}/{submission_id}"
    operations = []
    for fixture_dir in sorted(d for d in run_dir.iterdir() if d.is_dir()):
        iso_png = fixture_dir / "renders" / f"{GALLERY_THUMB_VIEW}.png"
        if iso_png.is_file():
            operations.append(
                CommitOperationAdd(
                    path_in_repo=f"{repo_prefix}/{fixture_dir.name}.png",
                    path_or_fileobj=str(iso_png),
                )
            )

    if not operations:
        print(
            f"[eval_job] no gallery renders to upload for {submission_id}",
            flush=True,
        )
        return

    # One atomic commit for all thumbnails instead of one per fixture.
    api = HfApi(token=token)
    api.create_commit(
        repo_id=submissions_repo,
        repo_type="dataset",
        operations=operations,
        commit_message=(
            f"add {len(operations)} gallery render(s) for {submission_id}"
        ),
    )
    print(
        f"[eval_job] uploaded {len(operations)} gallery render(s) under "
        f"{RENDERS_DIR_IN_REPO}/{submission_id}/",
        flush=True,
    )
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())