| """Submit-tab handler for the CADGenBench leaderboard Space. |
| |
| Step 6 (E) chunks 2 + 3 + 4 + 6 + Step 10 (jobs migration): cheap-sync |
| validation + pending-row write + zip upload + background dispatch + |
| poll of an HF Jobs GPU eval + boot-time stuck-pending sweep. The |
| handler validates the upload, uploads the zip to |
| ``submissions/<id>.zip``, appends a ``status: pending`` row to |
| ``results.jsonl`` (under a process-wide lock), spawns a daemon thread |
| that dispatches a per-submission HF Job against the |
| ``HuggingAI4Engineering/cadgenbench-eval-gpu`` image and polls |
| ``inspect_job`` until the job's stage is terminal. On COMPLETED the |
| worker downloads ``reports/<id>.json`` (the Job already uploaded |
| ``reports/<id>.{html,json}`` to the submissions dataset), reads |
| ``run_summary`` out of it, and flips the row ``pending -> completed``. |
| On ERROR (or any dispatch / poll exception) the row flips to ``failed`` |
| with a short ``failure_reason``. At module import a one-shot daemon |
| sweep flips any ``pending`` row whose ``submitted_at`` is older than |
| 30 min to ``failed`` with a "Space restart" reason, so rows stranded by |
| a deploy / OOM / crash / orphaned Job don't sit pending forever. |
| |
| Validation gates, in order: |
| |
| 1. Form-level: a file was attached. |
| 2. Zip safety: parseable as a zip, no absolute / parent-traversing |
| entry names, no symlinks. |
| 3. ``meta.json`` schema: required keys present, types sane, |
| ``agree_to_publish`` is literally ``true`` (the sole consent |
| gate; no separate UI checkbox), ``notes`` (when not null) is |
| collapsed to one line and within the per-submission cap. |
| 4. Fixture-set match: the set of folders inside the zip equals the |
| set of fixture directories in :func:`cadgenbench.common.paths.data_inputs_dir` |
| (no missing, no extras). |
| 5. STEP parseability: each ``<fixture>/output.step`` loads as STEP |
| geometry. Per-fixture validity (watertight, manifold, etc) is |
| *not* checked here, that's the evaluator's job and contributes to |
| the per-fixture score; this gate only rejects "not actually STEP". |
| |
| Hub-write ordering (after validation passes): |
| |
| 1. Upload ``submissions/<id>.zip``. Unique path per submission, no |
| lock needed. |
| 2. Build pending row (metadata + null scores + ``submission_blob_url``). |
| 3. Acquire ``_HUB_LOCK``; download current ``results.jsonl`` (or |
| start empty); append the pending row; re-upload. |
| 4. Spawn worker thread (daemon, named after submission_id). The |
| worker needs no Space-local files: the handler removes its own |
| tempdir and the Job downloads the zip from the Hub itself. |
| |
| If step 1 fails the user sees a clean rejection. If step 3 fails the |
| zip is left orphaned in ``submissions/`` and the user sees a clean |
| rejection; an orphan-zip sweep is a future-chunk concern. |
| |
| Background worker, per submission: |
| |
| 1. ``huggingface_hub.run_job(...)`` dispatches an HF Job against |
| the ``cadgenbench-eval-gpu`` Space image on ``a10g-large``, |
| passing the submission_id + zip blob URL as command args and |
| ``HF_TOKEN`` as a secret. |
| 2. Poll ``inspect_job(job_id)`` every few seconds until the job's |
| stage is terminal (``COMPLETED`` or ``ERROR``). Outer deadline |
| guards against an unresponsive poll surface. |
| 3. On ``COMPLETED``: download ``reports/<id>.json`` from the |
| submissions dataset (the Job uploaded both |
| ``reports/<id>.{html,json}`` before exiting), read |
| ``run_summary`` out of the bundled payload, under ``_HUB_LOCK`` |
| flip the row to ``"completed"`` and merge the score fields. |
| 4. On ``ERROR`` (or any dispatch / poll exception), flip the row to |
| ``"failed"`` with a short ``failure_reason`` (the job's |
| ``status.message`` plus the last N lines of ``fetch_job_logs``). |
| """ |
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| import os |
| import re |
| import shutil |
| import tempfile |
| import threading |
| import time |
| import zipfile |
| from concurrent.futures import ThreadPoolExecutor |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
| import cadgenbench |
| import gradio as gr |
| from cadgenbench.common.paths import data_inputs_dir |
| from cadgenbench.common.validity import parse_step |
| from huggingface_hub import ( |
| HfApi, |
| fetch_job_logs, |
| hf_hub_download, |
| inspect_job, |
| run_job, |
| ) |
| from huggingface_hub.errors import EntryNotFoundError |
|
|
| from leaderboard import HF_DATA_REPO, HF_SUBMISSIONS_REPO |
|
|
| logger = logging.getLogger(__name__) |
|
|
# Maximum length of `notes` after whitespace normalisation
# (enforced by _normalize_notes).
NOTES_MAX_CHARS = 500
# Keys that must all be present in a submission's meta.json.
REQUIRED_META_KEYS: tuple[str, ...] = (
    "submitter_name",
    "submission_name",
    "agent_url",
    "notes",
    "agree_to_publish",
)
# Per-component length cap applied by _slug when minting a submission id.
SUBMISSION_ID_SLUG_MAX = 40
# Paths inside the submissions dataset repo.
RESULTS_FILENAME = "results.jsonl"
SUBMISSIONS_DIR = "submissions"
REPORTS_DIR = "reports"
# Number of leading sha characters kept for `cadgenbench_data_revision`.
DATA_REV_SHORT_LEN = 12
# `failure_reason` strings are truncated to this many characters.
FAILURE_REASON_MAX_CHARS = 200
# Read size used when hashing the uploaded zip.
SHA256_BLOCK_SIZE = 64 * 1024
# Age (30 min) after which the boot sweep treats a pending row as stranded.
STUCK_PENDING_THRESHOLD_SECONDS = 30 * 60
# strptime format the boot sweep uses to parse `submitted_at`; it must match
# the format _build_pending_row writes.
SUBMITTED_AT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# `failure_reason` written by the boot sweep.
STUCK_PENDING_REASON = "evaluation interrupted by Space restart"
# Environment variable that disables the boot sweep when set to "1".
BOOT_SWEEP_ENV = "CADGENBENCH_DISABLE_BOOT_SWEEP"
|
|
| |
| |
| |
| |
| |
# Settings forwarded to ``run_job`` by _dispatch_eval_job.
# The Job image is ``hf.co/spaces/<EVAL_GPU_SPACE>``.
EVAL_GPU_SPACE = "HuggingAI4Engineering/cadgenbench-eval-gpu"
EVAL_JOB_FLAVOR = "a10g-large"
# NOTE(review): looks like a personal namespace - confirm this is the
# intended billing/ownership namespace for Jobs.
EVAL_JOB_NAMESPACE = "michaelr27"
EVAL_JOB_TIMEOUT = "30m"
# Kept as a string because it is passed through the Job's env mapping
# as ``EVAL_WORKER_COUNT``.
EVAL_JOB_WORKER_COUNT = "8"
|
|
| |
| |
| |
| |
| |
# Seconds slept between ``inspect_job`` polls (and between poll retries).
JOB_POLL_INTERVAL_SECONDS = 5
# Outer poll deadline (35 min), five minutes beyond EVAL_JOB_TIMEOUT ("30m").
JOB_POLL_DEADLINE_SECONDS = 35 * 60
# Number of trailing Job log lines considered for a `failure_reason`.
JOB_LOG_TAIL_LINES = 30
# Consecutive ``inspect_job`` failures tolerated before the poll loop raises.
JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5
|
|
| |
| |
# Shared Hub client used for file uploads and dataset metadata lookups.
_HF_API = HfApi()


# Serialises every read-modify-write of results.jsonl within this process
# (see _hub_rmw_results). It is a threading lock, so it does not coordinate
# with other processes.
_HUB_LOCK = threading.Lock()


# Per-process cache for _resolve_data_revision(); None until a value is stored.
_DATA_REVISION: str | None = None
|
|
|
|
class _ValidationError(Exception):
    """Validation-gate failure carrying a user-facing rejection message.

    Raised by the validation helpers; ``handle_submit`` converts it
    into a ``gr.Error`` toast.
    """
|
|
|
|
class _HubWriteError(Exception):
    """Hub upload failure that occurred after validation had passed.

    The message is short and user-facing; ``handle_submit`` converts it
    into a ``gr.Error`` toast.
    """
|
|
|
|
def handle_submit(
    zip_file,
    profile: gr.OAuthProfile | None,
) -> None:
    """Validate a submission upload; surface progress + outcome via toasts.

    Requires the user to be logged in via ``gr.LoginButton`` so the
    row's ``hf_username`` is the canonical HF identity (not a
    free-text claim). The submit button in :mod:`app` is disabled
    until login completes; this function also raises ``gr.Error``
    defensively if it's called without a profile so a UI mishap
    can't write an anonymous row.

    Side-effect-only (returns ``None``): in-flight status surfaces via
    ``gr.Info`` toasts (validating on click, queued after the worker
    spawns) and ``gr.Error`` toasts for rejections. Rejection paths
    raise ``gr.Error``, which Gradio surfaces as a red toast and
    aborts the handler; the outer ``try/finally`` still runs to clean
    up the temp dir.

    Toasts on the happy path:

    1. ``"Validating submission..."`` on click, with a wall-time hint
       so the user knows the post-queue eval takes ~1 minute.
    2. ``"Submission <id> queued ..."`` once the row + zip are on the
       Hub and the worker has been spawned.

    On rejection (login-missing, form-level, validation gate, dedup,
    or Hub write), a single ``gr.Error`` toast carries the message;
    no second toast.

    Args:
        zip_file: The uploaded file object from the form, or ``None``
            when nothing was attached. Its ``.name`` attribute is read
            as the path of the uploaded zip on disk.
        profile: The logged-in user's OAuth profile, or ``None`` when
            the user is not logged in.

    Raises:
        gr.Error: On any rejection path listed above.
    """
    # Login and form-level gates run before any file is touched.
    if profile is None:
        raise gr.Error("Please log in via the HF button before submitting.")
    form_err = _validate_form(zip_file)
    if form_err is not None:
        raise gr.Error(form_err)

    # Immediate feedback: the validation gates below run synchronously
    # inside this handler.
    gr.Info(
        "Validating submission... evaluation usually takes "
        "~1 minute after queuing."
    )

    zip_path = Path(zip_file.name)

    # Scratch directory for the unpacked zip. It is only needed for the
    # validation gates and is removed in the outer ``finally`` on every path.
    tmp = Path(tempfile.mkdtemp(prefix="cadgenbench-submit-"))
    run_dir = tmp / "run"
    run_dir.mkdir()
    try:
        # Validation gates, in order: zip safety, meta.json schema,
        # fixture-set match, STEP parseability.
        try:
            _extract_zip(zip_path, run_dir)
            meta = _load_and_validate_meta(run_dir)
            fixture_names = _validate_fixture_set(run_dir)
            _validate_steps_parseable(run_dir, fixture_names)
        except _ValidationError as e:
            raise gr.Error(f"Submission rejected: {e}")

        # Dedup gate: reject byte-identical re-uploads. The lookup is
        # best-effort (a Hub fetch failure skips the gate).
        zip_sha256 = _compute_sha256(zip_path)
        existing_id = _find_existing_submission_by_sha256(zip_sha256)
        if existing_id is not None:
            raise gr.Error(
                f"This zip's contents are identical to an existing "
                f"submission ({existing_id}). Resubmit only after changing "
                f"at least one byte of the upload."
            )

        submission_id = _mint_submission_id(
            meta["submitter_name"], meta["submission_name"]
        )
        # Hub writes: zip first (unique path, no lock), then the pending
        # row (read-modify-write under the process-wide lock).
        try:
            blob_url = _upload_submission_zip(submission_id, zip_path)
            row = _build_pending_row(
                submission_id, meta, blob_url, zip_sha256,
                hf_username=profile.username,
            )
            _append_pending_row(row)
        except _HubWriteError as e:
            raise gr.Error(f"Submission rejected: {e}")

        # Only spawn the worker once the pending row is committed, so the
        # worker always has a row to flip.
        _spawn_worker(submission_id, blob_url)
        gr.Info(
            f"Submission {submission_id} queued for evaluation "
            f"({len(fixture_names)} fixtures). The eval runs on an "
            f"HF Jobs GPU; the row flips to completed when the job "
            f"finishes (typically 1-3 minutes)."
        )
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
|
|
|
|
def _validate_form(zip_file) -> str | None:
    """Run the form-level gate that precedes any zip parsing.

    Returns:
        A plain-text rejection message (the caller wraps it in a
        ``gr.Error`` toast) when no file was attached, otherwise
        ``None``.
    """
    return "Please attach a submission zip." if zip_file is None else None
|
|
|
|
def _extract_zip(zip_path: Path, target: Path) -> None:
    """Extract *zip_path* into *target* with zip-slip + symlink rejection.

    ``ZipFile.extractall`` silently normalises unsafe member names; we
    would rather reject the upload outright so the submitter sees a
    clear error instead of a confusing "fixture set mismatch"
    downstream. The path check therefore covers *every* entry,
    including explicit directory entries, which ``extractall`` would
    otherwise relocate without complaint.

    Args:
        zip_path: Path of the uploaded zip on disk.
        target: Existing directory to extract into.

    Raises:
        _ValidationError: If the file is not a valid zip, or any entry
            has an absolute / parent-traversing name, or any file entry
            is a symlink.
    """
    try:
        with zipfile.ZipFile(zip_path) as zf:
            for info in zf.infolist():
                name = Path(info.filename)
                if name.is_absolute() or ".." in name.parts:
                    raise _ValidationError(
                        f"Zip contains an unsafe path: {info.filename!r}."
                    )
                if info.is_dir():
                    continue
                # The upper 16 bits of external_attr carry the Unix mode
                # (zero when the archive was built without one);
                # 0o120000 under the 0o170000 file-type mask is a symlink.
                mode = info.external_attr >> 16
                if mode and (mode & 0o170000) == 0o120000:
                    raise _ValidationError(
                        f"Zip contains a symlink ({info.filename!r}); "
                        f"submissions must be plain files."
                    )
            zf.extractall(target)
    except zipfile.BadZipFile as e:
        raise _ValidationError(f"Upload is not a valid zip file: {e}") from e
|
|
|
|
def _load_and_validate_meta(unpacked: Path) -> dict[str, Any]:
    """Load ``meta.json`` from the unpacked zip and validate its schema.

    Checks, in order: the file exists at the zip root, decodes as
    UTF-8, parses as a JSON object, contains every key in
    ``REQUIRED_META_KEYS``, has non-empty string names, has
    string-or-null ``agent_url`` / ``notes``, and has
    ``agree_to_publish`` set to the literal boolean ``true``. A
    non-null ``notes`` is normalised via ``_normalize_notes``.

    Args:
        unpacked: Directory the submission zip was extracted into.

    Returns:
        The parsed metadata dict (with ``notes`` normalised).

    Raises:
        _ValidationError: On any failed check.
    """
    meta_path = unpacked / "meta.json"
    if not meta_path.is_file():
        raise _ValidationError(
            "Zip is missing top-level `meta.json` (expected at the root of "
            "the zip, alongside the per-fixture folders)."
        )
    try:
        # Explicit UTF-8 (the JSON interchange encoding) so the result
        # does not depend on the host locale.
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except UnicodeDecodeError as e:
        # Without this branch a badly encoded file escapes as an
        # unhandled error instead of a clean rejection toast.
        raise _ValidationError(
            f"`meta.json` is not valid UTF-8 text: {e.reason}."
        ) from e
    except json.JSONDecodeError as e:
        raise _ValidationError(
            f"`meta.json` is not valid JSON: {e.msg} (line {e.lineno})."
        ) from e
    if not isinstance(meta, dict):
        raise _ValidationError(
            "`meta.json` must be a JSON object at the top level."
        )

    missing = [k for k in REQUIRED_META_KEYS if k not in meta]
    if missing:
        raise _ValidationError(
            f"`meta.json` is missing required key(s): {', '.join(missing)}."
        )

    for k in ("submitter_name", "submission_name"):
        v = meta[k]
        if not isinstance(v, str) or not v.strip():
            raise _ValidationError(
                f"`meta.json` field `{k}` must be a non-empty string."
            )

    for k in ("agent_url", "notes"):
        v = meta[k]
        if v is not None and not isinstance(v, str):
            raise _ValidationError(
                f"`meta.json` field `{k}` must be a string or null."
            )

    # Identity check on purpose: truthy stand-ins such as 1 or "true"
    # do not count as consent.
    if meta["agree_to_publish"] is not True:
        raise _ValidationError(
            "`meta.json` field `agree_to_publish` must be the literal boolean "
            "`true`."
        )

    if meta["notes"] is not None:
        meta["notes"] = _normalize_notes(meta["notes"])

    return meta
|
|
|
|
def _normalize_notes(raw: str) -> str:
    """Flatten *raw* to one trimmed line and enforce ``NOTES_MAX_CHARS``.

    Runs of CR / LF / tab characters become a single space each.

    Raises:
        _ValidationError: If the flattened text is longer than the cap.
    """
    flattened = re.sub(r"[\r\n\t]+", " ", raw).strip()
    length = len(flattened)
    if length <= NOTES_MAX_CHARS:
        return flattened
    raise _ValidationError(
        f"`meta.json` field `notes` exceeds the {NOTES_MAX_CHARS}-char "
        f"cap (got {length} after stripping). Trim and resubmit."
    )
|
|
|
|
def _validate_fixture_set(unpacked: Path) -> set[str]:
    """Require the zip's top-level folders to equal the dataset's fixtures.

    Args:
        unpacked: Directory the submission zip was extracted into.

    Returns:
        The set of expected fixture names (equal to the submitted set
        whenever this function returns).

    Raises:
        _ValidationError: If the fixture set cannot be resolved, or if
            the submission has missing or unexpected folders.
    """
    submitted = {entry.name for entry in unpacked.iterdir() if entry.is_dir()}

    try:
        inputs_root = data_inputs_dir()
    except Exception as e:
        raise _ValidationError(
            f"Server-side error resolving the fixture set "
            f"({type(e).__name__}: {e})."
        ) from e
    expected = {entry.name for entry in inputs_root.iterdir() if entry.is_dir()}

    if submitted == expected:
        return expected

    absent = sorted(expected - submitted)
    surplus = sorted(submitted - expected)
    problems: list[str] = []
    if absent:
        problems.append(f"missing fixture(s): {', '.join(absent)}")
    if surplus:
        problems.append(f"unexpected folder(s): {', '.join(surplus)}")
    raise _ValidationError(
        "Fixture set does not match the dataset. " + "; ".join(problems) + "."
    )
|
|
|
|
def _validate_steps_parseable(unpacked: Path, fixture_names: set[str]) -> None:
    """Check that every fixture ships a loadable ``output.step``.

    For each fixture name the file ``<fixture>/output.step`` must
    exist, be non-empty, and be accepted by ``parse_step``. The checks
    are fanned out over a thread pool of at most 8 workers; results
    are consumed in sorted fixture order, so the first failure in that
    order is the one reported.

    Raises:
        _ValidationError: For the first fixture (in sorted order) whose
            ``output.step`` is missing, empty, or makes ``parse_step``
            raise ``RuntimeError``.
    """
    def _check_one_step(name: str) -> None:
        step_path = unpacked / name / "output.step"
        if not step_path.is_file():
            raise _ValidationError(
                f"Fixture `{name}` is missing its `output.step` file."
            )
        if not step_path.stat().st_size:
            raise _ValidationError(
                f"Fixture `{name}` has an empty `output.step`."
            )
        try:
            parse_step(step_path)
        except RuntimeError as e:
            raise _ValidationError(
                f"Fixture `{name}` has an `output.step` that is not loadable "
                f"as STEP geometry: {e}"
            ) from e

    pool_size = min(8, os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Draining the iterator is what re-raises a worker's exception.
        for _ in pool.map(_check_one_step, sorted(fixture_names)):
            pass
|
|
|
|
def _mint_submission_id(submitter_name: str, submission_name: str) -> str:
    """Return the basename for ``submissions/<id>.zip`` and ``reports/<id>.*``.

    The id is ``<submitter-slug>_<submission-slug>_<UTC timestamp>``
    with the timestamp formatted as ``YYYYMMDD-HHMMSS``.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return "_".join((_slug(submitter_name), _slug(submission_name), stamp))
|
|
|
|
def _slug(s: str) -> str:
    """Return a filesystem-safe slug of *s*.

    Every run of characters outside ``[A-Za-z0-9]`` becomes a single
    dash, the result is lowercased, and it is capped at
    ``SUBMISSION_ID_SLUG_MAX`` characters. Leading and trailing dashes
    are removed, including a dash left dangling by the truncation.
    Falls back to ``"unnamed"`` when nothing usable remains.
    """
    cleaned = re.sub(r"[^A-Za-z0-9]+", "-", s).strip("-").lower()
    # Truncation can cut right after a dash; strip again so the slug
    # never ends in "-".
    return cleaned[:SUBMISSION_ID_SLUG_MAX].rstrip("-") or "unnamed"
|
|
|
|
def _upload_submission_zip(submission_id: str, zip_path: Path) -> str:
    """Push the submission zip to ``submissions/<id>.zip`` on the Hub.

    Args:
        submission_id: Basename used for the path in the repo.
        zip_path: Local path of the zip to upload.

    Returns:
        The ``resolve/main`` URL of the uploaded file in the
        submissions dataset.

    Raises:
        _HubWriteError: With a short user-facing reason when the
            upload fails for any reason (the traceback is logged).
    """
    path_in_repo = f"{SUBMISSIONS_DIR}/{submission_id}.zip"
    try:
        _HF_API.upload_file(
            path_or_fileobj=str(zip_path),
            path_in_repo=path_in_repo,
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=f"add submission zip for {submission_id}",
        )
    except Exception as e:
        logger.exception("Failed to upload submission zip %s", submission_id)
        raise _HubWriteError(
            f"Server-side error uploading submission zip "
            f"({type(e).__name__}: {e}). Please try again later."
        ) from e
    dataset_base = f"https://huggingface.co/datasets/{HF_SUBMISSIONS_REPO}"
    return f"{dataset_base}/resolve/main/{path_in_repo}"
|
|
|
|
def _build_pending_row(
    submission_id: str,
    meta: dict[str, Any],
    blob_url: str,
    submission_sha256: str,
    hf_username: str | None = None,
) -> dict[str, Any]:
    """Construct the JSON row written for a freshly-queued submission.

    Mirrors the pending regime in ``cadgenbench-submissions/schema.md``:
    metadata + ``status: pending`` + ``submission_blob_url`` +
    ``submission_sha256``; every score-shaped field is ``null`` until
    the worker flips the row.

    Validation-tier fields default per the validation-policy decision
    doc: ``validation_status: "unvalidated"`` (maintainers promote
    post-eval), ``validation_method: None``. ``hf_username`` defaults
    to ``None``; callers post-OAuth pass ``profile.username`` so the
    row carries the canonical HF identity. Pre-OAuth-era rows
    (anything written before C10 landed) and any test paths that
    don't supply the kwarg keep ``null``.

    Args:
        submission_id: Id minted for this submission.
        meta: Validated ``meta.json`` contents.
        blob_url: Hub URL of the uploaded zip.
        submission_sha256: Hex SHA-256 of the uploaded zip.
        hf_username: Canonical HF username, if known.

    Returns:
        The pending row as a JSON-serialisable dict.
    """
    # Written with the same constant the stuck-pending sweep parses
    # with, so the writer and the reader cannot drift apart.
    submitted_at = datetime.now(timezone.utc).strftime(SUBMITTED_AT_FORMAT)
    return {
        "submission_id": submission_id,
        "status": "pending",
        "failure_reason": None,
        "submitter_name": meta["submitter_name"],
        "submission_name": meta["submission_name"],
        "agent_url": meta["agent_url"],
        "notes": meta["notes"],
        "submitted_at": submitted_at,
        "cadgenbench_version": cadgenbench.__version__,
        "cadgenbench_data_revision": _resolve_data_revision(),
        "validity_rate": None,
        "aggregate_score": None,
        "score_by_task_type": None,
        "per_task_scores": None,
        "per_fixture_scores": None,
        "per_fixture_breakdown": None,
        "submission_blob_url": blob_url,
        "submission_sha256": submission_sha256,
        "validation_status": "unvalidated",
        "validation_method": None,
        "hf_username": hf_username,
    }
|
|
|
|
def _compute_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in ``SHA256_BLOCK_SIZE``-byte blocks so large
    uploads are never held in memory at once.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(SHA256_BLOCK_SIZE)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
def _find_existing_submission_by_sha256(zip_sha256: str) -> str | None:
    """Return the ``submission_id`` of an existing row with the same hash, else None.

    Reads the current ``results.jsonl`` once (no lock; a worst-case
    race lands a duplicate row, which is recoverable by a cleanup pass
    if it ever happens). The gate is best-effort throughout: a Hub
    fetch failure skips it (logged), and lines that are not valid JSON
    objects are ignored rather than aborting the submit.

    Args:
        zip_sha256: Hex SHA-256 of the uploaded zip.

    Returns:
        The matching row's ``submission_id``, or ``None`` when no row
        matches or the check could not be performed.
    """
    try:
        body = _download_results_jsonl()
    except Exception as e:
        logger.warning(
            "Dedup check skipped, Hub fetch failed (%s: %s)",
            type(e).__name__, e,
        )
        return None
    for line in body.splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line can be valid JSON without being an object (e.g. a
        # list or a number); ``.get`` on it would raise AttributeError.
        if not isinstance(row, dict):
            continue
        if row.get("submission_sha256") == zip_sha256:
            return row.get("submission_id")
    return None
|
|
|
|
def _append_pending_row(row: dict[str, Any]) -> None:
    """Add *row* to the end of ``results.jsonl`` on the Hub.

    The write goes through ``_hub_rmw_results``, i.e. a locked
    download / mutate / upload cycle.

    Raises:
        _HubWriteError: If any step of that cycle fails (the traceback
            is logged).
    """
    sid = row["submission_id"]

    def _append(rows: list[dict[str, Any]]) -> None:
        rows.append(row)

    message = f"add pending row for {sid}"
    try:
        _hub_rmw_results(_append, commit_message=message)
    except Exception as e:
        logger.exception(
            "Failed RMW of results.jsonl while appending pending row for %s",
            sid,
        )
        raise _HubWriteError(
            f"Server-side error writing the submissions table "
            f"({type(e).__name__}: {e}). The submission zip was uploaded "
            f"but the row was not; please try again later."
        ) from e
|
|
|
|
def _update_row(submission_id: str, updates: dict[str, Any]) -> None:
    """Merge *updates* into the first row whose id is *submission_id*.

    The change is applied through ``_hub_rmw_results`` (locked
    download / mutate / upload).

    Raises:
        LookupError: If ``results.jsonl`` has no row with that id.
    """
    def _merge(rows: list[dict[str, Any]]) -> None:
        target = next(
            (r for r in rows if r.get("submission_id") == submission_id),
            None,
        )
        if target is None:
            raise LookupError(
                f"No row with submission_id={submission_id!r} in results.jsonl."
            )
        target.update(updates)

    new_status = updates.get("status", "?")
    _hub_rmw_results(
        _merge,
        commit_message=f"flip row for {submission_id} -> {new_status}",
    )
|
|
|
|
def _hub_rmw_results(
    mutate, *, commit_message: str,
) -> None:
    """Read-modify-write ``results.jsonl`` on the Hub under ``_HUB_LOCK``.

    Downloads the current file (an absent file counts as no rows),
    parses every non-blank line as JSON, hands the row list to
    *mutate* for in-place modification, then uploads the re-serialised
    rows. The lock covers only this cycle, so concurrent callers in
    this process serialise here.

    Args:
        mutate: Callable taking the list of row dicts and modifying it
            in place. Any exception it raises aborts the upload.
        commit_message: Commit message for the Hub upload.
    """
    with _HUB_LOCK:
        rows: list[dict[str, Any]] = []
        for line in _download_results_jsonl().splitlines():
            if line.strip():
                rows.append(json.loads(line))
        mutate(rows)
        if rows:
            serialised = "".join(
                json.dumps(r, ensure_ascii=False) + "\n" for r in rows
            )
        else:
            serialised = ""
        _HF_API.upload_file(
            path_or_fileobj=serialised.encode("utf-8"),
            path_in_repo=RESULTS_FILENAME,
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=commit_message,
        )
|
|
|
|
def _download_results_jsonl() -> str:
    """Return the text of the Hub's current ``results.jsonl``.

    Always forces a fresh download. A file that does not exist in the
    repo yields the empty string; other download errors propagate.
    """
    # NOTE(review): this function-scope import shadows the module-level
    # one; presumably kept so the name is resolved at call time - confirm
    # before removing.
    from huggingface_hub import hf_hub_download

    try:
        local_copy = hf_hub_download(
            repo_id=HF_SUBMISSIONS_REPO,
            filename=RESULTS_FILENAME,
            repo_type="dataset",
            force_download=True,
        )
    except EntryNotFoundError:
        return ""
    return Path(local_copy).read_text(encoding="utf-8")
|
|
|
|
def _resolve_data_revision() -> str:
    """Return a short sha for the cadgenbench-data dataset.

    A successfully resolved sha is cached for the life of the process.
    On a Hub error (or a missing sha) the function returns
    ``"unknown"`` so a flaky network can't block a submission over a
    metadata field. That fallback is deliberately *not* cached, so one
    transient failure doesn't stamp ``"unknown"`` on every later row
    until the process restarts.

    Returns:
        The first ``DATA_REV_SHORT_LEN`` characters of the dataset
        sha, or ``"unknown"``.
    """
    global _DATA_REVISION
    if _DATA_REVISION is not None:
        return _DATA_REVISION
    try:
        sha = _HF_API.dataset_info(HF_DATA_REPO).sha
    except Exception as e:
        logger.warning(
            "Failed to resolve cadgenbench-data revision (%s: %s)",
            type(e).__name__, e,
        )
        return "unknown"
    if not sha:
        return "unknown"
    _DATA_REVISION = sha[:DATA_REV_SHORT_LEN]
    return _DATA_REVISION
|
|
|
|
| |
| |
| |
|
|
|
|
def _spawn_worker(submission_id: str, submission_blob_url: str) -> None:
    """Launch the dispatch+poll worker for one submission and return at once.

    The thread is a daemon named ``cgb-worker-<submission_id>`` and is
    never joined, so it cannot hold up interpreter shutdown. It is
    given only the id and the zip's Hub URL; no Space-local paths are
    handed over.
    """
    threading.Thread(
        target=_run_worker,
        args=(submission_id, submission_blob_url),
        name=f"cgb-worker-{submission_id}",
        daemon=True,
    ).start()
|
|
|
|
def _run_worker(submission_id: str, submission_blob_url: str) -> None:
    """Worker body: dispatch the eval Job, wait for it, flip the row.

    A ``COMPLETED`` job leads to a ``completed`` row filled from the
    report's ``run_summary``; any other final stage leads to a
    ``failed`` row with a reason built from the job status and logs.
    Any exception along the way is logged with its traceback and also
    mapped to a ``failed`` row; if even that flip fails the row is
    left pending and the failure is logged.
    """
    try:
        job_id = _dispatch_eval_job(submission_id, submission_blob_url)
        logger.info("Dispatched eval job %s for %s", job_id, submission_id)
        stage, status_message = _poll_until_done(job_id, submission_id)
        if stage != "COMPLETED":
            reason = _job_failure_reason(job_id, stage, status_message)
            _flip_row_to_failed(submission_id, reason)
            logger.warning(
                "Eval job %s for %s ended %s: %s",
                job_id, submission_id, stage, reason,
            )
            return
        summary = _fetch_run_summary_from_report(submission_id)
        _flip_row_to_completed(submission_id, summary)
        logger.info("Worker completed for %s", submission_id)
    except Exception as e:
        logger.exception("Worker failed for %s", submission_id)
        short_reason = f"{type(e).__name__}: {str(e)}"[:FAILURE_REASON_MAX_CHARS]
        try:
            _flip_row_to_failed(submission_id, short_reason)
        except Exception:
            # Nothing else can be done from this thread; the row stays
            # pending.
            logger.exception(
                "Failed to flip row to failed for %s; row stays pending",
                submission_id,
            )
|
|
|
|
def _dispatch_eval_job(
    submission_id: str, submission_blob_url: str,
) -> str:
    """Start the per-submission eval Job and return its id.

    The Job runs ``python /opt/eval_job.py <submission_id> <blob_url>``
    on the ``EVAL_GPU_SPACE`` image. Its environment carries the
    submissions repo id and the worker count, plus
    ``CADGENBENCH_DATA_REPO`` / ``CADGENBENCH_DATA_GT_REPO`` when those
    are set (non-empty) in this process. The process's ``HF_TOKEN`` is
    used both to authenticate the dispatch and as the Job's
    ``HF_TOKEN`` secret.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is unset or empty.
    """
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        raise RuntimeError(
            "HF_TOKEN is unset on the Space; cannot dispatch eval job."
        )

    job_env: dict[str, str] = {
        "HF_SUBMISSIONS_REPO": HF_SUBMISSIONS_REPO,
        "EVAL_WORKER_COUNT": EVAL_JOB_WORKER_COUNT,
    }
    passthrough = {
        name: os.environ.get(name)
        for name in ("CADGENBENCH_DATA_REPO", "CADGENBENCH_DATA_GT_REPO")
    }
    # Forward only the overrides that are actually set.
    job_env.update({name: val for name, val in passthrough.items() if val})

    command = ["python", "/opt/eval_job.py", submission_id, submission_blob_url]
    job = run_job(
        image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
        command=command,
        flavor=EVAL_JOB_FLAVOR,
        namespace=EVAL_JOB_NAMESPACE,
        env=job_env,
        secrets={"HF_TOKEN": hf_token},
        timeout=EVAL_JOB_TIMEOUT,
        token=hf_token,
    )
    return job.id
|
|
|
|
def _poll_until_done(
    job_id: str, submission_id: str,
) -> tuple[str, str | None]:
    """Poll ``inspect_job`` until terminal; return (stage, message).

    Terminal stages: ``COMPLETED``, ``ERROR``, ``CANCELED`` and
    ``DELETED``. A canceled or deleted Job never reaches
    ``COMPLETED`` / ``ERROR``, so treating those stages as terminal
    stops the worker from polling a dead Job until the outer deadline.
    Callers treat every stage other than ``COMPLETED`` as a failure.

    Anything else after the outer deadline counts as a synthetic
    ``ERROR`` with a "deadline exceeded" message; we do not try to
    cancel the Job from here (the Job carries its own ``timeout`` and
    HF will reap it). Transient ``inspect_job`` errors retry up to
    ``JOB_POLL_MAX_CONSECUTIVE_ERRORS`` consecutive failures before
    raising.

    Args:
        job_id: Id of the dispatched Job.
        submission_id: Id of the submission being evaluated (currently
            unused; kept for interface stability).

    Returns:
        ``(stage, message)`` where *message* is the Job's status
        message (possibly ``None``) or the synthetic deadline text.

    Raises:
        Exception: Whatever ``inspect_job`` raised on the final
            consecutive failure.
    """
    terminal_stages = ("COMPLETED", "ERROR", "CANCELED", "DELETED")
    deadline = time.monotonic() + JOB_POLL_DEADLINE_SECONDS
    consecutive_errors = 0
    while True:
        try:
            info = inspect_job(job_id=job_id)
            consecutive_errors = 0
        except Exception as e:
            consecutive_errors += 1
            logger.warning(
                "inspect_job(%s) failed (%d/%d): %s",
                job_id, consecutive_errors,
                JOB_POLL_MAX_CONSECUTIVE_ERRORS, e,
            )
            if consecutive_errors >= JOB_POLL_MAX_CONSECUTIVE_ERRORS:
                raise
            time.sleep(JOB_POLL_INTERVAL_SECONDS)
            continue

        stage = info.status.stage
        message = info.status.message
        if stage in terminal_stages:
            return stage, message
        if time.monotonic() >= deadline:
            return "ERROR", (
                f"Space-side poll deadline exceeded "
                f"({JOB_POLL_DEADLINE_SECONDS}s); last stage={stage}"
            )
        time.sleep(JOB_POLL_INTERVAL_SECONDS)
|
|
|
|
def _job_failure_reason(
    job_id: str, stage: str, status_message: str | None,
) -> str:
    """Build a short ``failure_reason`` for a non-completed Job.

    Combines the job's own ``status.message`` (if any) with the last
    ``JOB_LOG_TAIL_LINES`` of ``fetch_job_logs``. Log fetch is
    best-effort. The result never exceeds ``FAILURE_REASON_MAX_CHARS``:
    when the log tail does not fit, its *beginning* is dropped so the
    final lines (where the actual error usually is) survive, rather
    than being cut off by a plain head-truncation.

    Args:
        job_id: Id of the Job whose logs are fetched.
        stage: Final stage of the Job (lowercased into the reason).
        status_message: The Job's status message, if any.

    Returns:
        ``"eval job <stage>[: <message>][: logs: <tail>]"``, capped at
        ``FAILURE_REASON_MAX_CHARS`` characters.
    """
    parts: list[str] = [f"eval job {stage.lower()}"]
    if status_message:
        parts.append(status_message)
    head = ": ".join(parts)

    logs = ""
    have_logs = False
    try:
        tail = list(fetch_job_logs(job_id=job_id))[-JOB_LOG_TAIL_LINES:]
        if tail:
            logs = " | ".join(tail)
            have_logs = True
    except Exception as e:
        logger.warning("fetch_job_logs(%s) failed: %s", job_id, e)

    if not have_logs:
        return head[:FAILURE_REASON_MAX_CHARS]

    prefix = f"{head}: logs: "
    budget = FAILURE_REASON_MAX_CHARS - len(prefix)
    if budget <= 0:
        # The status text alone fills the cap; no room for log output.
        return prefix[:FAILURE_REASON_MAX_CHARS]
    # Keep the end of the log tail when it has to be shortened.
    return prefix + logs[-budget:]
|
|
|
|
def _fetch_run_summary_from_report(submission_id: str) -> dict[str, Any]:
    """Fetch ``reports/<id>.json`` from the Hub and return its ``run_summary``.

    The report is always freshly downloaded from the submissions
    dataset. Download errors propagate to the caller.

    Raises:
        RuntimeError: If the report's ``run_summary`` entry is absent
            or is not a JSON object.
    """
    report_name = f"{REPORTS_DIR}/{submission_id}.json"
    local_copy = hf_hub_download(
        repo_id=HF_SUBMISSIONS_REPO,
        filename=report_name,
        repo_type="dataset",
        force_download=True,
    )
    report = json.loads(Path(local_copy).read_text(encoding="utf-8"))
    summary = report.get("run_summary")
    if isinstance(summary, dict):
        return summary
    raise RuntimeError(
        f"reports/{submission_id}.json missing or malformed "
        f"`run_summary` block (got {type(summary).__name__})"
    )
|
|
|
|
def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
    """Mark the row ``completed`` and copy the score fields from *summary*.

    Clears ``failure_reason``, refreshes ``cadgenbench_data_revision``
    and copies the five score fields; a field absent from *summary*
    is written as ``None``.
    """
    score_fields = (
        "aggregate_score",
        "validity_rate",
        "score_by_task_type",
        "per_task_scores",
        "per_fixture_scores",
    )
    updates: dict[str, Any] = {
        "status": "completed",
        "failure_reason": None,
        "cadgenbench_data_revision": _resolve_data_revision(),
    }
    for field in score_fields:
        updates[field] = summary.get(field)
    _update_row(submission_id, updates)
|
|
|
|
def _flip_row_to_failed(submission_id: str, reason: str) -> None:
    """Set the row's status to ``failed`` and record *reason*.

    Only ``status`` and ``failure_reason`` are written; every other
    field of the row is left as it was.
    """
    failure_updates = {"status": "failed", "failure_reason": reason}
    _update_row(submission_id, failure_updates)
|
|
|
|
| |
| |
| |
|
|
|
|
def _sweep_stuck_pending() -> None:
    """Fail every ``pending`` row older than the stuck threshold.

    Reads ``results.jsonl`` once without the lock and collects the ids
    of rows whose ``status`` is ``pending`` and whose ``submitted_at``
    lies more than ``STUCK_PENDING_THRESHOLD_SECONDS`` in the past.
    Each of them is then flipped to ``failed`` with
    ``STUCK_PENDING_REASON``, one row update at a time. The sweep is
    best-effort: a failed Hub read skips it, lines that are not valid
    JSON and rows with a missing id / timestamp or an unparseable
    timestamp are skipped, and a failed flip is logged without
    stopping the remaining flips.
    """
    try:
        body = _download_results_jsonl()
    except Exception as e:
        logger.warning(
            "Stuck-pending sweep skipped, Hub fetch failed (%s: %s)",
            type(e).__name__, e,
        )
        return

    max_age = timedelta(seconds=STUCK_PENDING_THRESHOLD_SECONDS)
    cutoff = datetime.now(timezone.utc) - max_age

    stuck_ids: list[str] = []
    for raw_line in body.splitlines():
        if not raw_line.strip():
            continue
        try:
            row = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if row.get("status") != "pending":
            continue
        sid = row.get("submission_id")
        stamp = row.get("submitted_at")
        if not (sid and stamp):
            continue
        try:
            naive = datetime.strptime(stamp, SUBMITTED_AT_FORMAT)
        except ValueError:
            logger.warning(
                "Skipping unparseable submitted_at %r on row %s",
                stamp, sid,
            )
            continue
        # The stored timestamps carry a literal "Z"; treat them as UTC.
        if naive.replace(tzinfo=timezone.utc) < cutoff:
            stuck_ids.append(sid)

    if not stuck_ids:
        logger.info("Stuck-pending sweep: nothing stale")
        return

    logger.warning(
        "Stuck-pending sweep: flipping %d row(s) to failed: %s",
        len(stuck_ids), stuck_ids,
    )
    for sid in stuck_ids:
        try:
            _flip_row_to_failed(sid, STUCK_PENDING_REASON)
        except Exception as e:
            logger.exception(
                "Stuck-pending flip failed for %s (%s: %s)",
                sid, type(e).__name__, e,
            )
|
|
|
|
def _start_boot_sweep() -> None:
    """Run ``_sweep_stuck_pending`` on a background daemon thread.

    Does nothing except log when the ``CADGENBENCH_DISABLE_BOOT_SWEEP``
    environment variable equals ``"1"`` (handy for imports that must
    not touch the Hub, such as unit tests).
    """
    if os.getenv(BOOT_SWEEP_ENV) != "1":
        sweeper = threading.Thread(
            target=_sweep_stuck_pending,
            name="cgb-boot-sweep",
            daemon=True,
        )
        sweeper.start()
        return
    logger.info("Stuck-pending sweep disabled via %s", BOOT_SWEEP_ENV)
|
|
|
|
# Import-time side effect: start the stuck-pending sweep once per process
# (a no-op when CADGENBENCH_DISABLE_BOOT_SWEEP=1).
_start_boot_sweep()
|
|