# Copyright 2026 Hugging Face
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Admin-tab handlers for the CADGenBench leaderboard Space.

Bundle 7: promote a row into the validated tier (recording the evidence
type) and demote it back. This module also provides permanent deletion of
submissions and a "stop and delete" action that first cancels in-flight
eval Jobs. Gating is the ``CADGENBENCH_ADMINS`` Space variable
(comma-separated HF usernames); :func:`is_admin` is the single predicate
the UI uses to enable or disable the controls.

Row writes go through :func:`submit._hub_rmw_results`, so the
``_HUB_LOCK`` + read-modify-write semantics match the submit path
exactly. There is no second writer of ``results.jsonl`` with its own
locking story.
"""

from __future__ import annotations

import logging
import os
from typing import Any, Iterable

import gradio as gr
from huggingface_hub import cancel_job, list_jobs
from huggingface_hub.errors import EntryNotFoundError

from leaderboard import HF_RENDER_BUCKET, render_submission_prefix
from submit import (
    EVAL_JOB_NAMESPACE,
    HF_SUBMISSIONS_REPO,
    REPORTS_DIR,
    SUBMISSIONS_DIR,
    _HF_API,
    _hub_rmw_results,
)

logger = logging.getLogger(__name__)

ADMINS_ENV = "CADGENBENCH_ADMINS"

# HF Job stages that are already finished: cancelling one is a no-op (and
# usually an error), so the stop step skips them. Mirrors
# huggingface_hub.JobStage; kept as plain strings so a new terminal
# stage name added upstream doesn't import-break this module.
_JOB_TERMINAL_STAGES: frozenset[str] = frozenset(
    {"COMPLETED", "ERROR", "CANCELED", "DELETED"}
)

# The evidence types accepted on promotion. Mirrors the
# `validation_method` enum in cadgenbench-submissions/schema.md and the
# validation policy doc.
VALID_METHODS: tuple[str, ...] = ("code", "traces", "api", "manual")


def admin_usernames() -> set[str]:
    """Parse ``CADGENBENCH_ADMINS`` into a set of HF usernames.

    Comma-separated, whitespace-trimmed, empties dropped. Read fresh on
    each call so flipping the Space variable takes effect without a code
    deploy. Empty or unset yields an empty set, which means no one is an
    admin and the controls stay inert.
    """
    raw = os.environ.get(ADMINS_ENV, "")
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_admin(profile: gr.OAuthProfile | None) -> bool:
    """Return whether *profile* is a logged-in user in the admin set.

    Logged-out users (``profile is None``) are never admins. With an
    empty admin set no profile qualifies, so the admin controls remain
    disabled for everyone until ``CADGENBENCH_ADMINS`` is populated.
    """
    if profile is None:
        return False
    return profile.username in admin_usernames()


def _clean_id_set(submission_ids: Iterable[str] | None) -> set[str]:
    """Normalise an id iterable to a non-empty set, else raise.

    Guards every bulk helper: a no-op call (nothing selected) is a
    caller error, surfaced as ``ValueError`` rather than a silent empty
    write.

    A bare string is treated as a single id. Without that guard a string
    would be iterated character by character, and ``delete_rows("abc")``
    would target the ids ``a``, ``b`` and ``c``. ``None`` is treated as
    "nothing selected".

    Raises:
        ValueError: no usable id was given.
    """
    if submission_ids is None:
        submission_ids = ()
    elif isinstance(submission_ids, str):
        submission_ids = (submission_ids,)
    # Falsy entries (None, "") are dropped before stringifying.
    ids = {str(s) for s in submission_ids if s}
    if not ids:
        raise ValueError("No submissions selected.")
    return ids


def _set_validation(
    ids: set[str],
    *,
    status: str,
    method: str | None,
    commit_message: str,
) -> None:
    """Write *status* / *method* onto every row in *ids* in one commit.

    Shared body of :func:`promote_rows` and :func:`demote_rows`. The
    missing-id check runs before any row is touched, so a ``LookupError``
    leaves the rows handed to the callback unmodified.

    Raises:
        LookupError: one or more ids are absent from ``results.jsonl``.
    """

    def mutate(rows: list[dict[str, Any]]) -> None:
        targets = [row for row in rows if row.get("submission_id") in ids]
        _raise_for_missing(ids, {row["submission_id"] for row in targets})
        for row in targets:
            row["validation_status"] = status
            row["validation_method"] = method

    _hub_rmw_results(mutate, commit_message=commit_message)


def promote_rows(submission_ids: Iterable[str], method: str) -> None:
    """Move every listed row into the validated tier with *method*.

    One ``results.jsonl`` write for the whole batch. Idempotent on rows
    already validated (their method is set to *method*).

    Raises:
        ValueError: *method* is unknown, or no ids were given.
        LookupError: one or more ids are absent from ``results.jsonl``
            (the callback raises inside the read-modify-write, before
            any row is modified).
    """
    if method not in VALID_METHODS:
        raise ValueError(
            f"Unknown validation_method {method!r}; expected one of "
            f"{', '.join(VALID_METHODS)}."
        )
    ids = _clean_id_set(submission_ids)
    _set_validation(
        ids,
        status="validated",
        method=method,
        commit_message=f"promote {len(ids)} row(s) to validated ({method})",
    )


def demote_rows(submission_ids: Iterable[str]) -> None:
    """Return every listed row to the unvalidated tier, clearing method.

    One ``results.jsonl`` write for the whole batch. Idempotent on rows
    already unvalidated.

    Raises:
        ValueError: no ids were given.
        LookupError: one or more ids are absent from ``results.jsonl``.
    """
    ids = _clean_id_set(submission_ids)
    _set_validation(
        ids,
        status="unvalidated",
        method=None,
        commit_message=f"demote {len(ids)} row(s) to unvalidated",
    )


def delete_rows(submission_ids: Iterable[str]) -> None:
    """Permanently delete every listed submission: artifacts then row.

    Irreversible. For each id, best-effort deletes the companion blobs
    (``{SUBMISSIONS_DIR}/<id>.zip``, ``{REPORTS_DIR}/<id>.html`` and
    ``{REPORTS_DIR}/<id>.json``) and the bucket renders, then drops the
    rows from ``results.jsonl`` in a single write. A blob that does not
    exist is skipped (a failed / pending row may never have had a
    report). Missing ``results.jsonl`` rows are tolerated too, so a
    re-run after a partial failure still converges.

    Raises:
        ValueError: no ids were given.
    """
    ids = _clean_id_set(submission_ids)

    # Sorted so the sequence of delete commits is deterministic.
    for sid in sorted(ids):
        artifact_paths = (
            f"{SUBMISSIONS_DIR}/{sid}.zip",
            f"{REPORTS_DIR}/{sid}.html",
            f"{REPORTS_DIR}/{sid}.json",
        )
        for path in artifact_paths:
            try:
                _HF_API.delete_file(
                    path_in_repo=path,
                    repo_id=HF_SUBMISSIONS_REPO,
                    repo_type="dataset",
                    commit_message=f"delete artifact {path}",
                )
            except EntryNotFoundError:
                # Nothing to delete for this artifact; expected for rows
                # that never produced it.
                pass
            except Exception as e:  # noqa: BLE001 - keep deleting the rest
                logger.warning(
                    "Failed to delete artifact %s (%s: %s)",
                    path,
                    type(e).__name__,
                    e,
                )
        _delete_bucket_renders(sid)

    def mutate(rows: list[dict[str, Any]]) -> None:
        # Slice-assign so the list object handed in is updated in place.
        rows[:] = [r for r in rows if r.get("submission_id") not in ids]

    _hub_rmw_results(
        mutate,
        commit_message=f"delete {len(ids)} submission(s)",
    )


def _delete_bucket_renders(submission_id: str) -> None:
    """Delete every render for *submission_id* from the public render bucket.

    The renders live in the bucket under the prefix returned by
    :func:`leaderboard.render_submission_prefix` (uploaded by the eval
    job). ``batch_bucket_files`` has no recursive prefix delete, so we
    list the prefix and delete the files in one batch. Best-effort: a
    bucket failure is logged, never blocks the row deletion (mirrors the
    dataset-artifact path).
    """
    prefix = render_submission_prefix(submission_id)
    try:
        paths = [
            entry.path
            for entry in _HF_API.list_bucket_tree(
                HF_RENDER_BUCKET,
                prefix=prefix,
                recursive=True,
            )
            # Skip entries without a path and anything ending in "/".
            if getattr(entry, "path", None) and not entry.path.endswith("/")
        ]
        if paths:
            _HF_API.batch_bucket_files(HF_RENDER_BUCKET, delete=paths)
            logger.info(
                "Deleted %d render(s) under %s from bucket %s",
                len(paths),
                prefix,
                HF_RENDER_BUCKET,
            )
    except Exception as e:  # noqa: BLE001 - bucket failure must not block delete
        logger.warning(
            "Failed to delete bucket renders under %s (%s: %s)",
            prefix,
            type(e).__name__,
            e,
        )


def _cancel_jobs_for_submissions(ids: set[str]) -> int:
    """Best-effort cancel every non-terminal eval Job for one of *ids*.

    Each eval Job is dispatched with its ``submission_id`` baked into the
    command argv (see :func:`submit._dispatch_eval_command`), so there's
    no need to persist a ``job_id`` on the row: we list the eval
    account's jobs and cancel any still-running one whose command
    mentions a target id. This also catches a submission's shard jobs,
    since each shard carries the same id in its command.

    Never raises. A job that already finished, a listing failure, or a
    cancel race must not block the row delete that follows -- the GPU
    job carries its own ``--timeout`` and self-reaps if a cancel is
    missed.

    Returns the count of cancel calls that succeeded (for logging only).
    """
    token = os.environ.get("HF_TOKEN")
    try:
        jobs = list_jobs(namespace=EVAL_JOB_NAMESPACE, token=token)
    except Exception as e:  # noqa: BLE001 - listing is best-effort
        logger.warning(
            "list_jobs(%s) failed (%s: %s); skipping job cancel, deleting rows",
            EVAL_JOB_NAMESPACE,
            type(e).__name__,
            e,
        )
        return 0

    cancelled = 0
    for job in jobs:
        stage = getattr(getattr(job, "status", None), "stage", None)
        if stage in _JOB_TERMINAL_STAGES:
            continue
        argv = list(getattr(job, "command", None) or []) + list(
            getattr(job, "arguments", None) or []
        )
        # NOTE(review): this is a whole-token match, so the id must be
        # its own argv element. Confirm the dispatcher never embeds it
        # in a larger token such as ``--id=<sid>``.
        if not any(sid in argv for sid in ids):
            continue
        try:
            cancel_job(
                job_id=job.id,
                namespace=EVAL_JOB_NAMESPACE,
                token=token,
            )
            cancelled += 1
            logger.info(
                "Cancelled eval job %s (stage %s) before delete",
                job.id,
                stage,
            )
        except Exception as e:  # noqa: BLE001 - cancel is best-effort
            logger.warning(
                "cancel_job(%s) failed (%s: %s); deleting row anyway",
                job.id,
                type(e).__name__,
                e,
            )
    return cancelled


def stop_and_delete_rows(submission_ids: Iterable[str]) -> None:
    """Cancel any running eval Job(s) for the listed rows, then delete them.

    The "stop" step (:func:`_cancel_jobs_for_submissions`) is best-effort
    and never raises; the "delete" step is the existing
    :func:`delete_rows` (artifacts then row). So this is exactly "stop if
    needed, then delete", and it is the right action for a stuck/pending
    submission whose GPU job is still in flight.

    Raises:
        ValueError: no ids were given.
    """
    ids = _clean_id_set(submission_ids)
    cancelled = _cancel_jobs_for_submissions(ids)
    logger.info(
        "stop_and_delete: cancelled %d job(s) for %d submission(s)",
        cancelled,
        len(ids),
    )
    delete_rows(ids)


def _raise_for_missing(requested: set[str], seen: set[str]) -> None:
    """Raise ``LookupError`` if any requested id was not found in the rows."""
    missing = requested - seen
    if missing:
        raise LookupError(
            f"submission_id(s) not in results.jsonl: {', '.join(sorted(missing))}."
        )


def promote_row(submission_id: str, method: str) -> None:
    """Single-row convenience wrapper over :func:`promote_rows`."""
    promote_rows([submission_id], method)


def demote_row(submission_id: str) -> None:
    """Single-row convenience wrapper over :func:`demote_rows`."""
    demote_rows([submission_id])