"""Offline unit tests for the admin moderation helpers.

C5 contract: ``is_admin`` tells in-set users from out-of-set ones, and
``promote_row`` / ``demote_row`` flip the two validation fields on the
targeted row only, with a repeated promotion being idempotent. The bulk
promote / demote / delete helpers, stop-and-delete and the rescore path are
exercised here as well.

Every ``results.jsonl`` read and write is mocked, so the suite makes zero
Hub calls.
"""

from __future__ import annotations

import json
import os
from types import SimpleNamespace

import pytest

# Importing submit.py launches a stuck-pending sweep that talks to the Hub.
# Switch it off before the import (admin pulls in submit) so this file stays
# offline even when it is run on its own.
os.environ.setdefault("CADGENBENCH_DISABLE_BOOT_SWEEP", "1")

import admin  # noqa: E402
import submit  # noqa: E402

SEED_ROWS = [
    {
        "submission_id": "alpha",
        "validation_status": "unvalidated",
        "validation_method": None,
        "aggregate_score": 0.5,
    },
    {
        "submission_id": "beta",
        "validation_status": "validated",
        "validation_method": "code",
        "aggregate_score": 0.9,
    },
]


def _jsonl(rows: list[dict]) -> str:
    """Serialise ``rows`` as JSON lines, always ending with a newline."""
    encoded = [json.dumps(entry) for entry in rows]
    return "\n".join(encoded) + "\n"


def _row(rows: list[dict], submission_id: str) -> dict:
    """Return the first row of ``rows`` carrying ``submission_id``."""
    matching = (entry for entry in rows if entry["submission_id"] == submission_id)
    return next(matching)


def _submission_ids(rows: list[dict]) -> set[str]:
    """Collect the ``submission_id`` of every row into a set."""
    return {entry["submission_id"] for entry in rows}


@pytest.fixture
def hub(monkeypatch):
    """Stub the results.jsonl read + write that ``_hub_rmw_results`` drives.

    The returned ``state`` dict records everything the stubs observe:

    * ``rows`` -- starts as a copy of :data:`SEED_ROWS`; each ``upload_file``
      call replaces it with the uploaded payload parsed back into dicts.
    * ``uploads`` -- number of ``upload_file`` calls, so a test can assert
      how many commits an admin call produced.
    * ``deleted_paths`` -- every ``path_in_repo`` given to ``delete_file``.
    * ``bucket_listed_prefixes`` -- every ``prefix`` given to
      ``list_bucket_tree``.
    * ``bucket_deleted_paths`` -- every entry of the ``delete`` argument
      given to ``batch_bucket_files``.
    """
    state: dict = {
        "rows": [dict(seed) for seed in SEED_ROWS],
        "uploads": 0,
        "deleted_paths": [],
        "bucket_listed_prefixes": [],
        "bucket_deleted_paths": [],
    }

    def fake_download() -> str:
        return _jsonl(state["rows"])

    def fake_upload(*, path_or_fileobj, **kwargs) -> None:
        if isinstance(path_or_fileobj, bytes):
            text = path_or_fileobj.decode("utf-8")
        else:
            text = path_or_fileobj
        parsed = []
        for raw_line in text.splitlines():
            # Blank lines (e.g. the trailing newline) carry no row.
            if raw_line.strip():
                parsed.append(json.loads(raw_line))
        state["rows"] = parsed
        state["uploads"] += 1

    def fake_delete_file(*, path_in_repo, **kwargs) -> None:
        state["deleted_paths"].append(path_in_repo)

    def fake_list_bucket_tree(bucket_id, *, prefix, recursive=False, **kwargs):
        state["bucket_listed_prefixes"].append(prefix)
        # Pretend every listed prefix holds exactly one render.
        return [SimpleNamespace(path=f"{prefix}/101/rotating.webp")]

    def fake_batch_bucket_files(bucket_id, *, add=None, delete=None, **kwargs):
        if delete:
            state["bucket_deleted_paths"].extend(delete)

    # admin.py imported `_download_results_jsonl` by name (used directly by
    # `rescore_all`), so that binding is patched in addition to the one on
    # submit; the RMW path reaches the submit-module reference.
    replacements = (
        (submit, "_download_results_jsonl", fake_download),
        (admin, "_download_results_jsonl", fake_download),
        (submit._HF_API, "upload_file", fake_upload),
        (submit._HF_API, "delete_file", fake_delete_file),
        (submit._HF_API, "list_bucket_tree", fake_list_bucket_tree),
        (submit._HF_API, "batch_bucket_files", fake_batch_bucket_files),
    )
    for target, attribute, stub in replacements:
        monkeypatch.setattr(target, attribute, stub)
    return state


def test_admin_gate_in_set_vs_out_of_set(monkeypatch):
    """is_admin admits logged-in users in the set, rejects everyone else."""
    monkeypatch.setenv("CADGENBENCH_ADMINS", "michaelr27, lvwerra")
    expectations = (
        ("michaelr27", True),
        ("lvwerra", True),
        ("someone-else", False),
    )
    for username, expected in expectations:
        assert admin.is_admin(SimpleNamespace(username=username)) is expected
    # A logged-out visitor is never an admin.
    assert admin.is_admin(None) is False
    # With the variable unset, nobody is an admin.
    monkeypatch.delenv("CADGENBENCH_ADMINS", raising=False)
    assert admin.is_admin(SimpleNamespace(username="michaelr27")) is False


def test_promote_happy_path(hub):
    """Promoting an unvalidated row sets status + method, leaves others alone."""
    admin.promote_row("alpha", "manual")

    alpha = _row(hub["rows"], "alpha")
    assert alpha["validation_status"] == "validated"
    assert alpha["validation_method"] == "manual"
    # beta must come through unchanged.
    beta = _row(hub["rows"], "beta")
    assert beta["validation_method"] == "code"
    assert hub["uploads"] == 1


def test_demote_happy_path(hub):
    """Demoting a validated row clears the method and flips status back."""
    admin.demote_row("beta")

    beta = _row(hub["rows"], "beta")
    assert beta["validation_status"] == "unvalidated"
    assert beta["validation_method"] is None
    assert hub["uploads"] == 1


def test_promote_idempotent(hub):
    """Re-promoting an already-validated row lands the same state."""
    admin.promote_row("beta", "code")
    # Snapshot a copy so the second promotion cannot alias it.
    first_pass = dict(_row(hub["rows"], "beta"))
    assert first_pass["validation_status"] == "validated"
    assert first_pass["validation_method"] == "code"

    # Promoting again with the same arguments must yield an equal row.
    admin.promote_row("beta", "code")
    assert _row(hub["rows"], "beta") == first_pass


def test_promote_rows_bulk(hub):
    """A bulk promote flips every listed row in one write."""
    selection = ["alpha", "beta"]
    admin.promote_rows(selection, "traces")

    for submission_id in ("alpha", "beta"):
        current = _row(hub["rows"], submission_id)
        assert current["validation_status"] == "validated"
        assert current["validation_method"] == "traces"
    assert hub["uploads"] == 1


def test_demote_rows_bulk(hub):
    """A bulk demote clears method on every listed row in one write."""
    selection = ["alpha", "beta"]
    admin.demote_rows(selection)

    for submission_id in ("alpha", "beta"):
        current = _row(hub["rows"], submission_id)
        assert current["validation_status"] == "unvalidated"
        assert current["validation_method"] is None
    assert hub["uploads"] == 1


def test_promote_rows_missing_id_raises_without_write(hub):
    """An unknown id aborts the whole batch before any upload."""
    with pytest.raises(LookupError):
        admin.promote_rows(["alpha", "ghost"], "code")

    assert hub["uploads"] == 0
    # No write happened, so alpha still carries its seed status.
    alpha = _row(hub["rows"], "alpha")
    assert alpha["validation_status"] == "unvalidated"


def test_empty_selection_raises(hub):
    """Bulk helpers reject an empty / all-falsy selection."""
    rejected_calls = (
        (admin.promote_rows, ([], "code")),
        (admin.demote_rows, ([None, ""],)),
        (admin.delete_rows, ([],)),
    )
    for helper, arguments in rejected_calls:
        with pytest.raises(ValueError):
            helper(*arguments)
    assert hub["uploads"] == 0


def test_delete_rows_removes_rows_and_artifacts(hub):
    """Delete drops the rows and best-effort removes their artifacts."""
    admin.delete_rows(["alpha"])

    assert _submission_ids(hub["rows"]) == {"beta"}
    # The zip and both report files were each targeted for deletion.
    expected_artifacts = [
        "submissions/alpha.zip",
        "reports/alpha.html",
        "reports/alpha.json",
    ]
    assert hub["deleted_paths"] == expected_artifacts
    # The submission's renders are purged from the public bucket as well.
    assert hub["bucket_listed_prefixes"] == ["renders/alpha"]
    assert hub["bucket_deleted_paths"] == ["renders/alpha/101/rotating.webp"]
    assert hub["uploads"] == 1


def _job(job_id: str, stage: str, *args: str) -> SimpleNamespace:
    """Build a minimal JobInfo stand-in: id, status.stage, and a command argv."""
    status = SimpleNamespace(stage=stage, message=None)
    argv = ["python", "/opt/eval_job.py", *args]
    return SimpleNamespace(id=job_id, status=status, command=argv, arguments=None)


@pytest.fixture
def jobs(monkeypatch):
    """Stub the Jobs API names (``list_jobs`` / ``cancel_job``) on admin.

    The returned ``state`` dict has two keys: ``jobs`` is whatever the
    stubbed ``list_jobs`` hands back, and ``cancelled`` accumulates the
    ``job_id`` of every ``cancel_job`` call so a test can assert exactly
    which jobs were stopped.
    """
    state: dict = {"jobs": [], "cancelled": []}

    def fake_list_jobs(*, namespace=None, token=None):
        return state["jobs"]

    def fake_cancel_job(*, job_id, namespace=None, token=None):
        state["cancelled"].append(job_id)

    for attribute, stub in (
        ("list_jobs", fake_list_jobs),
        ("cancel_job", fake_cancel_job),
    ):
        monkeypatch.setattr(admin, attribute, stub)
    return state


def test_stop_and_delete_cancels_running_then_deletes(hub, jobs):
    """A running job whose command names the id is cancelled, then the row goes."""
    jobs["jobs"] = [
        _job("job-alpha", "RUNNING", "alpha", "https://blob/alpha.zip"),
        _job("job-beta", "RUNNING", "beta", "https://blob/beta.zip"),
    ]

    admin.stop_and_delete_rows(["alpha"])

    # beta's job keeps running; only alpha's was cancelled.
    assert jobs["cancelled"] == ["job-alpha"]
    # alpha's row and artifacts are gone while beta survives.
    assert _submission_ids(hub["rows"]) == {"beta"}
    assert "submissions/alpha.zip" in hub["deleted_paths"]


def test_stop_and_delete_catches_all_shard_jobs(hub, jobs):
    """Every shard job for a submission (same id in argv) is cancelled."""
    jobs["jobs"] = [
        _job("job-a0", "RUNNING", "alpha", "url", "--shard-id", "shard_000"),
        _job("job-a1", "RUNNING", "alpha", "url", "--shard-id", "shard_001"),
    ]

    admin.stop_and_delete_rows(["alpha"])

    assert sorted(jobs["cancelled"]) == ["job-a0", "job-a1"]


def test_stop_and_delete_skips_terminal_jobs(hub, jobs):
    """A finished job for the id is not cancelled, but the row still deletes."""
    jobs["jobs"] = [_job("job-alpha", "COMPLETED", "alpha", "url")]

    admin.stop_and_delete_rows(["alpha"])

    assert jobs["cancelled"] == []
    assert _submission_ids(hub["rows"]) == {"beta"}


def test_stop_and_delete_tolerates_list_jobs_failure(hub, monkeypatch):
    """A Jobs-API listing failure must not block the row delete."""

    def failing_list_jobs(*, namespace=None, token=None):
        raise RuntimeError("jobs API down")

    monkeypatch.setattr(admin, "list_jobs", failing_list_jobs)

    admin.stop_and_delete_rows(["alpha"])

    assert _submission_ids(hub["rows"]) == {"beta"}


def test_stop_and_delete_tolerates_cancel_failure(hub, jobs, monkeypatch):
    """A cancel that errors is swallowed; the row still deletes."""
    jobs["jobs"] = [_job("job-alpha", "RUNNING", "alpha", "url")]

    def failing_cancel_job(*, job_id, namespace=None, token=None):
        raise RuntimeError("cancel rejected")

    monkeypatch.setattr(admin, "cancel_job", failing_cancel_job)

    admin.stop_and_delete_rows(["alpha"])

    assert _submission_ids(hub["rows"]) == {"beta"}


def test_stop_and_delete_empty_selection_raises(hub, jobs):
    """An empty selection is a caller error, before any job/list work."""
    with pytest.raises(ValueError):
        admin.stop_and_delete_rows([])

    assert jobs["cancelled"] == []
    assert hub["uploads"] == 0


# --- Rescore -------------------------------------------------------------

# Rows shaped for the rescore path: a completed
# row with scores + a stored zip, a failed row with a zip, a legacy row with
# no zip, and a pending row (mid-eval). ``submitted_at`` is set so a test can
# assert it's preserved.
RESCORE_ROWS = [
    {
        "submission_id": "done",
        "status": "completed",
        "failure_reason": None,
        "submission_blob_url": "https://blob/done.zip",
        "submitted_at": "2026-01-01T00:00:00Z",
        "aggregate_score": 0.7,
        "validity_rate": 1.0,
        "score_by_task_type": {"generation": 0.7},
        "per_task_scores": {"generation": {"score": 0.7}},
        "per_fixture_scores": {"f1": {"cad_score": 0.7}},
        "per_fixture_breakdown": {"f1": {"validity": 1.0}},
    },
    {
        "submission_id": "broke",
        "status": "failed",
        "failure_reason": "boom",
        "submission_blob_url": "https://blob/broke.zip",
        "submitted_at": "2026-01-02T00:00:00Z",
        "aggregate_score": None,
    },
    {
        "submission_id": "legacy",
        "status": "completed",
        "submission_blob_url": None,
        "submitted_at": "2025-01-01T00:00:00Z",
        "aggregate_score": 0.4,
    },
    {
        "submission_id": "inflight",
        "status": "pending",
        "submission_blob_url": "https://blob/inflight.zip",
        "submitted_at": "2026-02-01T00:00:00Z",
        "aggregate_score": None,
    },
]


def _load_rescore_rows(hub_state: dict) -> None:
    """Swap the mocked board for a fresh per-row copy of RESCORE_ROWS."""
    hub_state["rows"] = [dict(template) for template in RESCORE_ROWS]


@pytest.fixture
def dispatch(monkeypatch):
    """Record rescore dispatches instead of spawning real workers/threads.

    ``_dispatch_rescore_workers`` (which would start a daemon thread that
    calls the submit path's ``_spawn_worker``) is swapped for a synchronous
    recorder, and the fixture-set lookup is stubbed so the suite never
    touches the data repo.

    The returned ``state`` dict holds ``targets`` (the ``{id: blob_url}`` map
    handed to dispatch), ``fixtures`` (the fixture list handed to dispatch)
    and ``calls`` (how many times dispatch ran).
    """
    state: dict = {"targets": None, "fixtures": None, "calls": 0}

    def fake_dispatch(targets, fixture_names):
        # Store copies so later mutation by the caller cannot alter the record.
        state["targets"] = dict(targets)
        state["fixtures"] = list(fixture_names)
        state["calls"] += 1

    def fake_fixture_names():
        return ["f1", "f2"]

    monkeypatch.setattr(admin, "_dispatch_rescore_workers", fake_dispatch)
    monkeypatch.setattr(admin, "_current_fixture_names", fake_fixture_names)
    return state


def test_rescore_rows_flips_to_pending_and_dispatches(hub, dispatch):
    """Rescore resets the row to the pending regime and queues a worker."""
    _load_rescore_rows(hub)

    dispatched, skipped = admin.rescore_rows(["done"])

    assert dispatched == 1
    assert skipped == []
    done = _row(hub["rows"], "done")
    assert done["status"] == "pending"
    assert done["failure_reason"] is None
    # None of the score-shaped fields may keep a value.
    still_set = [
        name
        for name in admin._RESCORE_CLEARED_SCORE_FIELDS
        if done[name] is not None
    ]
    assert still_set == []
    # submitted_at is immutable provenance, so it has to come through intact.
    assert done["submitted_at"] == "2026-01-01T00:00:00Z"
    # Dispatch received the stored zip url together with the current fixtures.
    assert dispatch["targets"] == {"done": "https://blob/done.zip"}
    assert dispatch["fixtures"] == ["f1", "f2"]
    assert hub["uploads"] == 1


def test_rescore_rows_skips_rows_without_zip(hub, dispatch):
    """A legacy row with no stored zip is skipped, not dispatched or erroring."""
    _load_rescore_rows(hub)

    dispatched, skipped = admin.rescore_rows(["legacy"])

    assert dispatched == 0
    assert skipped == ["legacy"]
    # The row stays as it was: still completed, score intact.
    legacy = _row(hub["rows"], "legacy")
    assert legacy["status"] == "completed"
    assert legacy["aggregate_score"] == 0.4
    # With no target there is nothing to dispatch.
    assert dispatch["calls"] == 0
    # The reset write still happens (single RMW), but flips nothing here.
    assert hub["uploads"] == 1


def test_rescore_rows_missing_id_raises_without_dispatch(hub, dispatch):
    """An unknown id aborts the batch before any worker is queued."""
    _load_rescore_rows(hub)

    with pytest.raises(LookupError):
        admin.rescore_rows(["done", "ghost"])

    # The mutate raised inside the RMW, so no row was flipped and no
    # dispatch happened.
    done = _row(hub["rows"], "done")
    assert done["status"] == "completed"
    assert dispatch["calls"] == 0


def test_rescore_rows_empty_selection_raises(hub, dispatch):
    """An empty selection is a caller error."""
    with pytest.raises(ValueError):
        admin.rescore_rows([])

    assert dispatch["calls"] == 0


def test_rescore_all_targets_completed_and_failed_only(hub, dispatch):
    """Rescore-all hits rows with a zip, skipping pending + zip-less rows."""
    _load_rescore_rows(hub)

    dispatched, _skipped = admin.rescore_all()

    # done + broke have zips and aren't pending; legacy has no zip;
    # inflight is pending (mid-eval) -> neither dispatched.
    assert dispatched == 2
    assert set(dispatch["targets"]) == {"done", "broke"}
    # done/broke were flipped to pending; inflight was pending already and
    # is left strictly alone; legacy keeps its old completed state.
    expected_status = (
        ("done", "pending"),
        ("broke", "pending"),
        ("inflight", "pending"),
        ("legacy", "completed"),
    )
    for submission_id, status in expected_status:
        assert _row(hub["rows"], submission_id)["status"] == status


def test_rescore_all_empty_board_raises(hub, dispatch):
    """Rescore-all with nothing rescoreable is a no-op error, no write."""
    pending_row = {
        "submission_id": "inflight",
        "status": "pending",
        "submission_blob_url": "https://blob/x.zip",
    }
    zipless_row = {
        "submission_id": "legacy",
        "status": "completed",
        "submission_blob_url": None,
    }
    hub["rows"] = [pending_row, zipless_row]

    with pytest.raises(ValueError):
        admin.rescore_all()

    assert dispatch["calls"] == 0
    assert hub["uploads"] == 0