| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """Admin-tab handlers for the CADGenBench leaderboard Space. |
| |
Bundle 7: promote a row into the validated tier (recording the evidence
type) and demote it back. The tab also rescores rows (reset them to
pending and re-dispatch the eval against the current data), permanently
deletes rows together with their stored artifacts and renders, and can
cancel a row's in-flight eval Jobs before deleting it. Gating is the
``CADGENBENCH_ADMINS`` Space variable (comma-separated HF usernames);
:func:`is_admin` is the single predicate the UI uses to enable or disable
the controls.

Every ``results.jsonl`` write goes through :func:`submit._hub_rmw_results`,
so the ``_HUB_LOCK`` + read-modify-write semantics match the submit path
exactly. There is no second writer of ``results.jsonl`` with its own
locking story.
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import threading |
| import time |
| from typing import Any, Iterable |
|
|
| import gradio as gr |
| from huggingface_hub import cancel_job, list_jobs |
| from huggingface_hub.errors import EntryNotFoundError |
|
|
| from leaderboard import HF_RENDER_BUCKET, render_submission_prefix |
| from submit import ( |
| EVAL_JOB_NAMESPACE, |
| HF_SUBMISSIONS_REPO, |
| REPORTS_DIR, |
| SUBMISSIONS_DIR, |
| _HF_API, |
| _download_results_jsonl, |
| _hub_rmw_results, |
| _spawn_worker, |
| ) |
|
|
# Module logger; best-effort failures in the admin actions (artifact and
# render deletes, job cancels, worker spawns) are logged here, not raised.
logger = logging.getLogger(__name__)

# Space variable holding the comma-separated HF usernames that may use the
# admin controls (parsed by admin_usernames()).
ADMINS_ENV = "CADGENBENCH_ADMINS"

# Job stages with nothing left to cancel; _cancel_jobs_for_submissions()
# skips any job reporting one of these.
_JOB_TERMINAL_STAGES: frozenset[str] = frozenset(
    ("COMPLETED", "ERROR", "CANCELED", "DELETED")
)

# Evidence types accepted as ``validation_method`` when promoting rows into
# the validated tier; promote_rows() rejects anything else.
VALID_METHODS: tuple[str, ...] = (
    "code",
    "traces",
    "api",
    "manual",
)

# Score fields nulled on every row a rescore resets to pending, so the
# previous scores are gone until the fresh eval writes new ones.
_RESCORE_CLEARED_SCORE_FIELDS: tuple[str, ...] = (
    "aggregate_score",
    "validity_rate",
    "score_by_task_type",
    "per_task_scores",
    "per_sample_scores",
    "per_sample_breakdown",
)

# Seconds the rescore dispatcher sleeps between consecutive worker spawns.
# NOTE(review): presumably spaces out eval Job creation so a board-wide
# rescore doesn't burst the Jobs API -- confirm.
RESCORE_DISPATCH_STAGGER_SECONDS = 2.0
|
|
|
|
def admin_usernames() -> set[str]:
    """Return the HF usernames listed in ``CADGENBENCH_ADMINS``.

    The variable is a comma-separated list. Each entry is stripped of
    surrounding whitespace and blank entries are discarded. The environment
    is consulted on every call, so changing the Space variable takes effect
    without a code deploy. An unset or empty variable yields an empty set:
    nobody is an admin and the controls stay inert.
    """
    admins: set[str] = set()
    for entry in os.environ.get(ADMINS_ENV, "").split(","):
        name = entry.strip()
        if name:
            admins.add(name)
    return admins
|
|
|
|
def is_admin(profile: gr.OAuthProfile | None) -> bool:
    """Tell whether *profile* belongs to a logged-in admin.

    A logged-out visitor (``profile is None``) is never an admin, and the
    admin set is not even consulted in that case. While
    ``CADGENBENCH_ADMINS`` is empty no username matches, so the admin
    controls stay disabled for everyone.
    """
    return profile is not None and profile.username in admin_usernames()
|
|
|
|
def _clean_id_set(submission_ids: Iterable[str]) -> set[str]:
    """Normalise an id iterable to a non-empty set of strings, else raise.

    Guards every bulk helper: a no-op call (nothing selected) is a caller
    error, surfaced as ``ValueError`` rather than a silent empty write.
    Falsy entries (``None``, ``""``) are dropped and the rest are coerced
    to ``str``. A bare string is treated as ONE id rather than being
    iterated character by character, so ``delete_rows("abc")`` targets
    ``"abc"`` and not ``"a"``, ``"b"`` and ``"c"``.

    Raises:
        ValueError: no usable id remains after filtering.
    """
    if isinstance(submission_ids, str):
        # A str is itself an Iterable[str]; without this guard every
        # character would become a separate "submission id".
        submission_ids = (submission_ids,)
    ids = {str(s) for s in submission_ids if s}
    if not ids:
        raise ValueError("No submissions selected.")
    return ids
|
|
|
|
def promote_rows(submission_ids: Iterable[str], method: str) -> None:
    """Put every listed row in the validated tier, tagged with *method*.

    The whole batch lands in a single ``results.jsonl`` write. Rows that
    are already validated are simply re-tagged with *method*, so repeating
    the call is harmless.

    Raises:
        ValueError: *method* is not in ``VALID_METHODS``, or no ids were
            given.
        LookupError: at least one id is absent from ``results.jsonl``.
            The error is raised inside the read-modify-write callback,
            before the upload, so nothing is written.
    """
    if method not in VALID_METHODS:
        allowed = ", ".join(VALID_METHODS)
        raise ValueError(
            f"Unknown validation_method {method!r}; expected one of {allowed}."
        )
    targets = _clean_id_set(submission_ids)

    def _apply(rows: list[dict[str, Any]]) -> None:
        found: set[str] = set()
        for row in rows:
            sid = row.get("submission_id")
            if sid not in targets:
                continue
            row["validation_status"] = "validated"
            row["validation_method"] = method
            found.add(sid)
        # Abort the whole write if any requested id was never matched.
        _raise_for_missing(targets, found)

    _hub_rmw_results(
        _apply,
        commit_message=f"promote {len(targets)} row(s) to validated ({method})",
    )
|
|
|
|
def demote_rows(submission_ids: Iterable[str]) -> None:
    """Send every listed row back to the unvalidated tier.

    Sets ``validation_status`` to ``"unvalidated"`` and clears
    ``validation_method``, all in one ``results.jsonl`` write. Rows that
    are already unvalidated are left in the same state, so repeating the
    call is harmless.

    Raises:
        ValueError: no ids were given.
        LookupError: at least one id is absent from ``results.jsonl``
            (nothing is written).
    """
    targets = _clean_id_set(submission_ids)

    def _apply(rows: list[dict[str, Any]]) -> None:
        found: set[str] = set()
        for row in rows:
            sid = row.get("submission_id")
            if sid not in targets:
                continue
            row["validation_status"] = "unvalidated"
            row["validation_method"] = None
            found.add(sid)
        _raise_for_missing(targets, found)

    _hub_rmw_results(
        _apply,
        commit_message=f"demote {len(targets)} row(s) to unvalidated",
    )
|
|
|
|
def _current_fixture_names() -> list[str]:
    """Return the sorted fixture names of the live ``cadgenbench-data`` inputs.

    A rescore re-evaluates each stored zip against what the data repo
    exposes *now* (the point of rescoring after a GT swap). The fixture set
    therefore comes from the live inputs directory: one fixture per
    sub-directory, with plain files ignored. It does not come from whatever
    the submission was originally scored against. This is the same source
    :func:`submit._validate_fixture_set` checks new uploads against, so the
    single-vs-sharded dispatch split matches the submit path.
    """
    from cadgenbench.common.paths import data_inputs_dir

    inputs_root = data_inputs_dir()
    names = [child.name for child in inputs_root.iterdir() if child.is_dir()]
    names.sort()
    return names
|
|
|
|
def _dispatch_rescore_workers(
    targets: dict[str, str], fixture_names: list[str],
) -> None:
    """Start eval workers for *targets* from a background daemon thread.

    *targets* maps ``submission_id -> submission_blob_url``. It is
    snapshotted up front, and a daemon thread named
    ``cgb-rescore-dispatch`` calls ``_spawn_worker`` for each entry,
    sleeping ``RESCORE_DISPATCH_STAGGER_SECONDS`` between spawns. The
    calling Gradio handler therefore returns as soon as the rows are
    pending instead of waiting for N workers to start. Each worker goes
    through the same ``_spawn_worker`` entry point the submit path uses,
    so a rescore reuses that eval pipeline unchanged.
    """
    queue = list(targets.items())

    def _spawn_all() -> None:
        first = True
        for submission_id, blob_url in queue:
            if not first:
                time.sleep(RESCORE_DISPATCH_STAGGER_SECONDS)
            first = False
            try:
                _spawn_worker(submission_id, blob_url, fixture_names)
            except Exception as e:
                # One failed spawn must not stop the rest of the batch; the
                # row is left pending for a later re-run to pick up.
                logger.exception(
                    "rescore: failed to spawn worker for %s (%s: %s)",
                    submission_id, type(e).__name__, e,
                )

    dispatcher = threading.Thread(
        target=_spawn_all, name="cgb-rescore-dispatch", daemon=True,
    )
    dispatcher.start()
|
|
|
|
def _rescore(ids: set[str], *, require_found: bool) -> tuple[int, list[str]]:
    """Flip *ids* back to pending, then dispatch a fresh eval for each.

    The live fixture set is resolved first, before ``results.jsonl`` is
    touched, so a failure there (e.g. the data inputs dir can't be listed)
    aborts with no row changed. A single ``results.jsonl`` write then
    resets every target row to the pending regime: status ``pending``,
    ``failure_reason`` cleared, and every field in
    ``_RESCORE_CLEARED_SCORE_FIELDS`` nulled. The same write captures each
    row's stored ``submission_blob_url``. A row with no stored zip (legacy
    seed rows) can't be rescored; it is left untouched and reported as
    *skipped*. After the write commits, workers are dispatched on a
    staggered background thread.

    Idempotent and re-runnable: a rescore that's interrupted (Space
    restart) leaves its in-flight rows pending, which the boot-time
    stuck-pending sweep flips to failed, and re-running the rescore on
    those rows converges. ``submitted_at`` is never touched.

    Args:
        ids: submission ids to rescore.
        require_found: when True (selected-rows path) every id must
            exist in ``results.jsonl`` or :class:`LookupError` is raised
            before any worker is dispatched. When False (rescore-all
            path), the id set was just derived from the file, so a missing
            id only means a concurrent delete and is ignored.

    Returns:
        ``(dispatched_count, skipped_ids)`` -- how many workers were
        queued and which ids were skipped for lacking a stored zip.

    Raises:
        LookupError: ``require_found`` is True and an id is absent
            (nothing is written, nothing dispatched).
        Exception: any error resolving the current fixture set propagates
            before ``results.jsonl`` is modified.
    """
    # Resolve fixtures BEFORE the write: doing it afterwards (as a failing
    # directory listing would) leaves every target row flipped to pending
    # with no worker ever dispatched for it.
    fixture_names = _current_fixture_names()

    captured: dict[str, str] = {}
    skipped: set[str] = set()

    def mutate(rows: list[dict[str, Any]]) -> None:
        # Start clean on every invocation. If the RMW helper ever calls
        # mutate more than once (e.g. a retry against a fresh read),
        # targets captured from a stale read must not be dispatched.
        captured.clear()
        skipped.clear()
        seen: set[str] = set()
        for row in rows:
            sid = row.get("submission_id")
            if sid not in ids:
                continue
            seen.add(sid)
            blob_url = row.get("submission_blob_url")
            if not blob_url:
                # No stored zip to re-evaluate (legacy seed row).
                skipped.add(sid)
                continue
            row["status"] = "pending"
            row["failure_reason"] = None
            for field in _RESCORE_CLEARED_SCORE_FIELDS:
                row[field] = None
            captured[sid] = blob_url
        if require_found:
            _raise_for_missing(ids, seen)

    _hub_rmw_results(
        mutate,
        commit_message=f"rescore: reset {len(ids)} row(s) to pending",
    )

    if captured:
        _dispatch_rescore_workers(captured, fixture_names)
    return len(captured), sorted(skipped)
|
|
|
|
def rescore_rows(submission_ids: Iterable[str]) -> tuple[int, list[str]]:
    """Re-run the eval for each listed submission against the current data.

    Every listed row is reset to pending and its eval re-dispatched. The
    eval re-renders the gallery, regenerates ``reports/<id>.{html,json}``
    and recomputes the scores. Use this after a ground-truth or metric
    change that invalidates the existing scores.

    Returns:
        ``(dispatched_count, skipped_ids)``. *skipped_ids* are rows with no
        stored zip to re-evaluate (legacy seed rows).

    Raises:
        ValueError: no ids were given.
        LookupError: at least one id is absent from ``results.jsonl``; no
            row is reset and no worker is dispatched.
    """
    return _rescore(_clean_id_set(submission_ids), require_found=True)
|
|
|
|
def _rescoreable_ids_from_hub() -> set[str]:
    """Every submission_id with a stored zip that isn't mid-eval.

    Reads the live ``results.jsonl`` and returns the ids eligible for a
    bulk rescore. A row qualifies when it has a ``submission_blob_url``
    (so there's a zip to re-evaluate) and is not already ``pending``.
    Skipping in-flight evals avoids double-dispatching a row a worker is
    already driving. Completed and failed rows both qualify.

    Lines that are blank, not valid JSON, or valid JSON but not an object
    are skipped, so one malformed line can't abort the whole bulk action.
    """
    ids: set[str] = set()
    for line in _download_results_jsonl().splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Valid JSON that isn't an object (a stray list/string/number) is
        # not a row; without this check ``row.get`` raises AttributeError.
        if not isinstance(row, dict):
            continue
        if not row.get("submission_blob_url"):
            continue
        if row.get("status") == "pending":
            continue
        sid = row.get("submission_id")
        if sid:
            ids.add(sid)
    return ids
|
|
|
|
def rescore_all() -> tuple[int, list[str]]:
    """Re-run the eval for every rescoreable submission (see :func:`rescore_rows`).

    Eligible rows are those with a stored zip that aren't already pending.
    This is the heavy, board-wide action a maintainer runs after a GT swap.

    Returns:
        ``(dispatched_count, skipped_ids)``.

    Raises:
        ValueError: nothing is rescoreable (empty board, or every row is
            pending or lacks a stored zip).
    """
    eligible = _rescoreable_ids_from_hub()
    if eligible:
        return _rescore(eligible, require_found=False)
    raise ValueError(
        "No rescoreable submissions (every row is pending or has no "
        "stored zip)."
    )
|
|
|
|
def delete_rows(submission_ids: Iterable[str]) -> None:
    """Permanently remove each listed submission: artifacts first, then row.

    Irreversible. For every id, in sorted order, the companion blobs are
    deleted on a best-effort basis: ``submissions/<id>.zip`` and
    ``reports/<id>.{html,json}`` in the submissions dataset, plus the
    bucket renders. Afterwards the rows are dropped from ``results.jsonl``
    in one write. A blob that doesn't exist is ignored, because a failed or
    pending row may never have had a report. Other blob failures are logged
    and do not stop the deletion. Ids with no ``results.jsonl`` row are
    tolerated as well, so re-running after a partial failure converges.

    Raises:
        ValueError: no ids were given.
    """
    targets = _clean_id_set(submission_ids)

    def _delete_artifact(path: str) -> None:
        # Best-effort: an absent blob is expected; anything else is logged.
        try:
            _HF_API.delete_file(
                path_in_repo=path,
                repo_id=HF_SUBMISSIONS_REPO,
                repo_type="dataset",
                commit_message=f"delete artifact {path}",
            )
        except EntryNotFoundError:
            return
        except Exception as e:
            logger.warning(
                "Failed to delete artifact %s (%s: %s)",
                path, type(e).__name__, e,
            )

    for sid in sorted(targets):
        _delete_artifact(f"{SUBMISSIONS_DIR}/{sid}.zip")
        _delete_artifact(f"{REPORTS_DIR}/{sid}.html")
        _delete_artifact(f"{REPORTS_DIR}/{sid}.json")
        _delete_bucket_renders(sid)

    def _drop_targets(rows: list[dict[str, Any]]) -> None:
        rows[:] = [row for row in rows if row.get("submission_id") not in targets]

    _hub_rmw_results(
        _drop_targets, commit_message=f"delete {len(targets)} submission(s)",
    )
|
|
|
|
def _delete_bucket_renders(submission_id: str) -> None:
    """Remove every render stored for *submission_id* in the render bucket.

    Renders sit under the prefix returned by ``render_submission_prefix``
    (``renders/<id>/``) in ``HF_RENDER_BUCKET``. The bucket API offers no
    recursive prefix delete, so the prefix is listed recursively and every
    file path (entries ending in ``/`` are skipped) is deleted in a single
    ``batch_bucket_files`` call. Best-effort: any bucket failure is logged
    as a warning and never blocks the row deletion that follows.
    """
    prefix = render_submission_prefix(submission_id)
    try:
        listing = _HF_API.list_bucket_tree(
            HF_RENDER_BUCKET, prefix=prefix, recursive=True,
        )
        doomed: list[str] = []
        for entry in listing:
            path = getattr(entry, "path", None)
            if path and not path.endswith("/"):
                doomed.append(path)
        if not doomed:
            return
        _HF_API.batch_bucket_files(HF_RENDER_BUCKET, delete=doomed)
        logger.info(
            "Deleted %d render(s) under %s from bucket %s",
            len(doomed), prefix, HF_RENDER_BUCKET,
        )
    except Exception as e:
        logger.warning(
            "Failed to delete bucket renders under %s (%s: %s)",
            prefix, type(e).__name__, e,
        )
|
|
|
|
def _cancel_jobs_for_submissions(ids: set[str]) -> int:
    """Try to cancel every unfinished eval Job belonging to one of *ids*.

    Eval Jobs carry their ``submission_id`` in the command argv (see
    :func:`submit._dispatch_eval_command`), so no ``job_id`` has to be
    stored on the row. The eval namespace's jobs are listed, and each job
    not in a terminal stage is cancelled when one of its command/argument
    elements equals a target id. Shard jobs of a submission carry the same
    id, so they are caught too.

    Never raises. A listing failure returns 0; a cancel failure is logged
    and the loop moves on. Neither may block the row delete that follows,
    and the original notes that the GPU job has its own ``--timeout`` if a
    cancel is missed.

    Returns:
        The number of ``cancel_job`` calls that succeeded (for logging).
    """
    token = os.environ.get("HF_TOKEN")
    try:
        jobs = list_jobs(namespace=EVAL_JOB_NAMESPACE, token=token)
    except Exception as e:
        logger.warning(
            "list_jobs(%s) failed (%s: %s); skipping job cancel, deleting rows",
            EVAL_JOB_NAMESPACE, type(e).__name__, e,
        )
        return 0

    cancelled = 0
    for job in jobs:
        status = getattr(job, "status", None)
        stage = getattr(status, "stage", None)
        if stage in _JOB_TERMINAL_STAGES:
            continue
        # Exact element match against the argv, not a substring search.
        argv = [
            *(getattr(job, "command", None) or []),
            *(getattr(job, "arguments", None) or []),
        ]
        if all(sid not in argv for sid in ids):
            continue
        try:
            cancel_job(
                job_id=job.id, namespace=EVAL_JOB_NAMESPACE, token=token,
            )
            cancelled += 1
            logger.info(
                "Cancelled eval job %s (stage %s) before delete", job.id, stage,
            )
        except Exception as e:
            logger.warning(
                "cancel_job(%s) failed (%s: %s); deleting row anyway",
                job.id, type(e).__name__, e,
            )
    return cancelled
|
|
|
|
def stop_and_delete_rows(submission_ids: Iterable[str]) -> None:
    """Cancel in-flight eval Job(s) for the listed rows, then delete them.

    The stop step (:func:`_cancel_jobs_for_submissions`) is best-effort and
    never raises. The delete step is :func:`delete_rows` (artifacts, then
    row). The net effect is "stop if needed, then delete", which is the
    right action for a stuck or pending submission whose GPU job may still
    be running.

    Raises:
        ValueError: no ids were given.
    """
    targets = _clean_id_set(submission_ids)
    n_cancelled = _cancel_jobs_for_submissions(targets)
    logger.info(
        "stop_and_delete: cancelled %d job(s) for %d submission(s)",
        n_cancelled, len(targets),
    )
    delete_rows(targets)
|
|
|
|
def _raise_for_missing(requested: set[str], seen: set[str]) -> None:
    """Raise ``LookupError`` naming every requested id that is absent from *seen*.

    The missing ids are listed in sorted order. Returns silently when every
    requested id was seen.
    """
    absent = sorted(requested - seen)
    if not absent:
        return
    raise LookupError(
        f"submission_id(s) not in results.jsonl: {', '.join(absent)}."
    )
|
|
|
|
def promote_row(submission_id: str, method: str) -> None:
    """Promote one row; a thin wrapper over :func:`promote_rows`.

    Validation and errors are exactly those of the bulk helper.
    """
    promote_rows((submission_id,), method)
|
|
|
|
def demote_row(submission_id: str) -> None:
    """Demote one row; a thin wrapper over :func:`demote_rows`.

    Errors are exactly those of the bulk helper.
    """
    demote_rows((submission_id,))
|
|