Michael Rabinovich
leaderboard: serve renders from the public bucket, not the dataset proxy
d2161b1
# Copyright 2026 Hugging Face
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Admin-tab handlers for the CADGenBench leaderboard Space.
Bundle 7: promote a row into the validated tier (recording the evidence
type) and demote it back. Gating is the ``CADGENBENCH_ADMINS`` Space
variable (comma-separated HF usernames); :func:`is_admin` is the single
predicate the UI uses to enable or disable the controls.
Row writes go through :func:`submit._hub_rmw_results`, so the
``_HUB_LOCK`` + read-modify-write semantics match the submit path
exactly. There is no second writer of ``results.jsonl`` with its own
locking story.
"""
from __future__ import annotations
import logging
import os
from typing import Any, Iterable
import gradio as gr
from huggingface_hub import cancel_job, list_jobs
from huggingface_hub.errors import EntryNotFoundError
from leaderboard import HF_RENDER_BUCKET, render_submission_prefix
from submit import (
EVAL_JOB_NAMESPACE,
HF_SUBMISSIONS_REPO,
REPORTS_DIR,
SUBMISSIONS_DIR,
_HF_API,
_hub_rmw_results,
)
logger = logging.getLogger(__name__)
ADMINS_ENV = "CADGENBENCH_ADMINS"
# HF Job stages that are already finished: cancelling one is a no-op (and
# usually an error), so the stop step skips them. Mirrors
# huggingface_hub.JobStage; kept as plain strings so a new terminal
# stage name added upstream doesn't import-break this module.
_JOB_TERMINAL_STAGES: frozenset[str] = frozenset(
{"COMPLETED", "ERROR", "CANCELED", "DELETED"}
)
# The evidence types accepted on promotion. Mirrors the
# `validation_method` enum in cadgenbench-submissions/schema.md and the
# validation policy doc.
VALID_METHODS: tuple[str, ...] = ("code", "traces", "api", "manual")
def admin_usernames() -> set[str]:
"""Parse ``CADGENBENCH_ADMINS`` into a set of HF usernames.
Comma-separated, whitespace-trimmed, empties dropped. Read fresh on
each call so flipping the Space variable takes effect without a code
deploy. Empty or unset yields an empty set, which means no one is an
admin and the controls stay inert.
"""
raw = os.environ.get(ADMINS_ENV, "")
return {part.strip() for part in raw.split(",") if part.strip()}
def is_admin(profile: gr.OAuthProfile | None) -> bool:
"""Return whether *profile* is a logged-in user in the admin set.
Logged-out users (``profile is None``) are never admins. With an
empty admin set no profile qualifies, so the admin controls remain
disabled for everyone until ``CADGENBENCH_ADMINS`` is populated.
"""
if profile is None:
return False
return profile.username in admin_usernames()
def _clean_id_set(submission_ids: Iterable[str]) -> set[str]:
"""Normalise an id iterable to a non-empty set, else raise.
Guards every bulk helper: a no-op call (nothing selected) is a
caller error, surfaced as ``ValueError`` rather than a silent
empty write.
"""
ids = {str(s) for s in submission_ids if s}
if not ids:
raise ValueError("No submissions selected.")
return ids
def promote_rows(submission_ids: Iterable[str], method: str) -> None:
"""Move every listed row into the validated tier with *method*.
One ``results.jsonl`` write for the whole batch. Idempotent on rows
already validated (their method is set to *method*).
Raises:
ValueError: *method* is unknown, or no ids were given.
LookupError: one or more ids are absent from ``results.jsonl``
(no partial write happens; the helper raises inside the
read-modify-write before the upload).
"""
if method not in VALID_METHODS:
raise ValueError(
f"Unknown validation_method {method!r}; expected one of "
f"{', '.join(VALID_METHODS)}."
)
ids = _clean_id_set(submission_ids)
def mutate(rows: list[dict[str, Any]]) -> None:
seen = set()
for row in rows:
if row.get("submission_id") in ids:
row["validation_status"] = "validated"
row["validation_method"] = method
seen.add(row["submission_id"])
_raise_for_missing(ids, seen)
_hub_rmw_results(
mutate,
commit_message=f"promote {len(ids)} row(s) to validated ({method})",
)
def demote_rows(submission_ids: Iterable[str]) -> None:
"""Return every listed row to the unvalidated tier, clearing method.
One ``results.jsonl`` write for the whole batch. Idempotent on rows
already unvalidated.
Raises:
ValueError: no ids were given.
LookupError: one or more ids are absent from ``results.jsonl``.
"""
ids = _clean_id_set(submission_ids)
def mutate(rows: list[dict[str, Any]]) -> None:
seen = set()
for row in rows:
if row.get("submission_id") in ids:
row["validation_status"] = "unvalidated"
row["validation_method"] = None
seen.add(row["submission_id"])
_raise_for_missing(ids, seen)
_hub_rmw_results(
mutate,
commit_message=f"demote {len(ids)} row(s) to unvalidated",
)
def delete_rows(submission_ids: Iterable[str]) -> None:
"""Permanently delete every listed submission: artifacts then row.
Irreversible. For each id, best-effort deletes the companion blobs
(``submissions/<id>.zip``, ``reports/<id>.{html,json}``) and then
drops the row from ``results.jsonl`` in a single write. A blob that
does not exist is skipped (a failed / pending row may never have
had a report). Missing ``results.jsonl`` rows are tolerated too, so
a re-run after a partial failure still converges.
Raises:
ValueError: no ids were given.
"""
ids = _clean_id_set(submission_ids)
for sid in sorted(ids):
for path in (
f"{SUBMISSIONS_DIR}/{sid}.zip",
f"{REPORTS_DIR}/{sid}.html",
f"{REPORTS_DIR}/{sid}.json",
):
try:
_HF_API.delete_file(
path_in_repo=path,
repo_id=HF_SUBMISSIONS_REPO,
repo_type="dataset",
commit_message=f"delete artifact {path}",
)
except EntryNotFoundError:
pass
except Exception as e: # noqa: BLE001 - keep deleting the rest
logger.warning(
"Failed to delete artifact %s (%s: %s)",
path, type(e).__name__, e,
)
_delete_bucket_renders(sid)
def mutate(rows: list[dict[str, Any]]) -> None:
rows[:] = [r for r in rows if r.get("submission_id") not in ids]
_hub_rmw_results(
mutate, commit_message=f"delete {len(ids)} submission(s)",
)
def _delete_bucket_renders(submission_id: str) -> None:
"""Delete every render for *submission_id* from the public render bucket.
The renders live under ``renders/<id>/`` in the bucket (uploaded by the eval
job). ``batch_bucket_files`` has no recursive prefix delete, so we list the
prefix and delete the files in one batch. Best-effort: a bucket failure is
logged, never blocks the row deletion (mirrors the dataset-artifact path).
"""
prefix = render_submission_prefix(submission_id)
try:
paths = [
entry.path
for entry in _HF_API.list_bucket_tree(
HF_RENDER_BUCKET, prefix=prefix, recursive=True,
)
if getattr(entry, "path", None) and not entry.path.endswith("/")
]
if paths:
_HF_API.batch_bucket_files(HF_RENDER_BUCKET, delete=paths)
logger.info(
"Deleted %d render(s) under %s from bucket %s",
len(paths), prefix, HF_RENDER_BUCKET,
)
except Exception as e: # noqa: BLE001 - bucket failure must not block delete
logger.warning(
"Failed to delete bucket renders under %s (%s: %s)",
prefix, type(e).__name__, e,
)
def _cancel_jobs_for_submissions(ids: set[str]) -> int:
"""Best-effort cancel every non-terminal eval Job for one of *ids*.
Each eval Job is dispatched with its ``submission_id`` baked into the
command argv (see :func:`submit._dispatch_eval_command`), so there's
no need to persist a ``job_id`` on the row: we list the eval
account's jobs and cancel any still-running one whose command
mentions a target id. This also catches a submission's shard jobs,
since each shard carries the same id in its command.
Never raises. A job that already finished, a listing failure, or a
cancel race must not block the row delete that follows -- the GPU job
carries its own ``--timeout`` and self-reaps if a cancel is missed.
Returns the count of cancel calls that succeeded (for logging only).
"""
token = os.environ.get("HF_TOKEN")
try:
jobs = list_jobs(namespace=EVAL_JOB_NAMESPACE, token=token)
except Exception as e: # noqa: BLE001 - listing is best-effort
logger.warning(
"list_jobs(%s) failed (%s: %s); skipping job cancel, deleting rows",
EVAL_JOB_NAMESPACE, type(e).__name__, e,
)
return 0
cancelled = 0
for job in jobs:
stage = getattr(getattr(job, "status", None), "stage", None)
if stage in _JOB_TERMINAL_STAGES:
continue
argv = list(getattr(job, "command", None) or []) + list(
getattr(job, "arguments", None) or []
)
if not any(sid in argv for sid in ids):
continue
try:
cancel_job(
job_id=job.id, namespace=EVAL_JOB_NAMESPACE, token=token,
)
cancelled += 1
logger.info(
"Cancelled eval job %s (stage %s) before delete", job.id, stage,
)
except Exception as e: # noqa: BLE001 - cancel is best-effort
logger.warning(
"cancel_job(%s) failed (%s: %s); deleting row anyway",
job.id, type(e).__name__, e,
)
return cancelled
def stop_and_delete_rows(submission_ids: Iterable[str]) -> None:
"""Cancel any running eval Job(s) for the listed rows, then delete them.
The "stop" step (:func:`_cancel_jobs_for_submissions`) is
best-effort and never raises; the "delete" step is the existing
:func:`delete_rows` (artifacts then row). So this is exactly "stop if
needed, then delete", and it is the right action for a stuck/pending
submission whose GPU job is still in flight.
Raises:
ValueError: no ids were given.
"""
ids = _clean_id_set(submission_ids)
cancelled = _cancel_jobs_for_submissions(ids)
logger.info(
"stop_and_delete: cancelled %d job(s) for %d submission(s)",
cancelled, len(ids),
)
delete_rows(ids)
def _raise_for_missing(requested: set[str], seen: set[str]) -> None:
"""Raise ``LookupError`` if any requested id was not found in the rows."""
missing = requested - seen
if missing:
raise LookupError(
f"submission_id(s) not in results.jsonl: {', '.join(sorted(missing))}."
)
def promote_row(submission_id: str, method: str) -> None:
"""Single-row convenience wrapper over :func:`promote_rows`."""
promote_rows([submission_id], method)
def demote_row(submission_id: str) -> None:
"""Single-row convenience wrapper over :func:`demote_rows`."""
demote_rows([submission_id])