""" Tests for Stage 55 — admin recalibrate + run trigger HTTP. POST /admin/tenants/{tid}/recalibrate POST /admin/tenants/{tid}/runs/trigger Existing tenant routes (/tenants/{tid}/calibrations, /runs) require a tenant API key with operator role. Admin keys cannot call them because auth_dep is rigid about tenant ApiKey. Stage 55 fills the gap so admin dashboards can drive the fix-it loop without needing to hold a tenant credential. vertical is inferred from the tenant's registered vertical, so the admin only passes entity_type in the body. """ from datetime import datetime, timedelta, timezone import pytest pytest.importorskip("fastapi") pytest.importorskip("httpx") pytest.importorskip("yaml") from fastapi.testclient import TestClient from infra import OrgStateService from infra.api import create_app def _seed_tenant_with_observations(dbfile, tenant_id="acme", n_days=20): """Register a logistics tenant and push 20 days of observations, then calibrate via the service (so trigger has a calibration to work with — recalibrate doesn't need one).""" from verticals import get_vertical_config svc = OrgStateService(dbfile) try: svc.register_tenant(tenant_id, tenant_id, vertical="logistics") cfg = get_vertical_config("logistics").entity_type("warehouse") base = datetime(2026, 1, 1, tzinfo=timezone.utc) rows = [] for d in range(n_days): for eid in ("wh_a", "wh_b"): values = {m.name: 100.0 + d for m in cfg.metrics} rows.append({ "entity_id": eid, "day": (base + timedelta(days=d)).date().isoformat(), "values": values, }) svc.observations.upsert_batch(tenant_id, "warehouse", rows) finally: svc.close() def _bootstrap(tmp_path): dbfile = str(tmp_path / "admin_actions.sqlite3") _seed_tenant_with_observations(dbfile, "acme") svc = OrgStateService(dbfile) try: keys = { "admin": svc.create_admin_key().raw, "acme_admin": svc.create_api_key("acme", role="admin").raw, } finally: svc.close() return dbfile, keys def _client(dbfile): return TestClient(create_app(dbfile)) def _auth(k): return {"Authorization": f"Bearer {k}"} # --- POST /admin/tenants/{tid}/recalibrate --------------------------- def test_recalibrate_happy_path_returns_summary(tmp_path): """Admin POST → svc.recalibrate_from_stored runs → returns the same summary shape as Stage 30 (previous_age_days etc.).""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code == 200 body = r.json() assert body["tenant_id"] == "acme" assert body["entity_type"] == "warehouse" assert body["vertical"] == "logistics" assert body["n_observations_used"] == 40 # 20 days × 2 entities assert body["new_derived_at"] def test_recalibrate_persists_calibration(tmp_path): """End-to-end: HTTP call writes a calibration row that the service can load afterwards.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) assert client.post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ).status_code == 200 svc = OrgStateService(dbfile) try: assert svc.get_calibration("acme", "warehouse") is not None finally: svc.close() def test_recalibrate_too_few_observations_returns_400(tmp_path): """Service-layer guardrail surfaces as 400, not 500 — caller fixed-able by lowering --min-observations or pushing more data.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) # 14 is default; ask for 9999 to force the guardrail r = client.post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse", "min_observations": 9999}, ) assert r.status_code == 400 def test_recalibrate_missing_entity_type_400(tmp_path): """Stage 78: Pydantic 422 with field-level error.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post("/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={}) assert r.status_code == 422 def test_recalibrate_unknown_tenant_404(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/ghost/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code == 404 def test_recalibrate_unknown_entity_type_404(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "ghost"}, ) assert r.status_code == 404 def test_recalibrate_writes_audit_entry_with_admin_actor(tmp_path): dbfile, keys = _bootstrap(tmp_path) _client(dbfile).post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) svc = OrgStateService(dbfile) try: rows = svc.audit.list(action="recalibrate") assert len(rows) == 1 assert rows[0]["tenant_id"] == "acme" assert rows[0]["actor"] # admin actor string is set finally: svc.close() def test_recalibrate_no_admin_key_401(tmp_path): dbfile, _ = _bootstrap(tmp_path) client = _client(dbfile) r = client.post("/admin/tenants/acme/recalibrate", json={"entity_type": "warehouse"}) assert r.status_code == 401 def test_recalibrate_tenant_key_forbidden(tmp_path): """A tenant key — even role=admin on its own tenant — must not be able to call /admin/ routes. Otherwise the admin/tenant privilege boundary is gone.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/acme/recalibrate", headers=_auth(keys["acme_admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code in (401, 403) # --- POST /admin/tenants/{tid}/runs/trigger -------------------------- def test_trigger_happy_path_returns_run_handle(tmp_path): dbfile, keys = _bootstrap(tmp_path) # recalibrate first so a calibration exists client = _client(dbfile) client.post("/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}) r = client.post( "/admin/tenants/acme/runs/trigger", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code == 200 body = r.json() assert body["run_id"].startswith("run_") assert body["tenant_id"] == "acme" assert body["entity_type"] == "warehouse" assert "n_issues" in body def test_trigger_persists_run_row(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) client.post("/admin/tenants/acme/recalibrate", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}) rid = client.post( "/admin/tenants/acme/runs/trigger", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ).json()["run_id"] svc = OrgStateService(dbfile) try: assert svc.get_run(rid) is not None finally: svc.close() def test_trigger_no_calibration_returns_400(tmp_path): """run_from_stored raises CalibrationNotFound. Handler maps it to 400 (caller-fixable by recalibrate first) — NOT 500. The admin's response is "POST /admin/tenants/X/recalibrate, retry", not "page someone for a server error".""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) # no recalibrate first → no calibration r = client.post( "/admin/tenants/acme/runs/trigger", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code == 400 assert "calibration" in r.json()["error"]["message"].lower() def test_trigger_missing_entity_type_400(tmp_path): """Stage 78: Pydantic 422 with field-level error.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post("/admin/tenants/acme/runs/trigger", headers=_auth(keys["admin"]), json={}) assert r.status_code == 422 def test_trigger_unknown_tenant_404(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/ghost/runs/trigger", headers=_auth(keys["admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code == 404 def test_trigger_no_admin_key_401(tmp_path): dbfile, _ = _bootstrap(tmp_path) client = _client(dbfile) r = client.post("/admin/tenants/acme/runs/trigger", json={"entity_type": "warehouse"}) assert r.status_code == 401 def test_trigger_tenant_key_forbidden(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.post( "/admin/tenants/acme/runs/trigger", headers=_auth(keys["acme_admin"]), json={"entity_type": "warehouse"}, ) assert r.status_code in (401, 403)