| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """Submit-tab handler for the CADGenBench leaderboard Space. |
| |
| Step 6 (E) chunks 2 + 3 + 4 + 6 + Step 10 (jobs migration): cheap-sync |
| validation + pending-row write + zip upload + background dispatch + |
| poll of an HF Jobs GPU eval + boot-time stuck-pending sweep. The |
| handler validates the upload, uploads the zip to |
| ``submissions/<id>.zip``, appends a ``status: pending`` row to |
| ``results.jsonl`` (under a process-wide lock), spawns a daemon thread |
| that dispatches a per-submission HF Job against the eval-gpu |
| Docker Space image and polls |
| ``inspect_job`` until the job's stage is terminal. On COMPLETED the |
| worker downloads ``reports/<id>.json`` (the Job already uploaded |
| ``reports/<id>.{html,json}`` to the submissions dataset), reads |
| ``run_summary`` out of it, and flips the row ``pending -> completed``. |
| On ERROR (or any dispatch / poll exception) the row flips to ``failed`` |
| with a short ``failure_reason``. At module import a one-shot daemon |
| sweep flips any ``pending`` row whose ``submitted_at`` is older than |
| 30 min to ``failed`` with a "Space restart" reason, so rows stranded by |
| a deploy / OOM / crash / orphaned Job don't sit pending forever. |
| |
| Validation gates, in order: |
| |
| 1. Form-level: a file was attached. |
| 2. Zip safety: parseable as a zip, no absolute / parent-traversing |
| entry names, no symlinks. |
| 3. ``meta.json`` schema: required keys present, types sane, |
| ``agree_to_publish`` is literally ``true`` (the sole consent |
| gate; no separate UI checkbox), ``notes`` is non-empty when |
| present and within the per-submission cap. |
| 4. Fixture-set match: the set of folders inside the zip equals the |
| set of fixture directories in :func:`cadgenbench.common.paths.data_inputs_dir` |
| (no missing, no extras). |
| 5. STEP parseability: any present ``<fixture>/output.step`` loads as STEP |
| geometry. A missing ``output.step`` is allowed and scores zero via the |
| evaluator's ``status="missing"`` path. Per-fixture validity (watertight, |
| manifold, etc) is *not* checked here; this gate only rejects files that are |
| present but not actually STEP. |
| |
| Hub-write ordering (after validation passes): |
| |
| 1. Upload ``submissions/<id>.zip``. Unique path per submission, no |
| lock needed. |
| 2. Build pending row (metadata + null scores + ``submission_blob_url``). |
| 3. Acquire ``_HUB_LOCK``; download current ``results.jsonl`` (or |
| start empty); append the pending row; re-upload. |
| 4. Spawn worker thread (daemon, named after submission_id). The |
| worker owns no Space-local files; the handler removes its own tempdir. |
| |
| If step 1 fails the user sees a clean rejection. If step 3 fails the |
| zip is left orphaned in ``submissions/`` and the user sees a clean |
| rejection; an orphan-zip sweep is a future-chunk concern. |
| |
| Background worker, per submission: |
| |
| 1. ``huggingface_hub.run_job(...)`` dispatches an HF Job against |
| the ``cadgenbench-eval-gpu`` Space image on ``a10g-large``, |
| passing the submission_id + zip blob URL as command args and |
| ``HF_TOKEN`` as a secret. |
| 2. Poll ``inspect_job(job_id)`` every few seconds until the job's |
| stage is terminal (``COMPLETED`` or ``ERROR``). Outer deadline |
| guards against an unresponsive poll surface. |
| 3. On ``COMPLETED``: download ``reports/<id>.json`` from the |
| submissions dataset (the Job uploaded both |
| ``reports/<id>.{html,json}`` before exiting), read |
| ``run_summary`` out of the bundled payload, under ``_HUB_LOCK`` |
| flip the row to ``"completed"`` and merge the score fields. |
| 4. On ``ERROR`` (or any dispatch / poll exception), flip the row to |
| ``"failed"`` with a short ``failure_reason`` (the job's |
| ``status.message`` plus the last N lines of ``fetch_job_logs``). |
| """ |
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| import os |
| import random |
| import re |
| import shutil |
| import tempfile |
| import threading |
| import time |
| import zipfile |
| from concurrent.futures import ThreadPoolExecutor |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
| import cadgenbench |
| import gradio as gr |
| from cadgenbench.common.paths import data_inputs_dir |
| from cadgenbench.common.validity import parse_step |
| from huggingface_hub import ( |
| CommitOperationAdd, |
| HfApi, |
| fetch_job_logs, |
| hf_hub_download, |
| inspect_job, |
| run_job, |
| snapshot_download, |
| ) |
| from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError |
|
|
| import progress |
| from leaderboard import ( |
| HF_DATA_REPO, |
| HF_ENDPOINT, |
| HF_ORG, |
| HF_RENDER_BUCKET, |
| HF_SUBMISSIONS_REPO, |
| render_submission_base_url, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
# --- meta.json / naming limits ---------------------------------------------
# Cap on the normalised ``notes`` field; enforced by _normalize_notes.
NOTES_MAX_CHARS = 500
# Keys that must all be present in meta.json (see _load_and_validate_meta).
REQUIRED_META_KEYS: tuple[str, ...] = (
    "submitter_name",
    "submission_name",
    "agent_url",
    "notes",
    "agree_to_publish",
)
# Per-slug length cap applied by _slug when minting a submission id.
SUBMISSION_ID_SLUG_MAX = 40
# File / directory names inside the submissions dataset repo.
RESULTS_FILENAME = "results.jsonl"
SUBMISSIONS_DIR = "submissions"
# NOTE(review): not referenced in the visible part of this file; per the
# module docstring the eval Job writes ``reports/<id>.{html,json}`` here.
REPORTS_DIR = "reports"

# NOTE(review): these two are not referenced in the visible part of this
# file; presumably URL prefixes for proxy routes -- confirm against callers.
GT_PROXY_BASE_URL = "/gt"
INPUT_PROXY_BASE_URL = "/task-input"
# Number of leading sha characters kept by _resolve_data_revision.
DATA_REV_SHORT_LEN = 12
# NOTE(review): presumably the cap on a row's ``failure_reason``; the code
# that applies it is outside the visible chunk.
FAILURE_REASON_MAX_CHARS = 200
# Read size used by _compute_sha256 (64 KiB).
SHA256_BLOCK_SIZE = 64 * 1024


# --- Hub retry policy (consumed by _with_hub_retries) ------------------------
# HTTP statuses treated as transient and retried.
_HUB_RETRY_STATUSES = frozenset({429, 500, 502, 503, 504})
# Wall-clock cap on the total time spent retrying one operation.
HUB_RETRY_MAX_SECONDS = 120
# Exponential backoff: base delay and per-sleep clamp.
HUB_RETRY_BASE_DELAY_SECONDS = 2.0
HUB_RETRY_MAX_DELAY_SECONDS = 20.0
# Age (30 min) past which a ``pending`` row counts as stuck, per the module
# docstring's boot-time sweep; the sweep itself is outside the visible chunk.
STUCK_PENDING_THRESHOLD_SECONDS = 30 * 60
# strftime/strptime layout of a row's ``submitted_at`` (UTC, "Z" suffix).
SUBMITTED_AT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# NOTE(review): presumably the failure_reason written by the stuck-pending
# sweep -- confirm where it is used.
STUCK_PENDING_REASON = "evaluation interrupted by Space restart"
# NOTE(review): name of an environment variable; judging by the name it
# disables the boot sweep -- confirm against the sweep code.
BOOT_SWEEP_ENV = "CADGENBENCH_DISABLE_BOOT_SWEEP"


# --- HF Jobs dispatch --------------------------------------------------------
# Eval Space id; overridable via the CADGENBENCH_EVAL_GPU_SPACE env var.
EVAL_GPU_SPACE = os.getenv(
    "CADGENBENCH_EVAL_GPU_SPACE", f"{HF_ORG}/cadgenbench-eval-gpu"
)
# NOTE(review): the dispatch code that consumes the four values below is
# outside the visible chunk; the namespace is a hard-coded account name.
EVAL_JOB_FLAVOR = "a10g-large"
EVAL_JOB_NAMESPACE = "michaelr27"
EVAL_JOB_TIMEOUT = "30m"
EVAL_JOB_WORKER_COUNT = "8"


# --- Submit-panel progress streaming (see _stream_submission_progress) -------
# Seconds between polls of the in-process progress registry.
PROGRESS_STREAM_POLL_SECONDS = 3
# Backstop after which the panel stops streaming (45 min).
PROGRESS_STREAM_DEADLINE_SECONDS = 45 * 60


# --- Job polling -------------------------------------------------------------
# NOTE(review): consumed by the poll loop, which is outside the visible
# chunk; meanings below are inferred from the names -- confirm.
JOB_POLL_INTERVAL_SECONDS = 5
JOB_POLL_DEADLINE_SECONDS = 35 * 60
JOB_LOG_TAIL_LINES = 30
JOB_POLL_MAX_CONSECUTIVE_ERRORS = 5


# --- Sharded evaluation ------------------------------------------------------
# _run_worker shards a submission when it has MORE fixtures than this.
SHARD_THRESHOLD = 12
# NOTE(review): presumably fixtures per shard; the split code is not visible.
SHARD_CHUNK_SIZE = 12
# Last path segment of the per-submission shards tree
# (see _shard_bucket_prefix_path).
SHARDS_SUBDIR = "shards"


# Bucket used for shard scratch; empty string means "disabled"
# (see _shard_bucket_enabled). May carry an ``hf://buckets/`` prefix.
SHARD_BUCKET = os.getenv("CADGENBENCH_SHARD_BUCKET", "").strip()
# Path prefix inside the bucket; defaults to SUBMISSIONS_DIR, slashes trimmed.
SHARD_BUCKET_PREFIX = os.getenv(
    "CADGENBENCH_SHARD_BUCKET_PREFIX", SUBMISSIONS_DIR,
).strip("/")


# NOTE(review): presumably the number of re-dispatches allowed per failed
# shard; consumer is outside the visible chunk.
SHARD_MAX_RETRIES = 1


# NOTE(review): presumably the poll deadline for a sharded run (45 min);
# consumer is outside the visible chunk.
SHARD_POLL_DEADLINE_SECONDS = 45 * 60


# Shared Hub client used for uploads and dataset metadata lookups.
_HF_API = HfApi()


# Process-wide lock serialising the read-modify-write of results.jsonl
# (taken in _hub_rmw_results).
_HUB_LOCK = threading.Lock()


# Per-process cache filled lazily by _resolve_data_revision.
_DATA_REVISION: str | None = None
|
|
|
|
class _ValidationError(Exception):
    """Signal that an upload failed a validation gate.

    The message is user-facing: ``handle_submit`` catches this and shows
    it as ``"Submission rejected: <message>"``.
    """
|
|
|
|
class _HubWriteError(Exception):
    """Signal that a Hub upload failed after validation succeeded.

    Raised by the zip-upload and pending-row helpers with a short,
    user-facing reason; ``handle_submit`` turns it into a rejection.
    """
|
|
|
|
| def _retry_after_seconds(error: HfHubHTTPError) -> float | None: |
| """Parse a ``Retry-After`` header (seconds form) off a Hub error, if any.""" |
| response = getattr(error, "response", None) |
| if response is None: |
| return None |
| raw = response.headers.get("Retry-After") |
| if not raw: |
| return None |
| try: |
| return float(raw) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
def _shard_bucket_enabled() -> bool:
    """Report whether a shard scratch bucket is configured (non-empty)."""
    return True if SHARD_BUCKET else False
|
|
|
|
def _shard_bucket_id() -> str:
    """Return ``SHARD_BUCKET`` as a bare id, without scheme or trailing ``/``."""
    scheme = "hf://buckets/"
    if SHARD_BUCKET.startswith(scheme):
        bucket = SHARD_BUCKET[len(scheme):]
    else:
        bucket = SHARD_BUCKET
    return bucket.rstrip("/")
|
|
|
|
def _shard_bucket_prefix_path(submission_id: str) -> str:
    """Build ``<prefix>/<submission_id>/<SHARDS_SUBDIR>`` relative to the bucket.

    Empty segments in ``SHARD_BUCKET_PREFIX`` (doubled or stray slashes)
    are dropped before joining.
    """
    segments = list(filter(None, SHARD_BUCKET_PREFIX.split("/")))
    segments.append(submission_id)
    segments.append(SHARDS_SUBDIR)
    return "/".join(segments)
|
|
|
|
def _shard_bucket_uri(submission_id: str) -> str:
    """Return the full ``hf://buckets/...`` URI of *submission_id*'s shards tree."""
    bucket = _shard_bucket_id()
    subpath = _shard_bucket_prefix_path(submission_id)
    return f"hf://buckets/{bucket}/{subpath}"
|
|
|
|
| def _jobs_token() -> str | None: |
| """Token used for HF Jobs control-plane calls.""" |
| return os.environ.get("HF_TOKEN") |
|
|
|
|
def _with_hub_retries(fn, *, what: str):
    """Call *fn*, retrying transient Hub HTTP failures with jittered backoff.

    Only ``HfHubHTTPError`` whose status is in ``_HUB_RETRY_STATUSES`` is
    retried; every other exception, and any retryable one seen at or
    after the ``HUB_RETRY_MAX_SECONDS`` wall cap, propagates unchanged.
    Each sleep is exponential with jitter, clamped to
    ``HUB_RETRY_MAX_DELAY_SECONDS``, raised to the server's
    ``Retry-After`` when given, and never extends past the wall cap.
    *what* only labels the warning log line. *fn* is re-invoked from
    scratch on each attempt, so it must be safe to call repeatedly.

    Returns whatever *fn* returns on the first successful call.
    """
    give_up_at = time.monotonic() + HUB_RETRY_MAX_SECONDS
    attempt = 1
    while True:
        try:
            return fn()
        except HfHubHTTPError as err:
            response = getattr(err, "response", None)
            status = getattr(response, "status_code", None)
            now = time.monotonic()
            retryable = status in _HUB_RETRY_STATUSES and now < give_up_at
            if not retryable:
                raise
            backoff = min(
                HUB_RETRY_MAX_DELAY_SECONDS,
                HUB_RETRY_BASE_DELAY_SECONDS * (2 ** (attempt - 1)),
            )
            # Half fixed, half random, so concurrent retriers spread out.
            pause = backoff / 2 + random.uniform(0, backoff / 2)
            server_hint = _retry_after_seconds(err)
            if server_hint is not None:
                pause = max(pause, server_hint)
            # Never sleep beyond the overall deadline.
            pause = min(pause, max(0.0, give_up_at - now))
            logger.warning(
                "Hub %s got HTTP %s; retry %d in %.1fs (cap %ds)",
                what, status, attempt, pause, HUB_RETRY_MAX_SECONDS,
            )
            time.sleep(pause)
        attempt += 1
|
|
|
|
| def _submit_status(state: str, message: str) -> str: |
| """Markdown for the persistent submit-status panel. |
| |
| The panel is the durable counterpart to the transient ``gr.Info`` / |
| ``gr.Error`` toasts: a submitter always sees the current stage and |
| any rejection reason without having to catch an ephemeral toast. |
| *state* picks the leading glyph (``working`` / ``queued`` / ``done`` |
| / ``error``). |
| """ |
| glyph = {"working": "⏳", "queued": "✅", "done": "🎉", "error": "❌"}.get( |
| state, "•" |
| ) |
| return f"{glyph} {message}" |
|
|
|
|
| |
| |
| |
| |
# Maps a progress-registry state to the panel state name understood by
# _submit_status (which picks the glyph). _stream_submission_progress falls
# back to "working" for any state missing from this table.
_PROGRESS_PANEL_STATE = {
    progress.QUEUED: "queued",
    progress.RUNNING: "working",
    progress.COMPLETED: "done",
    progress.FAILED: "error",
}
|
|
|
|
| def _running_message_for_stage(stage: str) -> str: |
| """Friendly note for a non-terminal HF Jobs stage. |
| |
| The Jobs API exposes a stage string per poll. We only care about |
| one distinction the submitter actually feels: actively *running* vs |
| still *waiting for a machine*. Treating any non-RUNNING, non-terminal |
| stage as "queued on the GPU" keeps the message robust to the exact |
| set of intermediate stage names the API may use. |
| """ |
| if stage == "RUNNING": |
| return "Evaluating your submission on a GPU…" |
| return "Evaluation queued on a GPU — waiting for a free machine…" |
|
|
|
|
| def _completed_progress_message(summary: dict[str, Any]) -> str: |
| """Terminal success note, surfacing the headline score when present.""" |
| score = summary.get("aggregate_score") |
| if isinstance(score, (int, float)): |
| return ( |
| f"Done — scored {float(score):.4f}. Your row is on the " |
| f"Unvalidated leaderboard." |
| ) |
| return "Done — your row is on the Unvalidated leaderboard." |
|
|
|
|
| def _failed_progress_message(reason: str | None) -> str: |
| """Terminal failure note, appending the short reason when there is one.""" |
| reason = (reason or "").strip() |
| if reason: |
| return f"Evaluation failed: {reason}" |
| return "Evaluation failed." |
|
|
|
|
def handle_submit(
    zip_file,
    profile: gr.OAuthProfile | None,
):
    """Validate + queue a submission, streaming status to a panel.

    Requires the user to be logged in via ``gr.LoginButton`` so the
    row's ``hf_username`` is the canonical HF identity (not a free-text
    claim). The generator rejects defensively when called without a
    profile so a UI mishap can't write an anonymous row.

    Generator: each ``yield`` is a Markdown string pushed to a
    persistent status panel, the durable counterpart to the transient
    toasts. Happy-path stages: validating -> uploading/queuing ->
    queued -> live worker progress. Every rejection (login-missing,
    form-level, validation gate, dedup, Hub write) yields a final error
    panel **and** raises ``gr.Error`` for a toast.

    The scratch directory holding the unpacked zip is removed as soon as
    the submission is queued (or rejected): nothing after that point
    reads it, so it is not kept on disk for the duration of the progress
    stream.

    Args:
        zip_file: The uploaded file object from the form (its ``.name``
            is used as the path of the zip on disk), or ``None``.
        profile: The logged-in user's OAuth profile, or ``None``.

    Yields:
        Markdown strings for the status panel.

    Raises:
        gr.Error: On any rejection, after the matching error panel has
            been yielded.
    """
    if profile is None:
        msg = "Please log in via the HF button before submitting."
        yield _submit_status("error", msg)
        raise gr.Error(msg)
    form_err = _validate_form(zip_file)
    if form_err is not None:
        yield _submit_status("error", form_err)
        raise gr.Error(form_err)

    yield _submit_status(
        "working",
        "Validating submission (unpacking the zip, checking the sample set "
        "and STEP files)…",
    )

    zip_path = Path(zip_file.name)

    # Private scratch dir for the unpacked zip; always removed below.
    tmp = Path(tempfile.mkdtemp(prefix="cadgenbench-submit-"))
    run_dir = tmp / "run"
    run_dir.mkdir()
    try:
        try:
            _extract_zip(zip_path, run_dir)
            meta = _load_and_validate_meta(run_dir)
            fixture_names = _validate_fixture_set(run_dir)
            _validate_steps_parseable(run_dir, fixture_names)
        except _ValidationError as e:
            msg = f"Submission rejected: {e}"
            yield _submit_status("error", msg)
            raise gr.Error(msg)

        # Dedup gate: refuse a byte-identical re-upload of an existing row.
        zip_sha256 = _compute_sha256(zip_path)
        existing_id = _find_existing_submission_by_sha256(zip_sha256)
        if existing_id is not None:
            msg = (
                f"This zip's contents are identical to an existing "
                f"submission ({existing_id}). Resubmit only after changing "
                f"at least one byte of the upload."
            )
            yield _submit_status("error", msg)
            raise gr.Error(msg)

        submission_id = _mint_submission_id(
            meta["submitter_name"], meta["submission_name"]
        )
        yield _submit_status(
            "working",
            f"Uploading `{submission_id}` ({len(fixture_names)} samples) and "
            f"queuing the evaluation… (this can take a moment, and retries "
            f"automatically if the Hub is busy).",
        )
        try:
            blob_url = _upload_submission_zip(submission_id, zip_path)
            row = _build_pending_row(
                submission_id, meta, blob_url, zip_sha256,
                hf_username=profile.username,
            )
            _append_pending_row(row)
        except _HubWriteError as e:
            msg = f"Submission rejected: {e}"
            yield _submit_status("error", msg)
            raise gr.Error(msg)

        # Publish the initial state before the worker starts so the
        # progress stream below always has something to show.
        progress.publish(
            submission_id,
            progress.QUEUED,
            f"Queued ({len(fixture_names)} samples) — waiting for the "
            f"evaluation to start…",
        )
        _spawn_worker(submission_id, blob_url, sorted(fixture_names))
        yield _submit_status(
            "queued",
            f"Submission `{submission_id}` queued ({len(fixture_names)} "
            f"samples). The eval runs on an HF Jobs GPU; your row appears on "
            f"the **Unvalidated** leaderboard and flips to completed when the "
            f"job finishes (typically 1–3 minutes). Live progress below.",
        )
    finally:
        shutil.rmtree(tmp, ignore_errors=True)

    # Streaming can run for a long time; it only reads the in-process
    # progress registry, so it happens after the scratch dir is gone.
    yield from _stream_submission_progress(submission_id)
|
|
|
|
def _stream_submission_progress(submission_id: str):
    """Yield status-panel Markdown while the worker advances *submission_id*.

    Polls ``progress.get`` every ``PROGRESS_STREAM_POLL_SECONDS`` and
    emits a new panel only when the snapshot's message differs from the
    last one shown. Stops once the snapshot's state is terminal. If
    ``PROGRESS_STREAM_DEADLINE_SECONDS`` elapses first, emits a final
    "continues in the background" note and stops.
    """
    give_up_at = time.monotonic() + PROGRESS_STREAM_DEADLINE_SECONDS
    shown: str | None = None
    while True:
        snapshot = progress.get(submission_id)
        if snapshot is not None:
            if snapshot.message != shown:
                shown = snapshot.message
                panel_state = _PROGRESS_PANEL_STATE.get(
                    snapshot.state, "working"
                )
                yield _submit_status(panel_state, snapshot.message)
            if progress.is_terminal(snapshot.state):
                return
        if time.monotonic() >= give_up_at:
            yield _submit_status(
                "queued",
                "Evaluation is taking longer than expected; it continues in "
                "the background. Check the **Unvalidated** leaderboard for "
                "the final result.",
            )
            return
        time.sleep(PROGRESS_STREAM_POLL_SECONDS)
|
|
|
|
| def _validate_form(zip_file) -> str | None: |
| """Form-level check before any zip parsing. |
| |
| Returns a plain-text rejection message (no markdown wrapping; |
| the caller wraps it into a ``gr.Error`` toast) or ``None`` when |
| the form is acceptable. |
| """ |
| if zip_file is None: |
| return "Please attach a submission zip." |
| return None |
|
|
|
|
| def _extract_zip(zip_path: Path, target: Path) -> None: |
| """Extract *zip_path* into *target* with zip-slip + symlink rejection. |
| |
| Python's ``ZipFile.extractall`` since 3.12 normalises away unsafe |
| paths silently; we'd rather reject the upload outright so the |
| submitter sees a clear error instead of getting a "fixture set |
| mismatch" downstream because half their files were dropped. |
| """ |
| try: |
| with zipfile.ZipFile(zip_path) as zf: |
| for info in zf.infolist(): |
| if info.is_dir(): |
| continue |
| name = Path(info.filename) |
| if name.is_absolute() or ".." in name.parts: |
| raise _ValidationError( |
| f"Zip contains an unsafe path: {info.filename!r}." |
| ) |
| |
| |
| mode = info.external_attr >> 16 |
| if mode and (mode & 0o170000) == 0o120000: |
| raise _ValidationError( |
| f"Zip contains a symlink ({info.filename!r}); " |
| f"submissions must be plain files." |
| ) |
| zf.extractall(target) |
| except zipfile.BadZipFile as e: |
| raise _ValidationError(f"Upload is not a valid zip file: {e}") from e |
|
|
|
|
def _load_and_validate_meta(unpacked: Path) -> dict[str, Any]:
    """Load ``meta.json`` from the unpacked zip root and validate its schema.

    The file is decoded explicitly as UTF-8 (not the process locale), and
    a file that is not valid UTF-8 is reported as a validation failure
    like any other malformed ``meta.json``.

    Checks, in order: the file exists; it decodes and parses as JSON; the
    top level is an object; every key in ``REQUIRED_META_KEYS`` is
    present; ``submitter_name`` / ``submission_name`` are non-empty
    strings; ``agent_url`` / ``notes`` are strings or null;
    ``agree_to_publish`` is literally ``True``. A non-null ``notes`` is
    replaced by its normalised form.

    Args:
        unpacked: Directory the submission zip was extracted into.

    Returns:
        The parsed metadata dict (with ``notes`` normalised).

    Raises:
        _ValidationError: Any of the checks above fails, or ``notes``
            exceeds its length cap.
    """
    meta_path = unpacked / "meta.json"
    if not meta_path.is_file():
        raise _ValidationError(
            "Zip is missing top-level `meta.json` (expected at the root of "
            "the zip, alongside the per-sample folders)."
        )
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except UnicodeDecodeError as e:
        raise _ValidationError(
            "`meta.json` is not valid UTF-8 text."
        ) from e
    except json.JSONDecodeError as e:
        raise _ValidationError(
            f"`meta.json` is not valid JSON: {e.msg} (line {e.lineno})."
        ) from e
    if not isinstance(meta, dict):
        raise _ValidationError(
            "`meta.json` must be a JSON object at the top level."
        )

    missing = [k for k in REQUIRED_META_KEYS if k not in meta]
    if missing:
        raise _ValidationError(
            f"`meta.json` is missing required key(s): {', '.join(missing)}."
        )

    for k in ("submitter_name", "submission_name"):
        v = meta[k]
        if not isinstance(v, str) or not v.strip():
            raise _ValidationError(
                f"`meta.json` field `{k}` must be a non-empty string."
            )

    for k in ("agent_url", "notes"):
        v = meta[k]
        if v is not None and not isinstance(v, str):
            raise _ValidationError(
                f"`meta.json` field `{k}` must be a string or null."
            )

    # Identity check on purpose: 1, "true" or "yes" must not count as consent.
    if meta["agree_to_publish"] is not True:
        raise _ValidationError(
            "`meta.json` field `agree_to_publish` must be the literal boolean "
            "`true`."
        )

    if meta["notes"] is not None:
        meta["notes"] = _normalize_notes(meta["notes"])

    return meta
|
|
|
|
def _normalize_notes(raw: str) -> str:
    """Flatten *raw* to one line and enforce the notes length cap.

    Each run of CR / LF / tab characters becomes a single space and the
    result is stripped. Raises ``_ValidationError`` when what remains is
    longer than ``NOTES_MAX_CHARS``.
    """
    collapsed = re.sub(r"[\r\n\t]+", " ", raw)
    collapsed = collapsed.strip()
    length = len(collapsed)
    if length <= NOTES_MAX_CHARS:
        return collapsed
    raise _ValidationError(
        f"`meta.json` field `notes` exceeds the {NOTES_MAX_CHARS}-char "
        f"cap (got {length} after stripping). Trim and resubmit."
    )
|
|
|
|
def _validate_fixture_set(unpacked: Path) -> set[str]:
    """Check the zip's top-level folders against the dataset's fixture set.

    Both resolving the inputs directory and listing it happen inside the
    same guard, so a missing or unreadable inputs directory is reported
    as a clean server-side rejection rather than an unhandled ``OSError``.

    Args:
        unpacked: Directory the submission zip was extracted into.

    Returns:
        The expected fixture names (equal to the submitted set on success).

    Raises:
        _ValidationError: The sample set cannot be resolved server-side,
            or the submitted folders have missing or extra entries.
    """
    actual = {p.name for p in unpacked.iterdir() if p.is_dir()}

    try:
        inputs_root = data_inputs_dir()
        expected = {p.name for p in inputs_root.iterdir() if p.is_dir()}
    except Exception as e:
        raise _ValidationError(
            f"Server-side error resolving the sample set "
            f"({type(e).__name__}: {e})."
        ) from e

    missing = expected - actual
    extras = actual - expected
    if missing or extras:
        parts: list[str] = []
        if missing:
            parts.append(f"missing sample(s): {', '.join(sorted(missing))}")
        if extras:
            parts.append(f"unexpected folder(s): {', '.join(sorted(extras))}")
        raise _ValidationError(
            "Sample set does not match the dataset. " + "; ".join(parts) + "."
        )
    return expected
|
|
|
|
def _validate_steps_parseable(unpacked: Path, fixture_names: set[str]) -> None:
    """Reject the upload if any present candidate STEP file fails to load.

    For each fixture (in sorted order) the candidate file is located via
    ``_candidate_step_path``; a fixture without one is skipped. A
    zero-byte file, or one for which ``parse_step`` raises
    ``RuntimeError``, raises ``_ValidationError``. The checks run on a
    thread pool of at most 8 workers; the first failure in fixture order
    is the one that propagates.
    """
    def _verify(name: str) -> None:
        candidate = _candidate_step_path(unpacked / name)
        if candidate is None:
            # No candidate file for this fixture: nothing to check here.
            return
        if not candidate.stat().st_size:
            raise _ValidationError(
                f"Sample `{name}` has an empty `{candidate.name}`."
            )
        try:
            parse_step(candidate)
        except RuntimeError as err:
            raise _ValidationError(
                f"Sample `{name}` has an `{candidate.name}` that is not loadable "
                f"as STEP geometry: {err}"
            ) from err

    pool_size = min(8, os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Draining the iterator re-raises the first worker exception.
        for _ in pool.map(_verify, sorted(fixture_names)):
            pass
|
|
|
|
| def _candidate_step_path(fixture_dir: Path) -> Path | None: |
| """Return the submitted candidate STEP for *fixture_dir*, if present.""" |
| for name in ("output.step", "output.stp"): |
| step = fixture_dir / name |
| if step.is_file(): |
| return step |
| return None |
|
|
|
|
def _mint_submission_id(submitter_name: str, submission_name: str) -> str:
    """Mint ``<submitter-slug>_<submission-slug>_<UTC timestamp>``.

    The result is the basename used for ``submissions/<id>.zip`` and
    ``reports/<id>.*``; the timestamp has one-second resolution.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return "_".join((_slug(submitter_name), _slug(submission_name), stamp))
|
|
|
|
def _slug(s: str) -> str:
    """Reduce *s* to a lowercase ``[a-z0-9-]`` slug.

    Runs of other characters become a single dash, edge dashes are
    trimmed, and the result is cut to ``SUBMISSION_ID_SLUG_MAX``
    characters; an empty result becomes ``"unnamed"``.
    """
    dashed = re.sub(r"[^A-Za-z0-9]+", "-", s)
    trimmed = dashed.strip("-").lower()[:SUBMISSION_ID_SLUG_MAX]
    return trimmed if trimmed else "unnamed"
|
|
|
|
def _submission_zip_url(submission_id: str) -> str:
    """Return the Hub resolve URL of ``submissions/<id>.zip``.

    Built from ``HF_ENDPOINT`` and ``HF_SUBMISSIONS_REPO`` on the
    ``main`` revision; used as the report's submission download link.
    """
    repo_path = f"{SUBMISSIONS_DIR}/{submission_id}.zip"
    return f"{HF_ENDPOINT}/datasets/{HF_SUBMISSIONS_REPO}/resolve/main/{repo_path}"
|
|
|
|
def _upload_submission_zip(submission_id: str, zip_path: Path) -> str:
    """Upload the submission zip to ``submissions/<id>.zip``.

    The upload is wrapped in ``_with_hub_retries`` so transient Hub
    errors are retried. The returned URL comes from
    ``_submission_zip_url`` so there is a single source of truth for the
    blob URL (including the configured ``HF_ENDPOINT``) instead of a
    second, hard-coded copy of the host.

    Args:
        submission_id: Basename for the uploaded zip.
        zip_path: Local path of the zip to upload.

    Returns:
        The Hub resolve URL of the uploaded zip.

    Raises:
        _HubWriteError: The upload failed; the message is a short
            user-facing reason.
    """
    repo_path = f"{SUBMISSIONS_DIR}/{submission_id}.zip"

    def _upload() -> None:
        _HF_API.upload_file(
            path_or_fileobj=str(zip_path),
            path_in_repo=repo_path,
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=f"add submission zip for {submission_id}",
        )

    try:
        _with_hub_retries(_upload, what="submission-zip upload")
    except Exception as e:
        logger.exception("Failed to upload submission zip %s", submission_id)
        raise _HubWriteError(
            f"Server-side error uploading submission zip "
            f"({type(e).__name__}: {e}). Please try again later."
        ) from e
    return _submission_zip_url(submission_id)
|
|
|
|
def _build_pending_row(
    submission_id: str,
    meta: dict[str, Any],
    blob_url: str,
    submission_sha256: str,
    hf_username: str | None = None,
) -> dict[str, Any]:
    """Construct the JSON row written for a freshly-queued submission.

    The row carries the submission metadata, ``status: "pending"``, the
    blob URL and zip hash; every score-shaped field is ``None`` until
    the worker flips the row. ``validation_status`` starts as
    ``"unvalidated"`` and ``validation_method`` as ``None``.

    ``submitted_at`` is formatted with the module-level
    ``SUBMITTED_AT_FORMAT`` so the writer and any code parsing the field
    share one definition of the layout.

    Args:
        submission_id: Id minted for this submission.
        meta: Validated ``meta.json`` contents.
        blob_url: URL of the uploaded submission zip.
        submission_sha256: Hex SHA-256 of the uploaded zip.
        hf_username: Canonical HF username of the submitter; ``None``
            when the caller does not supply one.

    Returns:
        The pending row as a plain dict ready for JSON serialisation.
    """
    return {
        "submission_id": submission_id,
        "status": "pending",
        "failure_reason": None,
        "submitter_name": meta["submitter_name"],
        "submission_name": meta["submission_name"],
        "agent_url": meta["agent_url"],
        "notes": meta["notes"],
        "submitted_at": datetime.now(timezone.utc).strftime(SUBMITTED_AT_FORMAT),
        "cadgenbench_version": cadgenbench.__version__,
        "cadgenbench_data_revision": _resolve_data_revision(),
        "validity_rate": None,
        "aggregate_score": None,
        "score_by_task_type": None,
        "per_task_scores": None,
        "per_fixture_scores": None,
        "per_fixture_breakdown": None,
        "submission_blob_url": blob_url,
        "submission_sha256": submission_sha256,
        "validation_status": "unvalidated",
        "validation_method": None,
        "hf_username": hf_username,
    }
|
|
|
|
def _compute_sha256(path: Path) -> str:
    """Return the hex SHA-256 of the file at *path*.

    The file is read in ``SHA256_BLOCK_SIZE`` pieces so it is never held
    in memory whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(SHA256_BLOCK_SIZE)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
def _find_existing_submission_by_sha256(zip_sha256: str) -> str | None:
    """Return the ``submission_id`` of an existing row with the same hash.

    Reads the current ``results.jsonl`` once, without the lock (a
    worst-case race lands a duplicate row). The check is best-effort
    throughout: a failed Hub fetch is logged and treated as "no
    duplicate", and lines that are blank, not valid JSON, or not JSON
    objects are skipped rather than aborting the submit.

    Args:
        zip_sha256: Hex SHA-256 of the uploaded zip.

    Returns:
        The matching row's ``submission_id``, or ``None`` when no row
        matches (or the check could not be performed).
    """
    try:
        body = _download_results_jsonl()
    except Exception as e:
        logger.warning(
            "Dedup check skipped, Hub fetch failed (%s: %s)",
            type(e).__name__, e,
        )
        return None
    for line in body.splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A parseable but non-object line (list, number, null) has no
        # fields to compare; skip it like any other malformed line.
        if not isinstance(row, dict):
            continue
        if row.get("submission_sha256") == zip_sha256:
            return row.get("submission_id")
    return None
|
|
|
|
def _append_pending_row(row: dict[str, Any]) -> None:
    """Append *row* to the Hub's ``results.jsonl`` via the locked RMW helper.

    Any failure is logged with its traceback and re-raised as
    ``_HubWriteError`` carrying a user-facing explanation.
    """
    sid = row["submission_id"]

    def _add_row(rows: list[dict[str, Any]]) -> None:
        rows.append(row)

    message = f"add pending row for {sid}"
    try:
        _hub_rmw_results(_add_row, commit_message=message)
    except Exception as err:
        logger.exception(
            "Failed RMW of results.jsonl while appending pending row for %s",
            sid,
        )
        raise _HubWriteError(
            f"Server-side error writing the submissions table "
            f"({type(err).__name__}: {err}). The submission zip was uploaded "
            f"but the row was not; please try again later."
        ) from err
|
|
|
|
def _update_row(submission_id: str, updates: dict[str, Any]) -> None:
    """Merge *updates* into the first row whose id equals *submission_id*.

    The change goes through the locked read-modify-write helper. Raises
    ``LookupError`` when ``results.jsonl`` holds no row with that id.
    """
    def _merge(rows: list[dict[str, Any]]) -> None:
        target = next(
            (r for r in rows if r.get("submission_id") == submission_id),
            None,
        )
        if target is None:
            raise LookupError(
                f"No row with submission_id={submission_id!r} in results.jsonl."
            )
        target.update(updates)

    new_status = updates.get("status", "?")
    _hub_rmw_results(
        _merge,
        commit_message=f"flip row for {submission_id} -> {new_status}",
    )
|
|
|
|
def _hub_rmw_results(
    mutate, *, commit_message: str,
) -> None:
    """Read, mutate and re-upload ``results.jsonl`` under ``_HUB_LOCK``.

    *mutate* receives the parsed rows (an empty list when the file is
    absent or empty) and edits the list in place. The rows are then
    written back one JSON object per line, or as an empty body when no
    rows remain.

    The download -> mutate -> upload sequence is one unit handed to
    ``_with_hub_retries``: each retry re-reads the remote file, so a
    retried commit starts from current state instead of re-applying the
    mutation on top of its own earlier result.
    """
    def _cycle() -> None:
        current = _download_results_jsonl()
        rows: list[dict[str, Any]] = []
        for line in current.splitlines():
            if line.strip():
                rows.append(json.loads(line))
        mutate(rows)
        if rows:
            payload = "".join(
                json.dumps(r, ensure_ascii=False) + "\n" for r in rows
            )
        else:
            payload = ""
        _HF_API.upload_file(
            path_or_fileobj=payload.encode("utf-8"),
            path_in_repo=RESULTS_FILENAME,
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=commit_message,
        )

    with _HUB_LOCK:
        _with_hub_retries(_cycle, what="results.jsonl write")
|
|
|
|
def _download_results_jsonl() -> str:
    """Return the current ``results.jsonl`` text, or ``""`` if it is absent.

    Always forces a fresh download so a stale cached copy is never read.
    """
    # Imported at call time, as in the original (the name is also imported
    # at module level).
    from huggingface_hub import hf_hub_download

    try:
        local_copy = hf_hub_download(
            repo_id=HF_SUBMISSIONS_REPO,
            filename=RESULTS_FILENAME,
            repo_type="dataset",
            force_download=True,
        )
    except EntryNotFoundError:
        return ""
    else:
        return Path(local_copy).read_text(encoding="utf-8")
|
|
|
|
def _resolve_data_revision() -> str:
    """Return the short sha of the ``HF_DATA_REPO`` dataset, cached per process.

    The first call queries the Hub and stores the result in
    ``_DATA_REVISION``; later calls return the cached value. Any failure
    (or a missing sha) caches ``"unknown"`` instead, so a flaky lookup
    never blocks a submission.
    """
    global _DATA_REVISION
    if _DATA_REVISION is None:
        try:
            sha = _HF_API.dataset_info(HF_DATA_REPO).sha
            _DATA_REVISION = (sha or "unknown")[:DATA_REV_SHORT_LEN]
        except Exception as err:
            logger.warning(
                "Failed to resolve cadgenbench-data revision (%s: %s)",
                type(err).__name__, err,
            )
            _DATA_REVISION = "unknown"
    return _DATA_REVISION
|
|
|
|
| |
| |
| |
|
|
|
|
def _spawn_worker(
    submission_id: str,
    submission_blob_url: str,
    fixture_names: list[str],
) -> None:
    """Launch the background dispatch+poll worker for one submission.

    Starts a daemon thread named ``cgb-worker-<submission_id>`` running
    ``_run_worker`` with the three arguments, and returns immediately
    without joining it. Being a daemon thread, it does not keep the
    process alive on shutdown.
    """
    worker_args = (submission_id, submission_blob_url, fixture_names)
    worker = threading.Thread(
        name=f"cgb-worker-{submission_id}",
        target=_run_worker,
        args=worker_args,
        daemon=True,
    )
    worker.start()
|
|
|
|
def _run_worker(
    submission_id: str,
    submission_blob_url: str,
    fixture_names: list[str],
) -> None:
    """Run one submission's evaluation end to end and record the outcome.

    More than :data:`SHARD_THRESHOLD` fixtures delegates to
    :func:`_run_worker_sharded`. Otherwise a single job is dispatched
    and polled: a ``COMPLETED`` stage fetches the run summary and flips
    the row to completed, any other terminal stage flips it to failed
    with a reason built by :func:`_job_failure_reason`. Every exception
    raised along the way is logged with its traceback and converted into
    a failed row whose reason is ``"<ExceptionType>: <message>"``
    truncated to :data:`FAILURE_REASON_MAX_CHARS`.
    """
    try:
        if len(fixture_names) > SHARD_THRESHOLD:
            _run_worker_sharded(
                submission_id, submission_blob_url, fixture_names,
            )
            return

        progress.publish(
            submission_id,
            progress.RUNNING,
            "Evaluation dispatched — waiting for a GPU…",
        )
        job_id = _dispatch_eval_job(submission_id, submission_blob_url)
        logger.info("Dispatched eval job %s for %s", job_id, submission_id)
        stage, status_message = _poll_until_done(job_id, submission_id)

        if stage != "COMPLETED":
            # Terminal but unsuccessful: record why and stop.
            reason = _job_failure_reason(job_id, stage, status_message)
            _flip_row_to_failed(submission_id, reason)
            progress.publish(
                submission_id,
                progress.FAILED,
                _failed_progress_message(reason),
            )
            logger.warning(
                "Eval job %s for %s ended %s: %s",
                job_id, submission_id, stage, reason,
            )
            return

        progress.publish(
            submission_id,
            progress.RUNNING,
            "Evaluation finished — collecting results…",
        )
        summary = _fetch_run_summary_from_report(submission_id)
        _flip_row_to_completed(submission_id, summary)
        progress.publish(
            submission_id,
            progress.COMPLETED,
            _completed_progress_message(summary),
        )
        logger.info("Worker completed for %s", submission_id)
    except Exception as exc:
        logger.exception("Worker failed for %s", submission_id)
        reason = f"{type(exc).__name__}: {exc}"[:FAILURE_REASON_MAX_CHARS]
        progress.publish(
            submission_id,
            progress.FAILED,
            _failed_progress_message(reason),
        )
        try:
            _flip_row_to_failed(submission_id, reason)
        except Exception:
            # Nothing more this thread can do; the failure is only logged
            # and the row keeps its pending status.
            logger.exception(
                "Failed to flip row to failed for %s; row stays pending",
                submission_id,
            )
|
|
|
|
def _run_worker_sharded(
    submission_id: str,
    submission_blob_url: str,
    fixture_names: list[str],
) -> None:
    """Evaluate a large submission as several shard jobs, then merge.

    *fixture_names* is cut into chunks of :data:`SHARD_CHUNK_SIZE`; each
    chunk becomes a shard keyed ``shard_000``, ``shard_001``, ... with a
    small mutable state dict (``fixtures``, ``job_id``, ``attempts``,
    ``stage``, ``message``). All shards are dispatched up front and then
    polled by :func:`_poll_shards_until_done`.

    If that reports any failures the row is flipped to ``failed`` and
    the function returns without merging or cleaning up. Otherwise the
    shard outputs are merged and published, the row is flipped to
    ``completed``, and the shard scratch artifacts are removed.
    Exceptions are not handled here; they propagate to the caller.
    """
    shards: dict[str, dict[str, Any]] = {}
    for index, chunk in enumerate(
        _chunk_fixtures(fixture_names, SHARD_CHUNK_SIZE)
    ):
        shards[f"shard_{index:03d}"] = {
            "fixtures": chunk,
            "job_id": None,
            "attempts": 0,
            "stage": None,
            "message": None,
        }
    shard_count = len(shards)

    logger.info(
        "Sharded eval for %s: %d fixtures -> %d shard(s)",
        submission_id, len(fixture_names), shard_count,
    )
    progress.publish(
        submission_id,
        progress.RUNNING,
        f"Evaluation split into {shard_count} chunks — dispatching to GPUs…",
    )
    for shard_id, state in shards.items():
        _dispatch_shard(submission_id, submission_blob_url, shard_id, state)

    failures = _poll_shards_until_done(
        submission_id, submission_blob_url, shards,
    )
    if failures:
        joined = "; ".join(failures)
        reason = f"sharded eval failed: {joined}"[:FAILURE_REASON_MAX_CHARS]
        _flip_row_to_failed(submission_id, reason)
        progress.publish(
            submission_id,
            progress.FAILED,
            _failed_progress_message(reason),
        )
        logger.warning("Sharded eval for %s failed: %s", submission_id, reason)
        return

    progress.publish(
        submission_id,
        progress.RUNNING,
        "All chunks evaluated — merging results…",
    )
    summary = _merge_shards_and_publish(
        submission_id, list(shards), fixture_names,
    )
    _flip_row_to_completed(submission_id, summary)
    progress.publish(
        submission_id,
        progress.COMPLETED,
        _completed_progress_message(summary),
    )
    logger.info("Sharded worker completed for %s", submission_id)
    _cleanup_shard_artifacts(submission_id)
|
|
|
|
def _chunk_fixtures(fixtures: list[str], chunk_size: int) -> list[list[str]]:
    """Split *fixtures* into contiguous chunks of at most *chunk_size*.

    Order is preserved and only the final chunk may be shorter. An empty
    *fixtures* list yields an empty list of chunks.

    Raises:
        ValueError: if *chunk_size* is less than 1. A negative step would
            otherwise produce no chunks at all and silently drop every
            fixture from the run.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    return [
        fixtures[start:start + chunk_size]
        for start in range(0, len(fixtures), chunk_size)
    ]
|
|
|
|
def _dispatch_eval_job(
    submission_id: str, submission_blob_url: str,
) -> str:
    """Start a single Job covering the entire submission; return the job id.

    Thin wrapper over :func:`_dispatch_eval_command` with no extra
    command-line arguments.
    """
    no_extra_args: list[str] = []
    return _dispatch_eval_command(
        submission_id, submission_blob_url, no_extra_args,
    )
|
|
|
|
def _dispatch_eval_command(
    submission_id: str,
    submission_blob_url: str,
    extra_args: list[str],
) -> str:
    """Launch one eval Job via ``run_job`` and return the new job's id.

    The Job runs ``python /opt/eval_job.py <submission_id>
    <submission_blob_url> <extra_args...>`` on the
    ``hf.co/spaces/<EVAL_GPU_SPACE>`` image. Its environment always
    carries the submissions repo, worker count, render bucket and Hub
    endpoint; ``CADGENBENCH_DATA_REPO`` / ``CADGENBENCH_DATA_GT_REPO``
    are forwarded only when set (non-empty) in this process. When shard
    buckets are enabled and *extra_args* contains ``--shard-id``, the
    shard bucket id and prefix are added as well.

    The process's own ``HF_TOKEN`` is used both to authenticate the
    ``run_job`` call and as the Job's ``HF_TOKEN`` secret.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is missing or empty.
    """
    token = os.environ.get("HF_TOKEN")
    if not token:
        raise RuntimeError(
            "HF_TOKEN is unset on the Space; cannot dispatch eval job."
        )

    env: dict[str, str] = {
        "HF_SUBMISSIONS_REPO": HF_SUBMISSIONS_REPO,
        "EVAL_WORKER_COUNT": EVAL_JOB_WORKER_COUNT,
        "CADGENBENCH_RENDER_BUCKET": HF_RENDER_BUCKET,
        "HF_ENDPOINT": HF_ENDPOINT,
    }
    # Optional overrides: forwarded only when present in our environment.
    for passthrough in ("CADGENBENCH_DATA_REPO", "CADGENBENCH_DATA_GT_REPO"):
        override = os.environ.get(passthrough)
        if override:
            env[passthrough] = override
    if _shard_bucket_enabled() and "--shard-id" in extra_args:
        # Shard jobs additionally learn where to stage their output.
        env["CADGENBENCH_SHARD_BUCKET"] = _shard_bucket_id()
        env["CADGENBENCH_SHARD_BUCKET_PREFIX"] = SHARD_BUCKET_PREFIX

    argv = [
        "python", "/opt/eval_job.py", submission_id, submission_blob_url,
        *extra_args,
    ]
    job = run_job(
        image=f"hf.co/spaces/{EVAL_GPU_SPACE}",
        command=argv,
        flavor=EVAL_JOB_FLAVOR,
        namespace=EVAL_JOB_NAMESPACE,
        env=env,
        secrets={"HF_TOKEN": token},
        timeout=EVAL_JOB_TIMEOUT,
        token=token,
    )
    return job.id
|
|
|
|
def _dispatch_shard(
    submission_id: str,
    submission_blob_url: str,
    shard_id: str,
    state: dict[str, Any],
) -> None:
    """Launch a Job for one shard and update its bookkeeping in *state*.

    The Job receives ``--shard-id <shard_id> --fixtures <comma-joined
    state["fixtures"]>``. After a successful dispatch *state* is mutated
    in place: ``job_id`` becomes the new job's id, ``attempts`` goes up
    by one, and ``stage`` / ``message`` are reset to ``None`` so the
    poller treats the shard as running again. If the dispatch raises,
    *state* is left untouched.
    """
    fixtures = state["fixtures"]
    shard_args = ["--shard-id", shard_id, "--fixtures", ",".join(fixtures)]
    new_job_id = _dispatch_eval_command(
        submission_id, submission_blob_url, shard_args,
    )
    state.update(
        job_id=new_job_id,
        attempts=state["attempts"] + 1,
        stage=None,
        message=None,
    )
    logger.info(
        "Dispatched shard %s for %s (attempt %d, job %s, %d fixtures)",
        shard_id, submission_id, state["attempts"], new_job_id,
        len(fixtures),
    )
|
|
|
|
def _poll_shards_until_done(
    submission_id: str,
    submission_blob_url: str,
    shards: dict[str, dict[str, Any]],
) -> list[str]:
    """Poll every shard to a terminal state, retrying only ERROR shards.

    Each tick inspects every shard whose ``stage`` is not yet
    ``COMPLETED`` / ``FAILED``. A job reporting ``COMPLETED`` marks its
    shard done; a job reporting ``ERROR`` is re-dispatched while the
    shard's ``attempts`` is at most :data:`SHARD_MAX_RETRIES`, after
    which the shard is marked ``FAILED`` with a reason from
    :func:`_job_failure_reason`. Any other stage simply waits for the
    next tick. Progress is published whenever the completed count
    changes. Once :data:`SHARD_POLL_DEADLINE_SECONDS` has elapsed, every
    shard still non-terminal is marked ``FAILED``.

    ``inspect_job`` errors are counted *per shard*: a failed inspect ends
    the current sweep (the loop backs off until the next tick), and
    :data:`JOB_POLL_MAX_CONSECUTIVE_ERRORS` consecutive failures for the
    same shard re-raise the last error. Counting per shard matters
    because a single shared counter would be reset by any other shard's
    successful inspect, so a shard whose inspect fails every tick could
    never reach the limit and would stall the poll until the deadline.

    Mutates the state dicts in *shards* in place.

    Returns:
        ``"<shard_id>: <reason>"`` strings for every shard that ended
        ``FAILED``; an empty list means every shard completed.

    Raises:
        Exception: whatever ``inspect_job`` raised on the shard's
            final allowed consecutive failure; errors from a retry
            dispatch also propagate.
    """
    terminal_stages = ("COMPLETED", "FAILED")
    deadline = time.monotonic() + SHARD_POLL_DEADLINE_SECONDS
    # Consecutive inspect_job failures, keyed by shard id.
    inspect_errors: dict[str, int] = {}
    last_done = -1
    total = len(shards)
    while True:
        running = [
            sid for sid, st in shards.items()
            if st["stage"] not in terminal_stages
        ]
        done = sum(1 for st in shards.values() if st["stage"] == "COMPLETED")
        if done != last_done:
            last_done = done
            progress.publish(
                submission_id,
                progress.RUNNING,
                f"Evaluating… {done} of {total} chunks done.",
            )
        if not running:
            break
        for shard_id in running:
            st = shards[shard_id]
            try:
                info = inspect_job(
                    job_id=st["job_id"],
                    namespace=EVAL_JOB_NAMESPACE,
                    token=_jobs_token(),
                )
            except Exception as e:
                errors = inspect_errors.get(shard_id, 0) + 1
                inspect_errors[shard_id] = errors
                logger.warning(
                    "inspect_job(%s) for shard %s failed (%d/%d): %s",
                    st["job_id"], shard_id, errors,
                    JOB_POLL_MAX_CONSECUTIVE_ERRORS, e,
                )
                if errors >= JOB_POLL_MAX_CONSECUTIVE_ERRORS:
                    raise
                # Abandon this sweep and back off until the next tick.
                break
            # A successful inspect ends this shard's failure streak.
            inspect_errors.pop(shard_id, None)

            stage = info.status.stage
            if stage == "COMPLETED":
                st["stage"] = "COMPLETED"
                logger.info("Shard %s COMPLETED for %s", shard_id, submission_id)
            elif stage == "ERROR":
                if st["attempts"] <= SHARD_MAX_RETRIES:
                    logger.warning(
                        "Shard %s ERROR; retry %d/%d",
                        shard_id, st["attempts"], SHARD_MAX_RETRIES,
                    )
                    _dispatch_shard(
                        submission_id, submission_blob_url, shard_id, st,
                    )
                else:
                    st["stage"] = "FAILED"
                    st["message"] = _job_failure_reason(
                        st["job_id"], stage, info.status.message,
                    )
                    logger.warning(
                        "Shard %s FAILED after %d attempt(s): %s",
                        shard_id, st["attempts"], st["message"],
                    )

        if time.monotonic() >= deadline:
            # Out of time: whatever has not reached a terminal state fails.
            for shard_id, st in shards.items():
                if st["stage"] not in terminal_stages:
                    st["stage"] = "FAILED"
                    st["message"] = (
                        f"Space-side poll deadline exceeded "
                        f"({SHARD_POLL_DEADLINE_SECONDS}s)"
                    )
            break
        time.sleep(JOB_POLL_INTERVAL_SECONDS)

    return [
        f"{sid}: {st['message']}"
        for sid, st in shards.items()
        if st["stage"] == "FAILED"
    ]
|
|
|
|
def _poll_until_done(
    job_id: str, submission_id: str,
) -> tuple[str, str | None]:
    """Wait for a single Job to finish and report ``(stage, message)``.

    ``inspect_job`` is called every :data:`JOB_POLL_INTERVAL_SECONDS`.
    A stage of ``COMPLETED`` or ``ERROR`` is returned as-is together
    with the job's status message. For any other stage a progress update
    is published the first time that stage is seen, and once
    :data:`JOB_POLL_DEADLINE_SECONDS` has elapsed a synthetic ``ERROR``
    with a "deadline exceeded" message is returned; the Job itself is
    not cancelled here.

    Raises:
        Exception: the last ``inspect_job`` error, once
            :data:`JOB_POLL_MAX_CONSECUTIVE_ERRORS` calls in a row have
            failed.
    """
    give_up_at = time.monotonic() + JOB_POLL_DEADLINE_SECONDS
    failures_in_a_row = 0
    published_stage: str | None = None
    while True:
        try:
            info = inspect_job(
                job_id=job_id,
                namespace=EVAL_JOB_NAMESPACE,
                token=_jobs_token(),
            )
        except Exception as exc:
            failures_in_a_row += 1
            logger.warning(
                "inspect_job(%s) failed (%d/%d): %s",
                job_id, failures_in_a_row,
                JOB_POLL_MAX_CONSECUTIVE_ERRORS, exc,
            )
            if failures_in_a_row >= JOB_POLL_MAX_CONSECUTIVE_ERRORS:
                raise
        else:
            failures_in_a_row = 0
            stage = info.status.stage
            message = info.status.message
            if stage in ("COMPLETED", "ERROR"):
                return stage, message
            # Only announce a stage when it differs from the last one shown.
            if stage != published_stage:
                published_stage = stage
                progress.publish(
                    submission_id,
                    progress.RUNNING,
                    _running_message_for_stage(stage),
                )
            if time.monotonic() >= give_up_at:
                return "ERROR", (
                    f"Space-side poll deadline exceeded "
                    f"({JOB_POLL_DEADLINE_SECONDS}s); last stage={stage}"
                )
        time.sleep(JOB_POLL_INTERVAL_SECONDS)
|
|
|
|
def _job_failure_reason(
    job_id: str, stage: str, status_message: str | None,
) -> str:
    """Build a short ``failure_reason`` for a non-completed Job.

    The reason is ``"eval job <stage lowercased>"``, followed by
    *status_message* when it is non-empty, followed by
    ``"logs: <line> | <line> ..."`` holding the last
    :data:`JOB_LOG_TAIL_LINES` lines from ``fetch_job_logs``. The parts
    are joined with ``": "`` and the result is truncated to
    :data:`FAILURE_REASON_MAX_CHARS`.

    Fetching logs is best-effort: any error is logged as a warning and
    the reason is built without the log tail.
    """
    from collections import deque

    parts: list[str] = [f"eval job {stage.lower()}"]
    if status_message:
        parts.append(status_message)
    try:
        # A bounded deque keeps only the tail while the log streams past,
        # rather than holding the whole log in memory. It also yields no
        # lines for a tail size of 0, where a ``[-0:]`` slice would have
        # returned the entire log.
        tail = deque(
            fetch_job_logs(
                job_id=job_id,
                namespace=EVAL_JOB_NAMESPACE,
                token=_jobs_token(),
            ),
            maxlen=JOB_LOG_TAIL_LINES,
        )
        if tail:
            parts.append("logs: " + " | ".join(tail))
    except Exception as e:
        logger.warning("fetch_job_logs(%s) failed: %s", job_id, e)
    return ": ".join(parts)[:FAILURE_REASON_MAX_CHARS]
|
|
|
|
def _fetch_run_summary_from_report(submission_id: str) -> dict[str, Any]:
    """Fetch ``<REPORTS_DIR>/<id>.json`` and return its ``run_summary``.

    The report is downloaded from the submissions dataset with the local
    cache bypassed, parsed as JSON, and its ``run_summary`` entry is
    returned.

    Raises:
        RuntimeError: if ``run_summary`` is absent or is not a dict.
        Exception: download and JSON parsing errors propagate unchanged.
    """
    report_name = f"{REPORTS_DIR}/{submission_id}.json"
    local_report = hf_hub_download(
        repo_id=HF_SUBMISSIONS_REPO,
        filename=report_name,
        repo_type="dataset",
        force_download=True,
    )
    with open(local_report, encoding="utf-8") as handle:
        payload = json.load(handle)

    summary = payload.get("run_summary")
    if isinstance(summary, dict):
        return summary
    raise RuntimeError(
        f"reports/{submission_id}.json missing or malformed "
        f"`run_summary` block (got {type(summary).__name__})"
    )
|
|
|
|
def _merge_shards_and_publish(
    submission_id: str,
    shard_ids: list[str],
    fixture_names: list[str],
) -> dict[str, Any]:
    """Combine all shard outputs into one run, publish it, return the summary.

    Steps, all inside a temporary directory that is removed on exit:

    1. Fetch the shard outputs -- via ``sync_bucket`` when shard buckets
       are enabled, otherwise via ``snapshot_download`` restricted to
       ``<REPORTS_DIR>/<id>/<SHARDS_SUBDIR>/**``.
    2. Copy every ``<shard>/<fixture>/`` directory that contains a
       ``result.json`` into a single merged run directory; fixture
       directories without a ``result.json`` are skipped.
    3. Run ``write_run_summary`` over the merged directory, bundle the
       report JSON with :func:`_build_report_json`, and render the HTML
       report with ``discover_run`` + ``generate_html``.
    4. Hand the HTML and JSON to :func:`_publish_reports_and_gallery`.

    *shard_ids* is accepted for the caller's benefit but is not consulted
    here; the shard directories are discovered from what was downloaded.

    Returns:
        The ``run_summary`` dict from the merged report bundle.

    Raises:
        RuntimeError: if no shard directory was downloaded, if the same
            fixture name shows up under two shards, or if any name in
            *fixture_names* is absent from the merged run.
    """
    from cadgenbench.eval.report.single_run import discover_run, generate_html
    from cadgenbench.eval.run_summary import write_run_summary

    scratch = Path(tempfile.mkdtemp(prefix=f"cgb-merge-{submission_id}-"))
    try:
        staging = scratch / "dl"
        if _shard_bucket_enabled():
            shards_root = staging
            shards_root.mkdir(parents=True, exist_ok=True)
            _HF_API.sync_bucket(
                source=_shard_bucket_uri(submission_id),
                dest=str(shards_root),
                token=_jobs_token(),
            )
        else:
            shard_glob = f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}/**"
            snapshot_path = snapshot_download(
                repo_id=HF_SUBMISSIONS_REPO,
                repo_type="dataset",
                allow_patterns=[shard_glob],
                local_dir=str(staging),
            )
            shards_root = (
                Path(snapshot_path) / REPORTS_DIR / submission_id / SHARDS_SUBDIR
            )
        if not shards_root.is_dir():
            raise RuntimeError(
                f"No shard artifacts found under {shards_root}."
            )

        merged_run = scratch / "run"
        merged_run.mkdir()
        merged_names: set[str] = set()
        shard_dirs = sorted(p for p in shards_root.iterdir() if p.is_dir())
        for shard_dir in shard_dirs:
            fixture_dirs = sorted(p for p in shard_dir.iterdir() if p.is_dir())
            for fixture_dir in fixture_dirs:
                # Only directories carrying a result.json count as fixtures.
                if not (fixture_dir / "result.json").is_file():
                    continue
                name = fixture_dir.name
                if name in merged_names:
                    raise RuntimeError(
                        f"Fixture {name!r} present in more than one shard."
                    )
                merged_names.add(name)
                shutil.copytree(fixture_dir, merged_run / name)

        missing = set(fixture_names) - merged_names
        if missing:
            preview = ", ".join(sorted(missing)[:5])
            more = "..." if len(missing) > 5 else ""
            raise RuntimeError(
                f"Merged run missing {len(missing)} fixture(s) after shard "
                f"merge: {preview}" + more
            )

        write_run_summary(merged_run)
        report_json = _build_report_json(merged_run)

        run_data = discover_run(merged_run)
        html = generate_html(
            run_data,
            render_base_url=render_submission_base_url(submission_id),
            gt_base_url=GT_PROXY_BASE_URL,
            input_base_url=INPUT_PROXY_BASE_URL,
            download_url=_submission_zip_url(submission_id),
        )
        html_path = scratch / f"{submission_id}.html"
        html_path.write_text(html, encoding="utf-8")

        _publish_reports_and_gallery(submission_id, html_path, report_json)
        return report_json["run_summary"]
    finally:
        # Always drop the scratch tree, whether the merge succeeded or not.
        shutil.rmtree(scratch, ignore_errors=True)
|
|
|
|
def _build_report_json(run_dir: Path) -> dict[str, Any]:
    """Assemble the report bundle for the run stored under *run_dir*.

    Returns a dict with two keys: ``run_summary`` (the parsed
    ``run_summary.json``) and ``per_fixture_results`` (a mapping from
    each immediate sub-directory name to its parsed ``result.json``, in
    sorted directory order). Sub-directories without a ``result.json``
    are left out.

    Raises:
        RuntimeError: if ``run_summary.json`` is not a file in *run_dir*.
    """
    summary_file = run_dir / "run_summary.json"
    if not summary_file.is_file():
        raise RuntimeError(
            f"run_summary.json not produced under {run_dir} (merge issue?)"
        )
    with open(summary_file, encoding="utf-8") as handle:
        summary = json.load(handle)

    results: dict[str, dict[str, Any]] = {}
    subdirs = [entry for entry in run_dir.iterdir() if entry.is_dir()]
    subdirs.sort()
    for subdir in subdirs:
        result_file = subdir / "result.json"
        if not result_file.is_file():
            continue
        with open(result_file, encoding="utf-8") as handle:
            results[subdir.name] = json.load(handle)
    return {"run_summary": summary, "per_fixture_results": results}
|
|
|
|
def _publish_reports_and_gallery(
    submission_id: str,
    html_path: Path,
    report_json: dict[str, Any],
) -> None:
    """Commit the merged HTML + JSON report to the submissions dataset.

    Both ``<REPORTS_DIR>/<id>.html`` (uploaded from *html_path*) and
    ``<REPORTS_DIR>/<id>.json`` (*report_json* serialised as indented
    UTF-8 JSON) go into a single ``create_commit``, wrapped in
    :func:`_with_hub_retries`. Only these two files are committed here;
    no render images are part of the commit.
    """
    report_stem = f"{REPORTS_DIR}/{submission_id}"
    json_bytes = json.dumps(
        report_json, ensure_ascii=False, indent=2,
    ).encode("utf-8")
    operations: list[CommitOperationAdd] = [
        CommitOperationAdd(
            path_in_repo=f"{report_stem}.html",
            path_or_fileobj=str(html_path),
        ),
        CommitOperationAdd(
            path_in_repo=f"{report_stem}.json",
            path_or_fileobj=json_bytes,
        ),
    ]

    def _commit():
        return _HF_API.create_commit(
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            operations=operations,
            commit_message=f"publish merged report for {submission_id}",
        )

    _with_hub_retries(_commit, what="merged report publish")
    logger.info("Published reports/%s.{html,json}", submission_id)
|
|
|
|
def _cleanup_shard_artifacts(submission_id: str) -> None:
    """Remove a submission's shard scratch output; never raises.

    With shard buckets enabled the bucket prefix is emptied via
    :func:`_delete_shard_bucket_prefix`; otherwise the
    ``<REPORTS_DIR>/<id>/<SHARDS_SUBDIR>`` folder is deleted from the
    submissions dataset (with Hub retries). Any exception is swallowed
    and logged as a warning, so a failed cleanup cannot fail the caller.
    """

    def _delete_dataset_folder():
        return _HF_API.delete_folder(
            path_in_repo=f"{REPORTS_DIR}/{submission_id}/{SHARDS_SUBDIR}",
            repo_id=HF_SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=f"clean up eval shards for {submission_id}",
        )

    try:
        if _shard_bucket_enabled():
            _delete_shard_bucket_prefix(submission_id)
        else:
            _with_hub_retries(_delete_dataset_folder, what="shard cleanup")
        logger.info("Cleaned up shard artifacts for %s", submission_id)
    except Exception as exc:
        logger.warning(
            "Shard-artifact cleanup failed for %s (%s: %s); leaving scratch",
            submission_id, type(exc).__name__, exc,
        )
|
|
|
|
def _delete_shard_bucket_prefix(submission_id: str) -> None:
    """Delete all files listed under the submission's shard bucket prefix.

    The bucket tree is listed recursively under the prefix; entries
    flagged ``is_folder`` or lacking a ``path`` are ignored. The
    remaining paths are removed in one ``batch_bucket_files`` call, which
    is skipped entirely when there is nothing to delete.
    """
    bucket_id = _shard_bucket_id()
    prefix = _shard_bucket_prefix_path(submission_id)
    token = _jobs_token()

    doomed = []
    listing = _HF_API.list_bucket_tree(
        bucket_id, prefix=prefix, recursive=True, token=token,
    )
    for entry in listing:
        if getattr(entry, "is_folder", False):
            continue
        if not getattr(entry, "path", None):
            continue
        doomed.append(entry.path)

    if doomed:
        _HF_API.batch_bucket_files(bucket_id, delete=doomed, token=token)
|
|
|
|
def _flip_row_to_completed(submission_id: str, summary: dict[str, Any]) -> None:
    """Mark the submission's row ``completed`` and copy in its scores.

    Sets ``status`` to ``"completed"``, clears ``failure_reason``, stamps
    the current data revision, and copies the five score fields from
    *summary* (each ``None`` when *summary* lacks it). The update is
    applied through :func:`_update_row`.
    """
    updates: dict[str, Any] = {
        "status": "completed",
        "failure_reason": None,
        "cadgenbench_data_revision": _resolve_data_revision(),
    }
    score_fields = (
        "aggregate_score",
        "validity_rate",
        "score_by_task_type",
        "per_task_scores",
        "per_fixture_scores",
    )
    for field in score_fields:
        updates[field] = summary.get(field)
    _update_row(submission_id, updates)
|
|
|
|
def _flip_row_to_failed(submission_id: str, reason: str) -> None:
    """Set the row's ``status`` to ``failed`` and record *reason*.

    Only ``status`` and ``failure_reason`` are written; no other field
    is included in the update passed to :func:`_update_row`.
    """
    failure_update = {"status": "failed", "failure_reason": reason}
    _update_row(submission_id, failure_update)
|
|
|
|
| |
| |
| |
|
|
|
|
def _sweep_stuck_pending() -> None:
    """Flip pending rows older than the threshold to failed.

    Downloads ``results.jsonl``, collects every ``pending`` row whose
    ``submitted_at`` is more than :data:`STUCK_PENDING_THRESHOLD_SECONDS`
    in the past, and flips each one to ``failed`` with
    :data:`STUCK_PENDING_REASON`. Such rows have no worker left to
    finish them, so without the sweep they would stay pending.

    The sweep is best-effort and never raises for Hub problems: a failed
    download skips the sweep, and a failed flip is logged and does not
    stop the remaining rows from being processed.
    """
    try:
        body = _download_results_jsonl()
    except Exception as e:
        logger.warning(
            "Stuck-pending sweep skipped, Hub fetch failed (%s: %s)",
            type(e).__name__, e,
        )
        return

    cutoff = datetime.now(timezone.utc) - timedelta(
        seconds=STUCK_PENDING_THRESHOLD_SECONDS
    )
    stuck_ids = _stale_pending_ids(body, cutoff)
    if not stuck_ids:
        logger.info("Stuck-pending sweep: nothing stale")
        return

    logger.warning(
        "Stuck-pending sweep: flipping %d row(s) to failed: %s",
        len(stuck_ids), stuck_ids,
    )
    for sid in stuck_ids:
        try:
            _flip_row_to_failed(sid, STUCK_PENDING_REASON)
        except Exception as e:
            logger.exception(
                "Stuck-pending flip failed for %s (%s: %s)",
                sid, type(e).__name__, e,
            )


def _stale_pending_ids(body: str, cutoff: datetime) -> list[str]:
    """Return ids of ``pending`` rows in *body* submitted before *cutoff*.

    *body* is JSON-lines text. Blank lines, lines that are not valid
    JSON, lines whose JSON value is not an object, rows that are not
    pending, and rows missing ``submission_id`` or ``submitted_at`` are
    all skipped. A ``submitted_at`` that cannot be parsed with
    :data:`SUBMITTED_AT_FORMAT` (wrong format or not a string) is logged
    and skipped rather than aborting the scan. Parsed timestamps are
    interpreted as UTC.
    """
    stale: list[str] = []
    for line in body.splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A valid JSON line need not be an object (e.g. a list or number).
        if not isinstance(row, dict) or row.get("status") != "pending":
            continue
        sid = row.get("submission_id")
        ts_str = row.get("submitted_at")
        if not sid or not ts_str:
            continue
        try:
            ts = datetime.strptime(ts_str, SUBMITTED_AT_FORMAT).replace(
                tzinfo=timezone.utc
            )
        except (TypeError, ValueError):
            # TypeError covers a non-string timestamp such as a number.
            logger.warning(
                "Skipping unparseable submitted_at %r on row %s",
                ts_str, sid,
            )
            continue
        if ts < cutoff:
            stale.append(sid)
    return stale
|
|
|
|
def _start_boot_sweep() -> None:
    """Kick off :func:`_sweep_stuck_pending` on a background daemon thread.

    Does nothing except log when the environment variable named by
    :data:`BOOT_SWEEP_ENV` is set to exactly ``"1"``. Otherwise the
    sweep runs on a daemon thread named ``cgb-boot-sweep`` and this
    function returns immediately.
    """
    sweep_disabled = os.getenv(BOOT_SWEEP_ENV) == "1"
    if sweep_disabled:
        logger.info("Stuck-pending sweep disabled via %s", BOOT_SWEEP_ENV)
        return
    sweeper = threading.Thread(
        name="cgb-boot-sweep",
        target=_sweep_stuck_pending,
        daemon=True,
    )
    sweeper.start()


# Module-import side effect: start the one-shot sweep (unless disabled).
_start_boot_sweep()
|
|