# Copyright 2026 Hugging Face # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Admin-tab handlers for the CADGenBench leaderboard Space. Bundle 7: promote a row into the validated tier (recording the evidence type) and demote it back. Gating is the ``CADGENBENCH_ADMINS`` Space variable (comma-separated HF usernames); :func:`is_admin` is the single predicate the UI uses to enable or disable the controls. Row writes go through :func:`submit._hub_rmw_results`, so the ``_HUB_LOCK`` + read-modify-write semantics match the submit path exactly. There is no second writer of ``results.jsonl`` with its own locking story. """ from __future__ import annotations import json import logging import os import threading import time from typing import Any, Iterable import gradio as gr from huggingface_hub import cancel_job, list_jobs from huggingface_hub.errors import EntryNotFoundError from leaderboard import HF_RENDER_BUCKET, render_submission_prefix from submit import ( EVAL_JOB_NAMESPACE, HF_SUBMISSIONS_REPO, REPORTS_DIR, SUBMISSIONS_DIR, _HF_API, _download_results_jsonl, _hub_rmw_results, _spawn_worker, ) logger = logging.getLogger(__name__) ADMINS_ENV = "CADGENBENCH_ADMINS" # HF Job stages that are already finished: cancelling one is a no-op (and # usually an error), so the stop step skips them. Mirrors # huggingface_hub.JobStage; kept as plain strings so a new terminal # stage name added upstream doesn't import-break this module. 
_JOB_TERMINAL_STAGES: frozenset[str] = frozenset(
    {"COMPLETED", "ERROR", "CANCELED", "DELETED"}
)

# The evidence types accepted on promotion. Mirrors the
# `validation_method` enum in cadgenbench-submissions/schema.md and the
# validation policy doc.
VALID_METHODS: tuple[str, ...] = ("code", "traces", "api", "manual")

# Score-shaped fields cleared when a row is flipped back to ``pending``
# for a rescore. Mirrors the pending regime in
# cadgenbench-submissions/schema.md: every aggregate is ``null`` until
# the fresh eval flips the row back to ``completed``. ``submitted_at``
# is intentionally *not* touched -- the schema defines it as the
# immutable timestamp the row was first written, so a rescore preserves
# the original submit provenance.
_RESCORE_CLEARED_SCORE_FIELDS: tuple[str, ...] = (
    "aggregate_score",
    "validity_rate",
    "score_by_task_type",
    "per_task_scores",
    "per_sample_scores",
    "per_sample_breakdown",
)

# Gap between successive worker dispatches in a bulk rescore. Each
# worker dispatches its own HF Job and then polls; staggering the
# starts keeps a rescore-all from firing N ``run_job`` control-plane
# calls in one burst (which can rate-limit) while HF's own queue
# absorbs anything past the account's concurrent-slot cap. Small enough
# to be invisible for a one-or-two-row rescore.
RESCORE_DISPATCH_STAGGER_SECONDS = 2.0


def admin_usernames() -> set[str]:
    """Parse ``CADGENBENCH_ADMINS`` into a set of HF usernames.

    Comma-separated, whitespace-trimmed, empties dropped. Read fresh on
    each call so flipping the Space variable takes effect without a code
    deploy. Empty or unset yields an empty set, which means no one is an
    admin and the controls stay inert.
    """
    raw = os.environ.get(ADMINS_ENV, "")
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_admin(profile: gr.OAuthProfile | None) -> bool:
    """Return whether *profile* is a logged-in user in the admin set.

    Logged-out users (``profile is None``) are never admins. With an
    empty admin set no profile qualifies, so the admin controls remain
    disabled for everyone until ``CADGENBENCH_ADMINS`` is populated.
    """
    if profile is None:
        return False
    return profile.username in admin_usernames()


def _clean_id_set(submission_ids: Iterable[str]) -> set[str]:
    """Normalise an id iterable to a non-empty set, else raise.

    Each id is coerced to ``str`` and stripped of surrounding
    whitespace; falsy and whitespace-only entries are dropped, so a
    stray ``" "`` from the UI can't turn into a bogus id (e.g. a delete
    of ``submissions/ .zip``). Guards every bulk helper: a no-op call
    (nothing selected) is a caller error, surfaced as ``ValueError``
    rather than a silent empty write.

    Raises:
        ValueError: no usable id remains after normalisation.
    """
    ids = {str(s).strip() for s in submission_ids if s}
    ids.discard("")
    if not ids:
        raise ValueError("No submissions selected.")
    return ids


def promote_rows(submission_ids: Iterable[str], method: str) -> None:
    """Move every listed row into the validated tier with *method*.

    One ``results.jsonl`` write for the whole batch. Idempotent on rows
    already validated (their method is set to *method*).

    Raises:
        ValueError: *method* is unknown, or no ids were given.
        LookupError: one or more ids are absent from ``results.jsonl``
            (no partial write happens; the helper raises inside the
            read-modify-write before the upload).
    """
    if method not in VALID_METHODS:
        raise ValueError(
            f"Unknown validation_method {method!r}; expected one of "
            f"{', '.join(VALID_METHODS)}."
        )
    ids = _clean_id_set(submission_ids)

    def mutate(rows: list[dict[str, Any]]) -> None:
        seen: set[str] = set()
        for row in rows:
            if row.get("submission_id") in ids:
                row["validation_status"] = "validated"
                row["validation_method"] = method
                seen.add(row["submission_id"])
        _raise_for_missing(ids, seen)

    _hub_rmw_results(
        mutate,
        commit_message=f"promote {len(ids)} row(s) to validated ({method})",
    )


def demote_rows(submission_ids: Iterable[str]) -> None:
    """Return every listed row to the unvalidated tier, clearing method.

    One ``results.jsonl`` write for the whole batch. Idempotent on rows
    already unvalidated.

    Raises:
        ValueError: no ids were given.
        LookupError: one or more ids are absent from ``results.jsonl``.
    """
    ids = _clean_id_set(submission_ids)

    def mutate(rows: list[dict[str, Any]]) -> None:
        seen: set[str] = set()
        for row in rows:
            if row.get("submission_id") in ids:
                row["validation_status"] = "unvalidated"
                row["validation_method"] = None
                seen.add(row["submission_id"])
        _raise_for_missing(ids, seen)

    _hub_rmw_results(
        mutate,
        commit_message=f"demote {len(ids)} row(s) to unvalidated",
    )


def _current_fixture_names() -> list[str]:
    """Sorted fixture set of the *current* ``cadgenbench-data`` revision.

    A rescore re-evaluates each stored zip against whatever the data repo
    exposes now (the whole point after a GT swap), so the fixture set
    comes from the live inputs dir rather than from whatever the
    submission was originally scored against. This is the same source
    :func:`submit._validate_fixture_set` checks new uploads against, so
    the single-vs-sharded dispatch split matches the submit path.
    """
    from cadgenbench.common.paths import data_inputs_dir

    root = data_inputs_dir()
    return sorted(p.name for p in root.iterdir() if p.is_dir())


def _dispatch_rescore_workers(
    targets: dict[str, str],
    fixture_names: list[str],
) -> None:
    """Spawn one eval worker per target on a staggered background thread.

    *targets* maps ``submission_id -> submission_blob_url``. Runs the
    dispatch loop on its own daemon thread so the caller (a Gradio
    handler) returns the moment the rows are flipped to pending, rather
    than blocking while N workers are kicked off. Each worker is the
    same fire-and-forget dispatch+poll thread the submit path uses, so a
    rescore reuses the entire eval pipeline (sharding, render upload,
    report regeneration, row flip) unchanged.
    """
    items = list(targets.items())

    def _run() -> None:
        for i, (submission_id, blob_url) in enumerate(items):
            if i:
                time.sleep(RESCORE_DISPATCH_STAGGER_SECONDS)
            try:
                _spawn_worker(submission_id, blob_url, fixture_names)
            except Exception as e:  # noqa: BLE001 - one bad dispatch must not stall the rest
                logger.exception(
                    "rescore: failed to spawn worker for %s (%s: %s)",
                    submission_id,
                    type(e).__name__,
                    e,
                )

    threading.Thread(
        target=_run,
        name="cgb-rescore-dispatch",
        daemon=True,
    ).start()


def _rescore(ids: set[str], *, require_found: bool) -> tuple[int, list[str]]:
    """Flip *ids* back to pending, then dispatch a fresh eval for each.

    The current fixture set is resolved first, so a data-repo problem
    aborts before ``results.jsonl`` is touched instead of leaving the
    batch stranded in ``pending`` with no worker to finish it. A single
    ``results.jsonl`` write then resets every target row to the pending
    regime (status ``pending``, ``failure_reason`` cleared, all score
    fields nulled) and captures its stored ``submission_blob_url``; a
    row with no stored zip (legacy seed rows) can't be rescored and is
    collected as *skipped* instead. After the write commits, workers are
    dispatched on a staggered background thread.

    Idempotent and re-runnable: a rescore that's interrupted (Space
    restart) leaves its in-flight rows pending, which the boot-time
    stuck-pending sweep flips to failed, and re-running the rescore on
    those rows converges. ``submitted_at`` is preserved (immutable per
    the schema).

    Args:
        ids: the normalised, non-empty set of submission ids to rescore.
        require_found: when True (selected-rows path) every id must exist
            in ``results.jsonl`` or :class:`LookupError` is raised before
            any worker is dispatched; when False (rescore-all path) the id
            set was just derived from the file so a missing id only means
            a concurrent delete and is ignored.

    Returns:
        ``(dispatched_count, skipped_ids)`` -- how many workers were
        queued and which ids were skipped for lacking a stored zip.

    Raises:
        LookupError: *require_found* is True and an id is missing.
        Whatever :func:`_current_fixture_names` raises if the data repo's
        inputs dir can't be listed (no row is modified in that case).
    """
    # Resolve before the write: a failure here must leave rows untouched.
    fixture_names = _current_fixture_names()

    captured: dict[str, str] = {}
    skipped: set[str] = set()

    def mutate(rows: list[dict[str, Any]]) -> None:
        # Reset the closure accumulators on every invocation so state from
        # an earlier attempt (should the read-modify-write helper ever
        # re-run the callback on a fresh read) can't leak into the
        # dispatch set.
        captured.clear()
        skipped.clear()
        seen: set[str] = set()
        for row in rows:
            sid = row.get("submission_id")
            if sid not in ids:
                continue
            seen.add(sid)
            blob_url = row.get("submission_blob_url")
            if not blob_url:
                skipped.add(sid)
                continue
            row["status"] = "pending"
            row["failure_reason"] = None
            for field in _RESCORE_CLEARED_SCORE_FIELDS:
                row[field] = None
            captured[sid] = blob_url
        if require_found:
            _raise_for_missing(ids, seen)

    _hub_rmw_results(
        mutate,
        commit_message=f"rescore: reset {len(ids)} row(s) to pending",
    )
    if captured:
        _dispatch_rescore_workers(captured, fixture_names)
    return len(captured), sorted(skipped)


def rescore_rows(submission_ids: Iterable[str]) -> tuple[int, list[str]]:
    """Re-evaluate every listed submission against the current data.

    Resets each row to pending and re-dispatches the eval, which
    re-renders the gallery, regenerates ``reports/<id>.{html,json}``,
    and recomputes the scores. Use after a ground-truth or metric change
    that invalidates existing scores.

    Raises:
        ValueError: no ids were given.
        LookupError: one or more ids are absent from ``results.jsonl``
            (no row is reset and no worker is dispatched).

    Returns:
        ``(dispatched_count, skipped_ids)``; *skipped_ids* are rows that
        have no stored zip to re-evaluate (legacy seed rows).
    """
    ids = _clean_id_set(submission_ids)
    return _rescore(ids, require_found=True)


def _rescoreable_ids_from_hub() -> set[str]:
    """Every submission_id with a stored zip that isn't mid-eval.

    Reads the live ``results.jsonl`` and returns the ids eligible for a
    bulk rescore: a row needs a ``submission_blob_url`` (so there's a
    zip to re-evaluate) and must not already be ``pending`` (skipping
    in-flight evals avoids double-dispatching a row a worker is already
    driving). Completed and failed rows both qualify. Blank lines,
    unparseable lines and lines that are not JSON objects are ignored.
    """
    body = _download_results_jsonl()
    ids: set[str] = set()
    for line in body.splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A valid-JSON but non-object line (e.g. ``[]``) has no ``.get``.
        if not isinstance(row, dict):
            continue
        if not row.get("submission_blob_url"):
            continue
        if row.get("status") == "pending":
            continue
        sid = row.get("submission_id")
        if sid:
            ids.add(sid)
    return ids


def rescore_all() -> tuple[int, list[str]]:
    """Re-evaluate every rescoreable submission (see :func:`rescore_rows`).

    Targets every row with a stored zip that isn't already pending. This
    is the heavy, board-wide action a maintainer runs after a GT swap.

    Raises:
        ValueError: nothing is rescoreable (empty board, or every row is
            pending / lacks a stored zip).

    Returns:
        ``(dispatched_count, skipped_ids)``.
    """
    ids = _rescoreable_ids_from_hub()
    if not ids:
        raise ValueError(
            "No rescoreable submissions (every row is pending or has no "
            "stored zip)."
        )
    return _rescore(ids, require_found=False)


def delete_rows(submission_ids: Iterable[str]) -> None:
    """Permanently delete every listed submission: artifacts then row.

    Irreversible. For each id, best-effort deletes the companion blobs
    (``submissions/<id>.zip``, ``reports/<id>.{html,json}``) and the
    id's renders in the public render bucket, then drops the rows from
    ``results.jsonl`` in a single write. A blob that does not exist is
    skipped (a failed / pending row may never have had a report).
    Missing ``results.jsonl`` rows are tolerated too, so a re-run after
    a partial failure still converges.

    Raises:
        ValueError: no ids were given.
    """
    ids = _clean_id_set(submission_ids)

    for sid in sorted(ids):
        for path in (
            f"{SUBMISSIONS_DIR}/{sid}.zip",
            f"{REPORTS_DIR}/{sid}.html",
            f"{REPORTS_DIR}/{sid}.json",
        ):
            try:
                _HF_API.delete_file(
                    path_in_repo=path,
                    repo_id=HF_SUBMISSIONS_REPO,
                    repo_type="dataset",
                    commit_message=f"delete artifact {path}",
                )
            except EntryNotFoundError:
                pass
            except Exception as e:  # noqa: BLE001 - keep deleting the rest
                logger.warning(
                    "Failed to delete artifact %s (%s: %s)",
                    path,
                    type(e).__name__,
                    e,
                )
        _delete_bucket_renders(sid)

    def mutate(rows: list[dict[str, Any]]) -> None:
        rows[:] = [r for r in rows if r.get("submission_id") not in ids]

    _hub_rmw_results(
        mutate,
        commit_message=f"delete {len(ids)} submission(s)",
    )


def _delete_bucket_renders(submission_id: str) -> None:
    """Delete every render for *submission_id* from the public render bucket.

    The renders live under ``renders/<submission_id>/`` in the bucket
    (uploaded by the eval job). ``batch_bucket_files`` has no recursive
    prefix delete, so we list the prefix and delete the files in one
    batch. Best-effort: a bucket failure is logged, never blocks the row
    deletion (mirrors the dataset-artifact path).
    """
    prefix = render_submission_prefix(submission_id)
    try:
        paths = [
            entry.path
            for entry in _HF_API.list_bucket_tree(
                HF_RENDER_BUCKET,
                prefix=prefix,
                recursive=True,
            )
            if getattr(entry, "path", None) and not entry.path.endswith("/")
        ]
        if paths:
            _HF_API.batch_bucket_files(HF_RENDER_BUCKET, delete=paths)
            logger.info(
                "Deleted %d render(s) under %s from bucket %s",
                len(paths),
                prefix,
                HF_RENDER_BUCKET,
            )
    except Exception as e:  # noqa: BLE001 - bucket failure must not block delete
        logger.warning(
            "Failed to delete bucket renders under %s (%s: %s)",
            prefix,
            type(e).__name__,
            e,
        )


def _cancel_jobs_for_submissions(ids: set[str]) -> int:
    """Best-effort cancel every non-terminal eval Job for one of *ids*.

    Each eval Job is dispatched with its ``submission_id`` baked into the
    command argv (see :func:`submit._dispatch_eval_command`), so there's
    no need to persist a ``job_id`` on the row: we list the eval
    account's jobs and cancel any still-running one whose ``command`` or
    ``arguments`` list contains a target id as a standalone element.
    This also catches a submission's shard jobs, since each shard
    carries the same id in its command.

    NOTE(review): matching is exact per argv element, so an id embedded
    inside a larger string (e.g. ``--id=<id>`` or a ``bash -c "..."``
    script) is not matched -- confirm against the dispatch command.

    Never raises. A job that already finished, a listing failure, or a
    cancel race must not block the row delete that follows -- the GPU
    job carries its own ``--timeout`` and self-reaps if a cancel is
    missed. Returns the count of cancel calls that succeeded (for
    logging only).
    """
    token = os.environ.get("HF_TOKEN")
    try:
        jobs = list_jobs(namespace=EVAL_JOB_NAMESPACE, token=token)
    except Exception as e:  # noqa: BLE001 - listing is best-effort
        logger.warning(
            "list_jobs(%s) failed (%s: %s); skipping job cancel, deleting rows",
            EVAL_JOB_NAMESPACE,
            type(e).__name__,
            e,
        )
        return 0

    cancelled = 0
    for job in jobs:
        stage = getattr(getattr(job, "status", None), "stage", None)
        # ``stage`` may arrive as a ``JobStage`` enum member or a raw
        # string; compare on the underlying value so the plain-string
        # frozenset matches either form.
        stage_name = getattr(stage, "value", stage)
        if stage_name in _JOB_TERMINAL_STAGES:
            continue
        argv = list(getattr(job, "command", None) or []) + list(
            getattr(job, "arguments", None) or []
        )
        if not any(sid in argv for sid in ids):
            continue
        try:
            cancel_job(
                job_id=job.id,
                namespace=EVAL_JOB_NAMESPACE,
                token=token,
            )
            cancelled += 1
            logger.info(
                "Cancelled eval job %s (stage %s) before delete",
                job.id,
                stage_name,
            )
        except Exception as e:  # noqa: BLE001 - cancel is best-effort
            logger.warning(
                "cancel_job(%s) failed (%s: %s); deleting row anyway",
                job.id,
                type(e).__name__,
                e,
            )
    return cancelled


def stop_and_delete_rows(submission_ids: Iterable[str]) -> None:
    """Cancel any running eval Job(s) for the listed rows, then delete them.

    The "stop" step (:func:`_cancel_jobs_for_submissions`) is best-effort
    and never raises; the "delete" step is the existing
    :func:`delete_rows` (artifacts then row). So this is exactly "stop if
    needed, then delete", and it is the right action for a stuck/pending
    submission whose GPU job is still in flight.

    Raises:
        ValueError: no ids were given.
    """
    ids = _clean_id_set(submission_ids)
    cancelled = _cancel_jobs_for_submissions(ids)
    logger.info(
        "stop_and_delete: cancelled %d job(s) for %d submission(s)",
        cancelled,
        len(ids),
    )
    delete_rows(ids)


def _raise_for_missing(requested: set[str], seen: set[str]) -> None:
    """Raise ``LookupError`` if any requested id was not found in the rows."""
    missing = requested - seen
    if missing:
        raise LookupError(
            f"submission_id(s) not in results.jsonl: {', '.join(sorted(missing))}."
        )


def promote_row(submission_id: str, method: str) -> None:
    """Single-row convenience wrapper over :func:`promote_rows`."""
    promote_rows([submission_id], method)


def demote_row(submission_id: str) -> None:
    """Single-row convenience wrapper over :func:`demote_rows`."""
    demote_rows([submission_id])