CADGenBench / admin.py
Michael Rabinovich
Rename fixture-named result keys to sample-named keys
be6fa3d
# Copyright 2026 Hugging Face
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Admin-tab handlers for the CADGenBench leaderboard Space.
Bundle 7: promote a row into the validated tier (recording the evidence
type) and demote it back. Gating is the ``CADGENBENCH_ADMINS`` Space
variable (comma-separated HF usernames); :func:`is_admin` is the single
predicate the UI uses to enable or disable the controls.
Row writes go through :func:`submit._hub_rmw_results`, so the
``_HUB_LOCK`` + read-modify-write semantics match the submit path
exactly. There is no second writer of ``results.jsonl`` with its own
locking story.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from typing import Any, Iterable
import gradio as gr
from huggingface_hub import cancel_job, list_jobs
from huggingface_hub.errors import EntryNotFoundError
from leaderboard import HF_RENDER_BUCKET, render_submission_prefix
from submit import (
EVAL_JOB_NAMESPACE,
HF_SUBMISSIONS_REPO,
REPORTS_DIR,
SUBMISSIONS_DIR,
_HF_API,
_download_results_jsonl,
_hub_rmw_results,
_spawn_worker,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the Space variable that lists the admin usernames.
ADMINS_ENV = "CADGENBENCH_ADMINS"

# Stages in which an HF Job has already finished. Cancelling such a job
# is a no-op (and usually an error), so the stop step skips them.
# Mirrors huggingface_hub.JobStage; plain strings are used on purpose so
# that a new terminal stage added upstream cannot import-break this
# module.
_JOB_TERMINAL_STAGES: frozenset[str] = frozenset(
    ("COMPLETED", "ERROR", "CANCELED", "DELETED")
)

# Evidence types accepted when a row is promoted. Mirrors the
# `validation_method` enum in cadgenbench-submissions/schema.md and the
# validation policy doc.
VALID_METHODS: tuple[str, ...] = (
    "code",
    "traces",
    "api",
    "manual",
)

# Score-shaped fields that are nulled when a row is flipped back to
# ``pending`` for a rescore. Mirrors the pending regime in
# cadgenbench-submissions/schema.md: every aggregate stays ``null``
# until the fresh eval flips the row back to ``completed``.
# ``submitted_at`` is deliberately absent from this list -- the schema
# defines it as the immutable timestamp of the first write, so a rescore
# keeps the original submit provenance.
_RESCORE_CLEARED_SCORE_FIELDS: tuple[str, ...] = (
    "aggregate_score", "validity_rate",
    "score_by_task_type", "per_task_scores",
    "per_sample_scores", "per_sample_breakdown",
)

# Pause between successive worker dispatches during a bulk rescore. Each
# worker dispatches its own HF Job and then polls; spacing the starts
# keeps a rescore-all from firing N ``run_job`` control-plane calls in a
# single burst (which can rate-limit), while HF's own queue absorbs
# anything beyond the account's concurrent-slot cap. Short enough to go
# unnoticed for a one-or-two-row rescore.
RESCORE_DISPATCH_STAGGER_SECONDS = 2.0
def admin_usernames() -> set[str]:
    """Return the HF usernames listed in ``CADGENBENCH_ADMINS``.

    The variable is split on commas; each piece is whitespace-trimmed
    and empty pieces are discarded. The environment is consulted on
    every call, so changing the Space variable takes effect without a
    code deploy. An unset or empty variable produces an empty set, i.e.
    nobody is an admin and the controls stay inert.
    """
    names: set[str] = set()
    for chunk in os.environ.get(ADMINS_ENV, "").split(","):
        candidate = chunk.strip()
        if candidate:
            names.add(candidate)
    return names
def is_admin(profile: gr.OAuthProfile | None) -> bool:
    """Tell whether *profile* belongs to a logged-in admin.

    A missing profile (``None``, i.e. logged out) is never an admin.
    When the admin set is empty no username can match, so the admin
    controls stay disabled for everyone until ``CADGENBENCH_ADMINS`` is
    populated.
    """
    # Short-circuit: the admin set is only read for a logged-in user.
    return profile is not None and profile.username in admin_usernames()
def _clean_id_set(submission_ids: Iterable[str]) -> set[str]:
    """Normalise an id iterable to a non-empty set, else raise.

    Guards every bulk helper: a no-op call (nothing selected) is a
    caller error, surfaced as ``ValueError`` rather than a silent empty
    write.

    Args:
        submission_ids: the selected ids. Falsy entries (``None``,
            ``""``) are dropped and the rest are stringified. A bare
            ``str`` is treated as one id rather than being iterated
            character by character, and ``None`` is treated as "nothing
            selected".

    Returns:
        The de-duplicated set of ids.

    Raises:
        ValueError: no usable id remains after normalisation.
    """
    if submission_ids is None:
        # "Nothing selected" should hit the ValueError below, not a
        # TypeError from iterating None.
        submission_ids = ()
    elif isinstance(submission_ids, str):
        # A str is itself an Iterable[str]; without this guard a single
        # id passed by mistake would be split into one "id" per
        # character.
        submission_ids = (submission_ids,)
    ids = {str(s) for s in submission_ids if s}
    if not ids:
        raise ValueError("No submissions selected.")
    return ids
def promote_rows(submission_ids: Iterable[str], method: str) -> None:
    """Mark every listed row as validated, recording *method* as evidence.

    The whole batch is applied in a single ``results.jsonl`` write. Rows
    that are already validated are simply re-stamped with *method*, so
    the call is idempotent.

    Raises:
        ValueError: *method* is not one of ``VALID_METHODS``, or no ids
            were given.
        LookupError: at least one id is absent from ``results.jsonl``.
            The error is raised from inside the read-modify-write
            callback, before the upload, so no partial write happens.
    """
    if method not in VALID_METHODS:
        raise ValueError(
            f"Unknown validation_method {method!r}; expected one of "
            f"{', '.join(VALID_METHODS)}."
        )
    targets = _clean_id_set(submission_ids)

    def _mark_validated(rows: list[dict[str, Any]]) -> None:
        matched: set[str] = set()
        for entry in rows:
            sid = entry.get("submission_id")
            if sid not in targets:
                continue
            entry.update(
                validation_status="validated",
                validation_method=method,
            )
            matched.add(sid)
        # Abort the whole write if any requested id was not found.
        _raise_for_missing(targets, matched)

    message = f"promote {len(targets)} row(s) to validated ({method})"
    _hub_rmw_results(_mark_validated, commit_message=message)
def demote_rows(submission_ids: Iterable[str]) -> None:
    """Send every listed row back to the unvalidated tier.

    The validation method is cleared along with the status. The whole
    batch is applied in a single ``results.jsonl`` write, and rows that
    are already unvalidated are left in that state, so the call is
    idempotent.

    Raises:
        ValueError: no ids were given.
        LookupError: at least one id is absent from ``results.jsonl``.
    """
    targets = _clean_id_set(submission_ids)

    def _mark_unvalidated(rows: list[dict[str, Any]]) -> None:
        matched: set[str] = set()
        for entry in rows:
            sid = entry.get("submission_id")
            if sid not in targets:
                continue
            entry["validation_status"] = "unvalidated"
            entry["validation_method"] = None
            matched.add(sid)
        # Abort the whole write if any requested id was not found.
        _raise_for_missing(targets, matched)

    message = f"demote {len(targets)} row(s) to unvalidated"
    _hub_rmw_results(_mark_unvalidated, commit_message=message)
def _current_fixture_names() -> list[str]:
    """List the fixture names of the *current* ``cadgenbench-data`` revision.

    A rescore re-evaluates each stored zip against whatever the data
    repo exposes right now (which is the point after a GT swap), so the
    names are read from the live inputs directory instead of from what
    the submission was originally scored against. Per the module's
    design notes this is the same source
    :func:`submit._validate_fixture_set` checks new uploads against, so
    the single-vs-sharded dispatch split matches the submit path.

    Returns:
        The names of the sub-directories of the inputs directory, sorted.
    """
    # Imported lazily, as in the rest of this helper's history: the
    # package is only needed when a rescore is actually requested.
    from cadgenbench.common.paths import data_inputs_dir

    names = [child.name for child in data_inputs_dir().iterdir() if child.is_dir()]
    names.sort()
    return names
def _dispatch_rescore_workers(
    targets: dict[str, str], fixture_names: list[str],
) -> None:
    """Kick off one eval worker per target from a background thread.

    *targets* maps ``submission_id -> submission_blob_url``. The
    dispatch loop runs on its own daemon thread so that the caller (a
    Gradio handler) can return as soon as the rows are flipped to
    pending instead of waiting for N workers to be started. Every worker
    is the same fire-and-forget dispatch+poll thread the submit path
    uses, so a rescore reuses the whole eval pipeline (sharding, render
    upload, report regeneration, row flip) unchanged.

    Consecutive dispatches are separated by
    ``RESCORE_DISPATCH_STAGGER_SECONDS``; a failure to spawn one worker
    is logged and does not stop the remaining ones.
    """
    # Snapshot the mapping now so later changes by the caller cannot
    # affect the background loop.
    queue = list(targets.items())

    def _dispatch_all() -> None:
        first = True
        for submission_id, blob_url in queue:
            # No delay before the first dispatch, a stagger before the rest.
            if not first:
                time.sleep(RESCORE_DISPATCH_STAGGER_SECONDS)
            first = False
            try:
                _spawn_worker(submission_id, blob_url, fixture_names)
            except Exception as e:  # noqa: BLE001 - one bad dispatch must not stall the rest
                logger.exception(
                    "rescore: failed to spawn worker for %s (%s: %s)",
                    submission_id, type(e).__name__, e,
                )

    dispatcher = threading.Thread(
        target=_dispatch_all, name="cgb-rescore-dispatch", daemon=True,
    )
    dispatcher.start()
def _rescore(ids: set[str], *, require_found: bool) -> tuple[int, list[str]]:
    """Flip *ids* back to pending, then dispatch a fresh eval for each.

    A single ``results.jsonl`` write resets every target row to the
    pending regime (status ``pending``, ``failure_reason`` cleared, all
    score fields nulled) and captures its stored ``submission_blob_url``;
    a row with no stored zip (legacy seed rows) can't be rescored and is
    collected as *skipped* instead. After the write commits, workers are
    dispatched on a staggered background thread.

    Idempotent and re-runnable: a rescore that's interrupted (Space
    restart) leaves its in-flight rows pending, which the boot-time
    stuck-pending sweep flips to failed, and re-running the rescore on
    those rows converges. ``submitted_at`` is preserved (immutable per
    the schema).

    Args:
        ids: the submission ids to rescore.
        require_found: when True (selected-rows path) every id must
            exist in ``results.jsonl`` or :class:`LookupError` is raised
            before any worker is dispatched; when False (rescore-all
            path) the id set was just derived from the file so a missing
            id only means a concurrent delete and is ignored.

    Returns:
        ``(dispatched_count, skipped_ids)`` -- how many workers were
        queued and which ids were skipped for lacking a stored zip.

    Raises:
        LookupError: *require_found* is True and at least one id is
            absent from ``results.jsonl``.
    """
    captured: dict[str, str] = {}
    skipped: set[str] = set()

    def mutate(rows: list[dict[str, Any]]) -> None:
        # Start every pass from a clean slate. These two collections
        # live outside the callback, so if the read-modify-write helper
        # ever runs the callback more than once (e.g. a retry against a
        # fresher copy -- not visible from this module), entries left by
        # an earlier pass must not leak into the dispatch below.
        captured.clear()
        skipped.clear()
        seen: set[str] = set()
        for row in rows:
            sid = row.get("submission_id")
            if sid not in ids:
                continue
            seen.add(sid)
            blob_url = row.get("submission_blob_url")
            if not blob_url:
                # No stored zip: nothing to re-evaluate, leave the row as is.
                skipped.add(sid)
                continue
            row["status"] = "pending"
            row["failure_reason"] = None
            for field in _RESCORE_CLEARED_SCORE_FIELDS:
                row[field] = None
            captured[sid] = blob_url
        if require_found:
            _raise_for_missing(ids, seen)

    _hub_rmw_results(
        mutate,
        commit_message=f"rescore: reset {len(ids)} row(s) to pending",
    )
    # Only reached once the write went through; a LookupError raised in
    # the callback propagates before any worker is started.
    if captured:
        _dispatch_rescore_workers(captured, _current_fixture_names())
    return len(captured), sorted(skipped)
def rescore_rows(submission_ids: Iterable[str]) -> tuple[int, list[str]]:
    """Re-run the eval for each listed submission on the current data.

    Every row is reset to pending and its eval is dispatched again,
    which re-renders the gallery, regenerates
    ``reports/<id>.{html,json}`` and recomputes the scores. Intended for
    use after a ground-truth or metric change that invalidates existing
    scores.

    Raises:
        ValueError: no ids were given.
        LookupError: at least one id is absent from ``results.jsonl``
            (no row is reset and no worker is dispatched).

    Returns:
        ``(dispatched_count, skipped_ids)``; *skipped_ids* are rows that
        have no stored zip to re-evaluate (legacy seed rows).
    """
    # Explicit selection: every id must exist, hence require_found=True.
    return _rescore(_clean_id_set(submission_ids), require_found=True)
def _rescoreable_ids_from_hub() -> set[str]:
    """Every submission_id with a stored zip that isn't mid-eval.

    Reads the live ``results.jsonl`` and returns the ids eligible for a
    bulk rescore: a row needs a ``submission_blob_url`` (so there's a
    zip to re-evaluate) and must not already be ``pending`` (skipping
    in-flight evals avoids double-dispatching a row a worker is already
    driving). Completed and failed rows both qualify.

    Blank lines, lines that are not valid JSON, and lines whose JSON
    value is not an object are skipped.

    Returns:
        The set of eligible submission ids (possibly empty).
    """
    body = _download_results_jsonl()
    ids: set[str] = set()
    for line in body.splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(row, dict):
            # Valid JSON that is not an object (list, number, string...)
            # has no fields to inspect; treat it like any other
            # malformed line instead of failing on ``row.get``.
            continue
        if not row.get("submission_blob_url"):
            continue
        if row.get("status") == "pending":
            continue
        sid = row.get("submission_id")
        if sid:
            ids.add(sid)
    return ids
def rescore_all() -> tuple[int, list[str]]:
    """Rescore the whole board (see :func:`rescore_rows` for the effect).

    The targets are all rows that have a stored zip and are not already
    pending. This is the heavy, board-wide action a maintainer runs
    after a GT swap.

    Raises:
        ValueError: nothing is rescoreable (empty board, or every row is
            pending / lacks a stored zip).

    Returns:
        ``(dispatched_count, skipped_ids)``.
    """
    eligible = _rescoreable_ids_from_hub()
    if eligible:
        # The ids were just read from the file, so one that has vanished
        # by write time is tolerated (require_found=False).
        return _rescore(eligible, require_found=False)
    raise ValueError(
        "No rescoreable submissions (every row is pending or has no "
        "stored zip)."
    )
def delete_rows(submission_ids: Iterable[str]) -> None:
    """Irreversibly remove each listed submission: artifacts first, then row.

    For every id the companion blobs (``submissions/<id>.zip`` and
    ``reports/<id>.{html,json}``) are deleted on a best-effort basis,
    followed by the id's renders in the render bucket. Afterwards all
    matching rows are dropped from ``results.jsonl`` in one write. A
    blob that does not exist is silently skipped (a failed / pending row
    may never have had a report); any other deletion failure is logged
    and the remaining work continues. Ids with no row in
    ``results.jsonl`` are tolerated as well, so re-running after a
    partial failure still converges.

    Raises:
        ValueError: no ids were given.
    """
    doomed = _clean_id_set(submission_ids)

    for sid in sorted(doomed):
        artifact_paths = (
            f"{SUBMISSIONS_DIR}/{sid}.zip",
            f"{REPORTS_DIR}/{sid}.html",
            f"{REPORTS_DIR}/{sid}.json",
        )
        for artifact in artifact_paths:
            try:
                _HF_API.delete_file(
                    path_in_repo=artifact,
                    repo_id=HF_SUBMISSIONS_REPO,
                    repo_type="dataset",
                    commit_message=f"delete artifact {artifact}",
                )
            except EntryNotFoundError:
                # Nothing to delete for this artifact; move on.
                continue
            except Exception as e:  # noqa: BLE001 - keep deleting the rest
                logger.warning(
                    "Failed to delete artifact %s (%s: %s)",
                    artifact, type(e).__name__, e,
                )
        _delete_bucket_renders(sid)

    def _drop_rows(rows: list[dict[str, Any]]) -> None:
        survivors = [r for r in rows if r.get("submission_id") not in doomed]
        # Replace the contents in place: the caller owns the list object.
        rows[:] = survivors

    _hub_rmw_results(
        _drop_rows, commit_message=f"delete {len(doomed)} submission(s)",
    )
def _delete_bucket_renders(submission_id: str) -> None:
    """Remove all renders of *submission_id* from the public render bucket.

    Renders are stored under ``renders/<id>/`` in the bucket (the eval
    job uploads them there). ``batch_bucket_files`` offers no recursive
    prefix delete, so the prefix is listed first and the files found are
    deleted in one batch. Best-effort: any bucket failure is logged and
    swallowed so that it never blocks the row deletion (mirrors the
    dataset-artifact path).
    """
    prefix = render_submission_prefix(submission_id)
    try:
        listing = _HF_API.list_bucket_tree(
            HF_RENDER_BUCKET, prefix=prefix, recursive=True,
        )
        paths = []
        for entry in listing:
            entry_path = getattr(entry, "path", None)
            # Keep real files only: skip entries without a path and
            # directory-style entries ending in "/".
            if entry_path and not entry_path.endswith("/"):
                paths.append(entry_path)
        if paths:
            _HF_API.batch_bucket_files(HF_RENDER_BUCKET, delete=paths)
            logger.info(
                "Deleted %d render(s) under %s from bucket %s",
                len(paths), prefix, HF_RENDER_BUCKET,
            )
    except Exception as e:  # noqa: BLE001 - bucket failure must not block delete
        logger.warning(
            "Failed to delete bucket renders under %s (%s: %s)",
            prefix, type(e).__name__, e,
        )
def _cancel_jobs_for_submissions(ids: set[str]) -> int:
    """Cancel, best-effort, each unfinished eval Job that targets one of *ids*.

    Every eval Job is dispatched with its ``submission_id`` baked into
    the command argv (see :func:`submit._dispatch_eval_command`), so no
    ``job_id`` has to be stored on the row: the eval account's jobs are
    listed and any job not in a terminal stage whose ``command`` /
    ``arguments`` contain a target id as an element is cancelled. Shard
    jobs are caught the same way, since each shard carries the same id
    in its command.

    Never raises. A job that already finished, a listing failure or a
    cancel race must not block the row delete that follows -- the GPU
    job carries its own ``--timeout`` and self-reaps if a cancel is
    missed.

    Returns:
        The number of cancel calls that succeeded (for logging only).
    """
    token = os.environ.get("HF_TOKEN")
    try:
        jobs = list_jobs(namespace=EVAL_JOB_NAMESPACE, token=token)
    except Exception as e:  # noqa: BLE001 - listing is best-effort
        logger.warning(
            "list_jobs(%s) failed (%s: %s); skipping job cancel, deleting rows",
            EVAL_JOB_NAMESPACE, type(e).__name__, e,
        )
        return 0

    cancelled = 0
    for job in jobs:
        status = getattr(job, "status", None)
        stage = getattr(status, "stage", None)
        if stage in _JOB_TERMINAL_STAGES:
            # Already finished; there is nothing to cancel.
            continue
        argv = [
            *(getattr(job, "command", None) or []),
            *(getattr(job, "arguments", None) or []),
        ]
        # The id must appear as a whole argv element to count as a match.
        if all(sid not in argv for sid in ids):
            continue
        try:
            cancel_job(
                job_id=job.id, namespace=EVAL_JOB_NAMESPACE, token=token,
            )
            cancelled += 1
            logger.info(
                "Cancelled eval job %s (stage %s) before delete", job.id, stage,
            )
        except Exception as e:  # noqa: BLE001 - cancel is best-effort
            logger.warning(
                "cancel_job(%s) failed (%s: %s); deleting row anyway",
                job.id, type(e).__name__, e,
            )
    return cancelled
def stop_and_delete_rows(submission_ids: Iterable[str]) -> None:
    """Stop the eval Job(s) still running for the listed rows, then delete.

    The "stop" half (:func:`_cancel_jobs_for_submissions`) is
    best-effort and never raises; the "delete" half is the regular
    :func:`delete_rows` (artifacts, then row). Together they amount to
    "stop if needed, then delete", which is the right action for a
    stuck/pending submission whose GPU job is still in flight.

    Raises:
        ValueError: no ids were given.
    """
    targets = _clean_id_set(submission_ids)
    stopped = _cancel_jobs_for_submissions(targets)
    logger.info(
        "stop_and_delete: cancelled %d job(s) for %d submission(s)",
        stopped, len(targets),
    )
    delete_rows(targets)
def _raise_for_missing(requested: set[str], seen: set[str]) -> None:
    """Raise ``LookupError`` naming every requested id that was not seen.

    Does nothing when all of *requested* is contained in *seen*. The
    missing ids are listed in sorted order in the error message.
    """
    absent = sorted(sid for sid in requested if sid not in seen)
    if not absent:
        return
    listing = ", ".join(absent)
    raise LookupError(f"submission_id(s) not in results.jsonl: {listing}.")
def promote_row(submission_id: str, method: str) -> None:
    """Promote exactly one row; thin wrapper around :func:`promote_rows`."""
    single = [submission_id]
    promote_rows(single, method)
def demote_row(submission_id: str) -> None:
    """Demote exactly one row; thin wrapper around :func:`demote_rows`."""
    single = [submission_id]
    demote_rows(single)