| """ |
| Tests for Stage 55 — admin recalibrate + run trigger HTTP. |
| |
| POST /admin/tenants/{tid}/recalibrate |
| POST /admin/tenants/{tid}/runs/trigger |
| |
| Existing tenant routes (/tenants/{tid}/calibrations, /runs) require |
| a tenant API key with operator role. Admin keys cannot call them |
| because auth_dep is rigid about tenant ApiKey. Stage 55 fills the |
| gap so admin dashboards can drive the fix-it loop without needing |
| to hold a tenant credential. |
| |
| vertical is inferred from the tenant's registered vertical, so the |
| admin only passes entity_type in the body. |
| """ |
| from datetime import datetime, timedelta, timezone |
|
|
| import pytest |
|
|
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| pytest.importorskip("yaml") |
| from fastapi.testclient import TestClient |
|
|
| from infra import OrgStateService |
| from infra.api import create_app |
|
|
|
|
def _seed_tenant_with_observations(dbfile, tenant_id="acme",
                                   n_days=20):
    """Register a logistics tenant and store daily warehouse observations.

    For each of ``n_days`` consecutive days starting 2026-01-01 (UTC)
    one row is written for each of the entities ``wh_a`` and ``wh_b``;
    every metric of the logistics ``warehouse`` entity type gets the
    value ``100.0 + day_index``. Only observations are stored here: no
    calibration is created by this helper.

    Args:
        dbfile: Database path handed to ``OrgStateService``.
        tenant_id: Passed as both the first and second argument of
            ``register_tenant`` and used as owner of the rows.
        n_days: Number of consecutive days to generate.
    """
    from verticals import get_vertical_config

    service = OrgStateService(dbfile)
    try:
        service.register_tenant(tenant_id, tenant_id, vertical="logistics")
        warehouse_cfg = get_vertical_config("logistics").entity_type(
            "warehouse")
        start = datetime(2026, 1, 1, tzinfo=timezone.utc)
        # Day-major ordering with a fresh values dict per row.
        observations = [
            {
                "entity_id": entity_id,
                "day": (start + timedelta(days=offset)).date().isoformat(),
                "values": {
                    metric.name: 100.0 + offset
                    for metric in warehouse_cfg.metrics
                },
            }
            for offset in range(n_days)
            for entity_id in ("wh_a", "wh_b")
        ]
        service.observations.upsert_batch(
            tenant_id, "warehouse", observations)
    finally:
        service.close()
|
|
|
|
def _bootstrap(tmp_path):
    """Create a seeded database under ``tmp_path`` and mint test keys.

    Returns:
        A ``(dbfile, keys)`` tuple. ``keys["admin"]`` is the raw value
        of a key from ``create_admin_key``; ``keys["acme_admin"]`` is
        the raw value of an API key created for tenant ``acme`` with
        ``role="admin"``.
    """
    dbfile = str(tmp_path / "admin_actions.sqlite3")
    _seed_tenant_with_observations(dbfile, "acme")
    service = OrgStateService(dbfile)
    try:
        admin_raw = service.create_admin_key().raw
        tenant_admin_raw = service.create_api_key("acme", role="admin").raw
    finally:
        service.close()
    return dbfile, {"admin": admin_raw, "acme_admin": tenant_admin_raw}
|
|
|
|
def _client(dbfile):
    """Return a ``TestClient`` wrapping a new app created for ``dbfile``."""
    app = create_app(dbfile)
    return TestClient(app)
|
|
|
|
| def _auth(k): |
| return {"Authorization": f"Bearer {k}"} |
|
|
|
|
| |
|
|
def test_recalibrate_happy_path_returns_summary(tmp_path):
    """Admin-key POST to recalibrate answers 200 with a summary body.

    The summary (same shape as the Stage 30 one, per the original note)
    echoes tenant and entity type, reports the vertical even though the
    request did not send it, counts 40 observations used and carries a
    non-empty ``new_derived_at``.
    """
    dbfile, keys = _bootstrap(tmp_path)
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 200

    summary = resp.json()
    expected_fields = {
        "tenant_id": "acme",
        "entity_type": "warehouse",
        "vertical": "logistics",
        "n_observations_used": 40,
    }
    for field, wanted in expected_fields.items():
        assert summary[field] == wanted
    assert summary["new_derived_at"]
|
|
|
|
def test_recalibrate_persists_calibration(tmp_path):
    """After a successful HTTP recalibrate, a calibration can be loaded
    for the tenant/entity type through a fresh service handle."""
    dbfile, keys = _bootstrap(tmp_path)
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 200

    service = OrgStateService(dbfile)
    try:
        stored = service.get_calibration("acme", "warehouse")
        assert stored is not None
    finally:
        service.close()
|
|
|
|
def test_recalibrate_too_few_observations_returns_400(tmp_path):
    """The service-layer minimum-observations guardrail is reported as
    400 rather than 500: the caller can fix it by lowering the minimum
    or pushing more data."""
    dbfile, keys = _bootstrap(tmp_path)
    # 9999 is far above what the fixture seeds.
    payload = {"entity_type": "warehouse", "min_observations": 9999}
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=_auth(keys["admin"]),
        json=payload,
    )
    assert resp.status_code == 400
|
|
|
|
def test_recalibrate_missing_entity_type_400(tmp_path):
    """An empty body (no ``entity_type``) is rejected with 422.

    Stage 78: Pydantic 422 with field-level error. NOTE(review): the
    ``_400`` suffix in the test name does not match the asserted
    status; presumably it is historical.
    """
    dbfile, keys = _bootstrap(tmp_path)
    admin_headers = _auth(keys["admin"])
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=admin_headers,
        json={},
    )
    assert resp.status_code == 422
|
|
|
|
def test_recalibrate_unknown_tenant_404(tmp_path):
    """Recalibrating a tenant id that was never registered gives 404."""
    dbfile, keys = _bootstrap(tmp_path)
    url = "/admin/tenants/ghost/recalibrate"
    resp = _client(dbfile).post(
        url,
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 404
|
|
|
|
def test_recalibrate_unknown_entity_type_404(tmp_path):
    """Recalibrating an existing tenant with entity type ``ghost``
    gives 404."""
    dbfile, keys = _bootstrap(tmp_path)
    payload = {"entity_type": "ghost"}
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=_auth(keys["admin"]),
        json=payload,
    )
    assert resp.status_code == 404
|
|
|
|
def test_recalibrate_writes_audit_entry_with_admin_actor(tmp_path):
    """A successful admin recalibrate leaves exactly one audit row.

    The row is looked up by ``action="recalibrate"``, must belong to
    tenant ``acme`` and must carry a non-empty ``actor``.
    """
    dbfile, keys = _bootstrap(tmp_path)
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    # Check the request first: a rejected call should fail here with the
    # response body, not later as a confusing audit-row count mismatch.
    assert resp.status_code == 200, resp.text

    svc = OrgStateService(dbfile)
    try:
        rows = svc.audit.list(action="recalibrate")
        assert len(rows) == 1
        assert rows[0]["tenant_id"] == "acme"
        assert rows[0]["actor"]
    finally:
        svc.close()
|
|
|
|
def test_recalibrate_no_admin_key_401(tmp_path):
    """A recalibrate request without an Authorization header gives 401."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 401
|
|
|
|
def test_recalibrate_tenant_key_forbidden(tmp_path):
    """Tenant keys must be refused on /admin/ routes.

    Even a key with role=admin on its own tenant may not call the
    endpoint; otherwise the admin/tenant privilege boundary would be
    gone. Either 401 or 403 is accepted.
    """
    dbfile, keys = _bootstrap(tmp_path)
    tenant_headers = _auth(keys["acme_admin"])
    resp = _client(dbfile).post(
        "/admin/tenants/acme/recalibrate",
        headers=tenant_headers,
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code in (401, 403)
|
|
|
|
| |
|
|
def test_trigger_happy_path_returns_run_handle(tmp_path):
    """With a calibration in place, triggering a run returns a handle.

    The response must be 200 and carry a ``run_id`` starting with
    ``run_``, the tenant id, the entity type and an ``n_issues`` field.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    admin_headers = _auth(keys["admin"])
    body_in = {"entity_type": "warehouse"}

    # Setup: create the calibration first. Assert it worked so a broken
    # precondition is not misreported as a trigger failure.
    setup = client.post("/admin/tenants/acme/recalibrate",
                        headers=admin_headers, json=body_in)
    assert setup.status_code == 200, setup.text

    r = client.post(
        "/admin/tenants/acme/runs/trigger",
        headers=admin_headers,
        json=body_in,
    )
    assert r.status_code == 200
    body = r.json()
    assert body["run_id"].startswith("run_")
    assert body["tenant_id"] == "acme"
    assert body["entity_type"] == "warehouse"
    assert "n_issues" in body
|
|
|
|
def test_trigger_persists_run_row(tmp_path):
    """The ``run_id`` returned by the trigger endpoint can be loaded
    afterwards through a fresh service handle."""
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    admin_headers = _auth(keys["admin"])
    body_in = {"entity_type": "warehouse"}

    # Setup: a calibration must exist before a run can be triggered.
    setup = client.post("/admin/tenants/acme/recalibrate",
                        headers=admin_headers, json=body_in)
    assert setup.status_code == 200, setup.text

    triggered = client.post(
        "/admin/tenants/acme/runs/trigger",
        headers=admin_headers,
        json=body_in,
    )
    # Check the status before indexing the body, so a failed trigger
    # reports its response instead of a bare KeyError on "run_id".
    assert triggered.status_code == 200, triggered.text
    rid = triggered.json()["run_id"]

    svc = OrgStateService(dbfile)
    try:
        assert svc.get_run(rid) is not None
    finally:
        svc.close()
|
|
|
|
def test_trigger_no_calibration_returns_400(tmp_path):
    """Triggering a run before any calibration exists gives 400.

    run_from_stored raises CalibrationNotFound and the handler maps it
    to 400, NOT 500: it is caller-fixable (POST
    /admin/tenants/X/recalibrate, then retry) rather than a server
    error someone should be paged for. The error message must mention
    "calibration".
    """
    dbfile, keys = _bootstrap(tmp_path)
    # Deliberately no recalibrate call here: only observations exist.
    resp = _client(dbfile).post(
        "/admin/tenants/acme/runs/trigger",
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 400
    message = resp.json()["error"]["message"]
    assert "calibration" in message.lower()
|
|
|
|
def test_trigger_missing_entity_type_400(tmp_path):
    """An empty body (no ``entity_type``) is rejected with 422.

    Stage 78: Pydantic 422 with field-level error. NOTE(review): the
    ``_400`` suffix in the test name does not match the asserted
    status; presumably it is historical.
    """
    dbfile, keys = _bootstrap(tmp_path)
    admin_headers = _auth(keys["admin"])
    resp = _client(dbfile).post(
        "/admin/tenants/acme/runs/trigger",
        headers=admin_headers,
        json={},
    )
    assert resp.status_code == 422
|
|
|
|
def test_trigger_unknown_tenant_404(tmp_path):
    """Triggering a run for a tenant id that was never registered
    gives 404."""
    dbfile, keys = _bootstrap(tmp_path)
    url = "/admin/tenants/ghost/runs/trigger"
    resp = _client(dbfile).post(
        url,
        headers=_auth(keys["admin"]),
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 404
|
|
|
|
def test_trigger_no_admin_key_401(tmp_path):
    """A trigger request without an Authorization header gives 401."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    resp = _client(dbfile).post(
        "/admin/tenants/acme/runs/trigger",
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code == 401
|
|
|
|
def test_trigger_tenant_key_forbidden(tmp_path):
    """A tenant key with role=admin on ``acme`` is refused on the admin
    trigger route; either 401 or 403 is accepted."""
    dbfile, keys = _bootstrap(tmp_path)
    tenant_headers = _auth(keys["acme_admin"])
    resp = _client(dbfile).post(
        "/admin/tenants/acme/runs/trigger",
        headers=tenant_headers,
        json={"entity_type": "warehouse"},
    )
    assert resp.status_code in (401, 403)
|
|