cadgenbench-leaderboard / tests /test_admin.py
Michael Rabinovich
leaderboard: admin rescore (selected + all) reusing the eval pipeline
2893b22
"""Unit tests for the admin promote / demote helpers.
C5 contract: the admin gate distinguishes in-set from out-of-set
users, ``promote_row`` and ``demote_row`` flip the two validation
fields on the right row, and a repeat promotion is idempotent. Every
``results.jsonl`` read and write is mocked, so the suite makes zero
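Hub calls. Later sections cover the delete, stop-and-delete and rescore
helpers against the same mocked Hub state, with the Jobs API and the
rescore dispatch stubbed out.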
"""
from __future__ import annotations
import json
import os
from types import SimpleNamespace
import pytest
# submit.py kicks off a Hub-touching stuck-pending sweep at import.
# Disable it before importing (admin imports submit) so running this
# file in isolation stays offline.
os.environ.setdefault("CADGENBENCH_DISABLE_BOOT_SWEEP", "1")
import admin # noqa: E402
import submit # noqa: E402
# Baseline leaderboard rows that the ``hub`` fixture copies into its mocked
# results.jsonl state: one unvalidated row and one already-validated row.
SEED_ROWS = [
    # "alpha": not yet validated, so it is the natural promote target.
    {
        "submission_id": "alpha",
        "validation_status": "unvalidated",
        "validation_method": None,
        "aggregate_score": 0.5,
    },
    # "beta": already validated via "code", so it is the natural demote target.
    {
        "submission_id": "beta",
        "validation_status": "validated",
        "validation_method": "code",
        "aggregate_score": 0.9,
    },
]
def _jsonl(rows: list[dict]) -> str:
return "\n".join(json.dumps(r) for r in rows) + "\n"
def _row(rows: list[dict], submission_id: str) -> dict:
return next(r for r in rows if r["submission_id"] == submission_id)
@pytest.fixture
def hub(monkeypatch):
    """Replace the Hub calls the admin helpers make with in-memory fakes.

    Returns a mutable ``state`` dict:

    * ``rows`` - starts as a copy of :data:`SEED_ROWS`; every fake
      ``upload_file`` call re-parses the uploaded JSONL body into it.
    * ``uploads`` - number of ``upload_file`` calls, i.e. commits written.
    * ``deleted_paths`` - every ``path_in_repo`` given to ``delete_file``.
    * ``bucket_listed_prefixes`` / ``bucket_deleted_paths`` - the prefixes
      listed and the paths deleted through the bucket API fakes.
    """
    state: dict = {
        "rows": list(map(dict, SEED_ROWS)),
        "uploads": 0,
        "deleted_paths": [],
        "bucket_listed_prefixes": [],
        "bucket_deleted_paths": [],
    }

    def fake_download() -> str:
        return _jsonl(state["rows"])

    def fake_upload(*, path_or_fileobj, **kwargs) -> None:
        # The payload may arrive as bytes or as text; normalise to text.
        if isinstance(path_or_fileobj, bytes):
            text = path_or_fileobj.decode("utf-8")
        else:
            text = path_or_fileobj
        parsed = []
        for line in text.splitlines():
            if line.strip():
                parsed.append(json.loads(line))
        state["rows"] = parsed
        state["uploads"] += 1

    def fake_delete_file(*, path_in_repo, **kwargs) -> None:
        state["deleted_paths"].append(path_in_repo)

    def fake_list_bucket_tree(bucket_id, *, prefix, recursive=False, **kwargs):
        state["bucket_listed_prefixes"].append(prefix)
        only_entry = SimpleNamespace(path=f"{prefix}/101/rotating.webp")
        return [only_entry]

    def fake_batch_bucket_files(bucket_id, *, add=None, delete=None, **kwargs):
        state["bucket_deleted_paths"].extend(delete or [])

    # admin.py imported `_download_results_jsonl` by name (used directly by
    # `rescore_all`), so both module bindings are patched: the RMW path
    # reaches the submit-module reference, `rescore_all` the admin one.
    for module in (submit, admin):
        monkeypatch.setattr(module, "_download_results_jsonl", fake_download)

    api_stubs = (
        ("upload_file", fake_upload),
        ("delete_file", fake_delete_file),
        ("list_bucket_tree", fake_list_bucket_tree),
        ("batch_bucket_files", fake_batch_bucket_files),
    )
    for method_name, stub in api_stubs:
        monkeypatch.setattr(submit._HF_API, method_name, stub)
    return state
def test_admin_gate_in_set_vs_out_of_set(monkeypatch):
    """``is_admin`` is True only for logged-in users listed in the env var."""
    monkeypatch.setenv("CADGENBENCH_ADMINS", "michaelr27, lvwerra")
    for member in ("michaelr27", "lvwerra"):
        assert admin.is_admin(SimpleNamespace(username=member)) is True
    outsider = SimpleNamespace(username="someone-else")
    assert admin.is_admin(outsider) is False
    # A logged-out visitor (no profile at all) is never admin.
    assert admin.is_admin(None) is False
    # With the variable unset, even a previously listed user is rejected.
    monkeypatch.delenv("CADGENBENCH_ADMINS", raising=False)
    former_admin = SimpleNamespace(username="michaelr27")
    assert admin.is_admin(former_admin) is False
def test_promote_happy_path(hub):
    """Promoting ``alpha`` validates it with the given method in one write."""
    admin.promote_row("alpha", "manual")
    alpha = _row(hub["rows"], "alpha")
    assert alpha["validation_status"] == "validated"
    assert alpha["validation_method"] == "manual"
    # ``beta`` was not part of the call and keeps its original method.
    beta = _row(hub["rows"], "beta")
    assert beta["validation_method"] == "code"
    assert hub["uploads"] == 1
def test_demote_happy_path(hub):
    """Demoting ``beta`` resets its status and clears its method in one write."""
    admin.demote_row("beta")
    beta = _row(hub["rows"], "beta")
    assert beta["validation_status"] == "unvalidated"
    assert beta["validation_method"] is None
    assert hub["uploads"] == 1
def test_promote_idempotent(hub):
    """Promoting an already-validated row twice leaves it in the same state."""
    admin.promote_row("beta", "code")
    first_pass = _row(hub["rows"], "beta").copy()
    assert first_pass["validation_status"] == "validated"
    assert first_pass["validation_method"] == "code"
    # Repeat the identical promotion and compare against the snapshot.
    admin.promote_row("beta", "code")
    assert _row(hub["rows"], "beta") == first_pass
def test_promote_rows_bulk(hub):
    """Bulk promote validates every listed row with a single upload."""
    targets = ("alpha", "beta")
    admin.promote_rows(["alpha", "beta"], "traces")
    for submission_id in targets:
        current = _row(hub["rows"], submission_id)
        assert current["validation_status"] == "validated"
        assert current["validation_method"] == "traces"
    assert hub["uploads"] == 1
def test_demote_rows_bulk(hub):
    """Bulk demote un-validates every listed row with a single upload."""
    targets = ("alpha", "beta")
    admin.demote_rows(["alpha", "beta"])
    for submission_id in targets:
        current = _row(hub["rows"], submission_id)
        assert current["validation_status"] == "unvalidated"
        assert current["validation_method"] is None
    assert hub["uploads"] == 1
def test_promote_rows_missing_id_raises_without_write(hub):
    """One unknown id fails the whole batch and nothing is uploaded."""
    selection = ["alpha", "ghost"]
    with pytest.raises(LookupError):
        admin.promote_rows(selection, "code")
    assert hub["uploads"] == 0
    # No write happened, so the known row still has its seed status.
    alpha = _row(hub["rows"], "alpha")
    assert alpha["validation_status"] == "unvalidated"
def test_empty_selection_raises(hub):
    """Each bulk helper raises ``ValueError`` for an empty or all-falsy selection."""
    with pytest.raises(ValueError):
        admin.promote_rows([], "code")
    with pytest.raises(ValueError):
        admin.demote_rows([None, ""])
    with pytest.raises(ValueError):
        admin.delete_rows([])
    # None of the rejected calls may have reached the upload step.
    assert hub["uploads"] == 0
def test_delete_rows_removes_rows_and_artifacts(hub):
    """Deleting ``alpha`` drops its row, its repo blobs and its bucket renders."""
    admin.delete_rows(["alpha"])
    remaining_ids = {entry["submission_id"] for entry in hub["rows"]}
    assert remaining_ids == {"beta"}
    # The zip and both report files were each targeted, in this order.
    expected_blobs = [
        "submissions/alpha.zip",
        "reports/alpha.html",
        "reports/alpha.json",
    ]
    assert hub["deleted_paths"] == expected_blobs
    # The renders prefix was listed and the one listed file was deleted.
    assert hub["bucket_listed_prefixes"] == ["renders/alpha"]
    assert hub["bucket_deleted_paths"] == ["renders/alpha/101/rotating.webp"]
    assert hub["uploads"] == 1
def _job(job_id: str, stage: str, *args: str) -> SimpleNamespace:
"""A minimal JobInfo stand-in: id, status.stage, and a command argv."""
return SimpleNamespace(
id=job_id,
status=SimpleNamespace(stage=stage, message=None),
command=["python", "/opt/eval_job.py", *args],
arguments=None,
)
@pytest.fixture
def jobs(monkeypatch):
    """Swap admin's ``list_jobs`` / ``cancel_job`` for in-memory fakes.

    Returns a ``state`` dict: ``state["jobs"]`` is whatever the fake
    ``list_jobs`` hands back (looked up at call time, so a test may
    reassign it), and ``state["cancelled"]`` collects each ``job_id``
    passed to the fake ``cancel_job``.
    """
    state: dict = {"jobs": [], "cancelled": []}

    def fake_list_jobs(*, namespace=None, token=None):
        return state["jobs"]

    def fake_cancel_job(*, job_id, namespace=None, token=None):
        state["cancelled"].append(job_id)

    replacements = (
        ("list_jobs", fake_list_jobs),
        ("cancel_job", fake_cancel_job),
    )
    for attr_name, stub in replacements:
        monkeypatch.setattr(admin, attr_name, stub)
    return state
def test_stop_and_delete_cancels_running_then_deletes(hub, jobs):
    """Only the running job naming the selected id is cancelled; its row is removed."""
    alpha_job = _job("job-alpha", "RUNNING", "alpha", "https://blob/alpha.zip")
    beta_job = _job("job-beta", "RUNNING", "beta", "https://blob/beta.zip")
    jobs["jobs"] = [alpha_job, beta_job]
    admin.stop_and_delete_rows(["alpha"])
    # beta's job must not have been touched.
    assert jobs["cancelled"] == ["job-alpha"]
    # alpha's row and zip are gone; beta's row survives.
    remaining_ids = {entry["submission_id"] for entry in hub["rows"]}
    assert remaining_ids == {"beta"}
    assert "submissions/alpha.zip" in hub["deleted_paths"]
def test_stop_and_delete_catches_all_shard_jobs(hub, jobs):
    """Both shard jobs carrying the submission id in their argv are cancelled."""
    shard_zero = _job("job-a0", "RUNNING", "alpha", "url", "--shard-id", "shard_000")
    shard_one = _job("job-a1", "RUNNING", "alpha", "url", "--shard-id", "shard_001")
    jobs["jobs"] = [shard_zero, shard_one]
    admin.stop_and_delete_rows(["alpha"])
    # Cancellation order is not part of the contract, so compare sorted.
    assert sorted(jobs["cancelled"]) == ["job-a0", "job-a1"]
def test_stop_and_delete_skips_terminal_jobs(hub, jobs):
    """A ``COMPLETED`` job is left alone while the row is still deleted."""
    finished = _job("job-alpha", "COMPLETED", "alpha", "url")
    jobs["jobs"] = [finished]
    admin.stop_and_delete_rows(["alpha"])
    assert jobs["cancelled"] == []
    remaining_ids = {entry["submission_id"] for entry in hub["rows"]}
    assert remaining_ids == {"beta"}
def test_stop_and_delete_tolerates_list_jobs_failure(hub, monkeypatch):
    """The row delete goes through even when listing jobs raises."""

    def failing_list_jobs(*, namespace=None, token=None):
        raise RuntimeError("jobs API down")

    monkeypatch.setattr(admin, "list_jobs", failing_list_jobs)
    admin.stop_and_delete_rows(["alpha"])
    remaining_ids = {entry["submission_id"] for entry in hub["rows"]}
    assert remaining_ids == {"beta"}
def test_stop_and_delete_tolerates_cancel_failure(hub, jobs, monkeypatch):
    """The row delete goes through even when cancelling the job raises."""
    running = _job("job-alpha", "RUNNING", "alpha", "url")
    jobs["jobs"] = [running]

    def failing_cancel_job(*, job_id, namespace=None, token=None):
        raise RuntimeError("cancel rejected")

    # Override the recording fake installed by the ``jobs`` fixture.
    monkeypatch.setattr(admin, "cancel_job", failing_cancel_job)
    admin.stop_and_delete_rows(["alpha"])
    remaining_ids = {entry["submission_id"] for entry in hub["rows"]}
    assert remaining_ids == {"beta"}
def test_stop_and_delete_empty_selection_raises(hub, jobs):
    """An empty selection raises ``ValueError`` with no cancel and no upload."""
    nothing_selected = []
    with pytest.raises(ValueError):
        admin.stop_and_delete_rows(nothing_selected)
    assert jobs["cancelled"] == []
    assert hub["uploads"] == 0
# --- Rescore -------------------------------------------------------------
# Rows shaped for the rescore path: a completed row with scores + a stored
# zip, a failed row with a zip, a legacy row with no zip, and a pending row
# (mid-eval). ``submitted_at`` is set so a test can assert it's preserved.
# Rows the rescore tests load into the mocked Hub state in place of
# SEED_ROWS; each entry exercises a different branch of the rescore path.
RESCORE_ROWS = [
    # "done": completed, with a stored zip url and a full set of score fields.
    {
        "submission_id": "done",
        "status": "completed",
        "failure_reason": None,
        "submission_blob_url": "https://blob/done.zip",
        "submitted_at": "2026-01-01T00:00:00Z",
        "aggregate_score": 0.7,
        "validity_rate": 1.0,
        "score_by_task_type": {"generation": 0.7},
        "per_task_scores": {"generation": {"score": 0.7}},
        "per_fixture_scores": {"f1": {"cad_score": 0.7}},
        "per_fixture_breakdown": {"f1": {"validity": 1.0}},
    },
    # "broke": failed, but it still has a stored zip url.
    {
        "submission_id": "broke",
        "status": "failed",
        "failure_reason": "boom",
        "submission_blob_url": "https://blob/broke.zip",
        "submitted_at": "2026-01-02T00:00:00Z",
        "aggregate_score": None,
    },
    # "legacy": completed with a score, but no stored zip url.
    {
        "submission_id": "legacy",
        "status": "completed",
        "submission_blob_url": None,
        "submitted_at": "2025-01-01T00:00:00Z",
        "aggregate_score": 0.4,
    },
    # "inflight": already pending (mid-eval), with a stored zip url.
    {
        "submission_id": "inflight",
        "status": "pending",
        "submission_blob_url": "https://blob/inflight.zip",
        "submitted_at": "2026-02-01T00:00:00Z",
        "aggregate_score": None,
    },
]
@pytest.fixture
def dispatch(monkeypatch):
    """Record rescore dispatches instead of starting any worker.

    ``admin._dispatch_rescore_workers`` is replaced by a synchronous
    recorder and ``admin._current_fixture_names`` by a stub returning
    ``["f1", "f2"]``, so no thread is spawned and the data repo is never
    consulted. The returned ``state`` holds ``targets`` (a copy of the
    ``{id: blob_url}`` map last dispatched), ``fixtures`` (the fixture
    list last dispatched) and ``calls`` (how many dispatches happened).
    """
    state: dict = {"targets": None, "fixtures": None, "calls": 0}

    def fake_dispatch(targets, fixture_names):
        state["targets"] = dict(targets)
        state["fixtures"] = list(fixture_names)
        state["calls"] += 1

    def fake_fixture_names():
        return ["f1", "f2"]

    monkeypatch.setattr(admin, "_dispatch_rescore_workers", fake_dispatch)
    monkeypatch.setattr(admin, "_current_fixture_names", fake_fixture_names)
    return state
def test_rescore_rows_flips_to_pending_and_dispatches(hub, dispatch):
    """Rescoring ``done`` resets it to pending and dispatches exactly that row."""
    hub["rows"] = list(map(dict, RESCORE_ROWS))
    dispatched, skipped = admin.rescore_rows(["done"])
    assert dispatched == 1
    assert skipped == []
    done = _row(hub["rows"], "done")
    assert done["status"] == "pending"
    assert done["failure_reason"] is None
    # All score-shaped fields named by the admin module are nulled out.
    for score_field in admin._RESCORE_CLEARED_SCORE_FIELDS:
        assert done[score_field] is None
    # The original submission timestamp is provenance and stays as it was.
    assert done["submitted_at"] == "2026-01-01T00:00:00Z"
    # Dispatch received the stored zip url and the stubbed fixture list.
    assert dispatch["targets"] == {"done": "https://blob/done.zip"}
    assert dispatch["fixtures"] == ["f1", "f2"]
    assert hub["uploads"] == 1
def test_rescore_rows_skips_rows_without_zip(hub, dispatch):
    """A row with no stored zip is reported as skipped and left unchanged."""
    hub["rows"] = list(map(dict, RESCORE_ROWS))
    dispatched, skipped = admin.rescore_rows(["legacy"])
    assert dispatched == 0
    assert skipped == ["legacy"]
    # Status and score are exactly what they were before the call.
    legacy = _row(hub["rows"], "legacy")
    assert legacy["status"] == "completed"
    assert legacy["aggregate_score"] == 0.4
    # No worker was queued.
    assert dispatch["calls"] == 0
    # The single read-modify-write still commits, even with nothing flipped.
    assert hub["uploads"] == 1
def test_rescore_rows_missing_id_raises_without_dispatch(hub, dispatch):
    """One unknown id fails the batch: no row is reset and nothing dispatches."""
    hub["rows"] = list(map(dict, RESCORE_ROWS))
    selection = ["done", "ghost"]
    with pytest.raises(LookupError):
        admin.rescore_rows(selection)
    # The error came from inside the read-modify-write, so the known row
    # is still completed and dispatch was never reached.
    done = _row(hub["rows"], "done")
    assert done["status"] == "completed"
    assert dispatch["calls"] == 0
def test_rescore_rows_empty_selection_raises(hub, dispatch):
    """``rescore_rows`` raises ``ValueError`` for an empty selection."""
    nothing_selected = []
    with pytest.raises(ValueError):
        admin.rescore_rows(nothing_selected)
    assert dispatch["calls"] == 0
def test_rescore_all_targets_completed_and_failed_only(hub, dispatch):
    """Rescore-all dispatches rows that have a zip and are not already pending."""
    hub["rows"] = list(map(dict, RESCORE_ROWS))
    dispatched, _skipped = admin.rescore_all()
    # ``done`` and ``broke`` qualify; ``legacy`` has no zip and ``inflight``
    # is already pending (mid-eval), so neither of those is dispatched.
    assert dispatched == 2
    assert set(dispatch["targets"]) == {"done", "broke"}
    # The two dispatched rows were reset to pending.
    for submission_id in ("done", "broke"):
        assert _row(hub["rows"], submission_id)["status"] == "pending"
    # The in-flight row is still pending, untouched by the rescore.
    assert _row(hub["rows"], "inflight")["status"] == "pending"
    # The zip-less row remains completed.
    assert _row(hub["rows"], "legacy")["status"] == "completed"
def test_rescore_all_empty_board_raises(hub, dispatch):
    """With no rescoreable row, rescore-all raises and performs no write."""
    pending_row = {
        "submission_id": "inflight",
        "status": "pending",
        "submission_blob_url": "https://blob/x.zip",
    }
    zipless_row = {
        "submission_id": "legacy",
        "status": "completed",
        "submission_blob_url": None,
    }
    hub["rows"] = [pending_row, zipless_row]
    with pytest.raises(ValueError):
        admin.rescore_all()
    assert dispatch["calls"] == 0
    assert hub["uploads"] == 0