orgstate / tests /test_infra_api_admin_actions.py
Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
Tests for Stage 55 — admin recalibrate + run trigger HTTP.
POST /admin/tenants/{tid}/recalibrate
POST /admin/tenants/{tid}/runs/trigger
Existing tenant routes (/tenants/{tid}/calibrations, /runs) require
a tenant API key with operator role. Admin keys cannot call them
because auth_dep is rigid about tenant ApiKey. Stage 55 fills the
gap so admin dashboards can drive the fix-it loop without needing
to hold a tenant credential.
vertical is inferred from the tenant's registered vertical, so the
admin only passes entity_type in the body.
"""
from datetime import datetime, timedelta, timezone
import pytest
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
pytest.importorskip("yaml")
from fastapi.testclient import TestClient
from infra import OrgStateService
from infra.api import create_app
def _seed_tenant_with_observations(dbfile, tenant_id="acme",
n_days=20):
"""Register a logistics tenant and push 20 days of observations,
then calibrate via the service (so trigger has a calibration
to work with — recalibrate doesn't need one)."""
from verticals import get_vertical_config
svc = OrgStateService(dbfile)
try:
svc.register_tenant(tenant_id, tenant_id, vertical="logistics")
cfg = get_vertical_config("logistics").entity_type("warehouse")
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
rows = []
for d in range(n_days):
for eid in ("wh_a", "wh_b"):
values = {m.name: 100.0 + d for m in cfg.metrics}
rows.append({
"entity_id": eid,
"day": (base + timedelta(days=d)).date().isoformat(),
"values": values,
})
svc.observations.upsert_batch(tenant_id, "warehouse", rows)
finally:
svc.close()
def _bootstrap(tmp_path):
dbfile = str(tmp_path / "admin_actions.sqlite3")
_seed_tenant_with_observations(dbfile, "acme")
svc = OrgStateService(dbfile)
try:
keys = {
"admin": svc.create_admin_key().raw,
"acme_admin": svc.create_api_key("acme", role="admin").raw,
}
finally:
svc.close()
return dbfile, keys
def _client(dbfile):
return TestClient(create_app(dbfile))
def _auth(k):
return {"Authorization": f"Bearer {k}"}
# --- POST /admin/tenants/{tid}/recalibrate ---------------------------
def test_recalibrate_happy_path_returns_summary(tmp_path):
"""Admin POST → svc.recalibrate_from_stored runs → returns the
same summary shape as Stage 30 (previous_age_days etc.)."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code == 200
body = r.json()
assert body["tenant_id"] == "acme"
assert body["entity_type"] == "warehouse"
assert body["vertical"] == "logistics"
assert body["n_observations_used"] == 40 # 20 days × 2 entities
assert body["new_derived_at"]
def test_recalibrate_persists_calibration(tmp_path):
"""End-to-end: HTTP call writes a calibration row that the
service can load afterwards."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
assert client.post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
).status_code == 200
svc = OrgStateService(dbfile)
try:
assert svc.get_calibration("acme", "warehouse") is not None
finally:
svc.close()
def test_recalibrate_too_few_observations_returns_400(tmp_path):
"""Service-layer guardrail surfaces as 400, not 500 — caller
fixed-able by lowering --min-observations or pushing more data."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
# 14 is default; ask for 9999 to force the guardrail
r = client.post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse", "min_observations": 9999},
)
assert r.status_code == 400
def test_recalibrate_missing_entity_type_400(tmp_path):
"""Stage 78: Pydantic 422 with field-level error."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post("/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={})
assert r.status_code == 422
def test_recalibrate_unknown_tenant_404(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/ghost/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code == 404
def test_recalibrate_unknown_entity_type_404(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "ghost"},
)
assert r.status_code == 404
def test_recalibrate_writes_audit_entry_with_admin_actor(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
_client(dbfile).post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
svc = OrgStateService(dbfile)
try:
rows = svc.audit.list(action="recalibrate")
assert len(rows) == 1
assert rows[0]["tenant_id"] == "acme"
assert rows[0]["actor"] # admin actor string is set
finally:
svc.close()
def test_recalibrate_no_admin_key_401(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post("/admin/tenants/acme/recalibrate",
json={"entity_type": "warehouse"})
assert r.status_code == 401
def test_recalibrate_tenant_key_forbidden(tmp_path):
"""A tenant key — even role=admin on its own tenant — must not
be able to call /admin/ routes. Otherwise the admin/tenant
privilege boundary is gone."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/acme/recalibrate",
headers=_auth(keys["acme_admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code in (401, 403)
# --- POST /admin/tenants/{tid}/runs/trigger --------------------------
def test_trigger_happy_path_returns_run_handle(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
# recalibrate first so a calibration exists
client = _client(dbfile)
client.post("/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"})
r = client.post(
"/admin/tenants/acme/runs/trigger",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code == 200
body = r.json()
assert body["run_id"].startswith("run_")
assert body["tenant_id"] == "acme"
assert body["entity_type"] == "warehouse"
assert "n_issues" in body
def test_trigger_persists_run_row(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
client.post("/admin/tenants/acme/recalibrate",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"})
rid = client.post(
"/admin/tenants/acme/runs/trigger",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
).json()["run_id"]
svc = OrgStateService(dbfile)
try:
assert svc.get_run(rid) is not None
finally:
svc.close()
def test_trigger_no_calibration_returns_400(tmp_path):
"""run_from_stored raises CalibrationNotFound. Handler maps it
to 400 (caller-fixable by recalibrate first) — NOT 500. The
admin's response is "POST /admin/tenants/X/recalibrate, retry",
not "page someone for a server error"."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
# no recalibrate first → no calibration
r = client.post(
"/admin/tenants/acme/runs/trigger",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code == 400
assert "calibration" in r.json()["error"]["message"].lower()
def test_trigger_missing_entity_type_400(tmp_path):
"""Stage 78: Pydantic 422 with field-level error."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post("/admin/tenants/acme/runs/trigger",
headers=_auth(keys["admin"]),
json={})
assert r.status_code == 422
def test_trigger_unknown_tenant_404(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/ghost/runs/trigger",
headers=_auth(keys["admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code == 404
def test_trigger_no_admin_key_401(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post("/admin/tenants/acme/runs/trigger",
json={"entity_type": "warehouse"})
assert r.status_code == 401
def test_trigger_tenant_key_forbidden(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.post(
"/admin/tenants/acme/runs/trigger",
headers=_auth(keys["acme_admin"]),
json={"entity_type": "warehouse"},
)
assert r.status_code in (401, 403)