# orgstate/tests/test_infra_api_admin_tenant.py
# Legal-i's picture
# Initial OrgState deploy via Stage 150 free-tier stack
# d2d1903 verified
"""
Tests for Stage 54 — admin tenant HTTP endpoints.
GET /admin/tenants cross-tenant list
POST /admin/tenants/{tid}/pause lifecycle freeze (Stage 52 over HTTP)
POST /admin/tenants/{tid}/resume un-freeze
Admin dashboards need point-and-click on these — CLI is fine for
operators in shell, less fine for someone running a web ops console.
Tenant API keys, even role=admin on their own tenant, must NOT be
able to call these (cross-tenant enumeration / lifecycle control).
"""
import pytest
from infra import OrgStateService
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
from fastapi.testclient import TestClient
from infra.api import create_app
def _bootstrap(tmp_path):
    """Seed a fresh database file and return ``(dbfile, keys)``.

    Registers the tenants ``acme`` and ``globex``, then mints three keys:
    one via ``create_admin_key`` and two ``acme`` API keys with roles
    ``admin`` and ``readonly``. The service handle is always closed before
    returning, so callers reopen the file themselves.

    Returns:
        A tuple of the database path (as ``str``) and a dict mapping
        ``"admin"``, ``"acme_admin"`` and ``"acme_ro"`` to the raw keys.
    """
    db_path = str(tmp_path / "admin_tenant.sqlite3")
    service = OrgStateService(db_path)
    try:
        for tenant_id, display_name in (("acme", "ACME"), ("globex", "Globex")):
            service.register_tenant(tenant_id, display_name)
        admin_key = service.create_admin_key().raw
        acme_admin_key = service.create_api_key("acme", role="admin").raw
        acme_readonly_key = service.create_api_key("acme", role="readonly").raw
    finally:
        service.close()
    raw_keys = {
        "admin": admin_key,
        "acme_admin": acme_admin_key,
        "acme_ro": acme_readonly_key,
    }
    return db_path, raw_keys
def _client(dbfile):
    """Return a ``TestClient`` wrapping the app built from ``dbfile``."""
    app = create_app(dbfile)
    return TestClient(app)
def _auth(k):
return {"Authorization": f"Bearer {k}"}
# --- GET /admin/tenants ----------------------------------------------
def test_admin_list_returns_every_tenant(tmp_path):
    """With the admin key, GET /admin/tenants lists both seeded tenants."""
    db_path, api_keys = _bootstrap(tmp_path)
    response = _client(db_path).get(
        "/admin/tenants", headers=_auth(api_keys["admin"])
    )
    assert response.status_code == 200
    payload = response.json()
    listed_ids = {entry["tenant_id"] for entry in payload["tenants"]}
    assert listed_ids == {"acme", "globex"}
def test_admin_list_carries_status_field(tmp_path):
    """Every listed tenant exposes a ``status`` of 'active' or 'paused'.

    The dashboard relies on this field to render pause/resume buttons in
    the right state.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    r = client.get("/admin/tenants", headers=_auth(keys["admin"]))
    # Check the status code first so an auth/routing failure is not
    # reported as a confusing KeyError on the response body.
    assert r.status_code == 200
    tenants = r.json()["tenants"]
    # Guard against a vacuous pass: with an empty list the loop below
    # would assert nothing at all.
    assert tenants
    for t in tenants:
        assert t["status"] in ("active", "paused")
def test_admin_list_401_without_key(tmp_path):
    """GET /admin/tenants without credentials is answered with 401.

    ``_bootstrap`` has already created an admin key in the database, so
    admin auth is configured and the route is locked, as with the other
    /admin/* routes.
    """
    db_path, _unused_keys = _bootstrap(tmp_path)
    response = _client(db_path).get("/admin/tenants")
    assert response.status_code == 401
def test_admin_list_tenant_key_forbidden(tmp_path):
    """A tenant-scoped key, even with role=admin, cannot list tenants.

    This is the cross-tenant enumeration defense: the request must be
    rejected with either 401 or 403.
    """
    db_path, api_keys = _bootstrap(tmp_path)
    tenant_headers = _auth(api_keys["acme_admin"])
    response = _client(db_path).get("/admin/tenants", headers=tenant_headers)
    rejected_codes = (401, 403)
    assert response.status_code in rejected_codes
# --- POST /admin/tenants/{tid}/pause + /resume -----------------------
def test_pause_then_resume_roundtrip(tmp_path):
    """Admin pause then resume each return 200 and report the new status."""
    db_path, api_keys = _bootstrap(tmp_path)
    http = _client(db_path)
    # Pause first, then resume; each step reports the status it produced.
    steps = (
        ("/admin/tenants/acme/pause", "paused"),
        ("/admin/tenants/acme/resume", "active"),
    )
    for path, expected_status in steps:
        response = http.post(path, headers=_auth(api_keys["admin"]))
        assert response.status_code == 200
        assert response.json()["status"] == expected_status
def test_pause_blocks_writes_through_service(tmp_path):
    """End-to-end: HTTP pause -> ``service.run_analysis`` raises.

    After pausing ``acme`` over HTTP, a freshly opened service must raise
    ``TenantPausedError`` from ``run_analysis``. This verifies the HTTP
    pause wires through to the same enforcement Stage 52 added.
    """
    from core import HIGHER_IS_WORSE, EntityTypeConfig, MetricConfig, Observation
    from infra.service import TenantPausedError

    dbfile, keys = _bootstrap(tmp_path)

    cfg = EntityTypeConfig(
        entity_type="widget",
        metrics=[MetricConfig("m", HIGHER_IS_WORSE,
                              weight=1.0, feeds_anomaly=True)],
        baseline_window=7, baseline_lag=2, recent_window=3,
    )
    obs = [
        Observation("e1", f"2026-01-{i + 1:02d}", {"m": 100.0 + i})
        for i in range(20)
    ]

    # Calibrate first so run_analysis would otherwise succeed.
    svc = OrgStateService(dbfile)
    try:
        svc.calibrate_and_store("acme", obs, cfg)
    finally:
        svc.close()

    # Pause via HTTP. Assert it succeeded so a rejected pause is reported
    # here rather than as an unexplained "DID NOT RAISE" below.
    r = _client(dbfile).post("/admin/tenants/acme/pause",
                             headers=_auth(keys["admin"]))
    assert r.status_code == 200

    # Now service.run_analysis must raise.
    svc = OrgStateService(dbfile)
    try:
        with pytest.raises(TenantPausedError):
            svc.run_analysis("acme", obs, cfg)
    finally:
        svc.close()
def test_pause_writes_audit_entry(tmp_path):
    """A pause must leave exactly one ``set_tenant_status`` audit row.

    Lifecycle changes are operationally significant — they must show up
    in the audit trail with a non-empty actor identity.
    """
    dbfile, keys = _bootstrap(tmp_path)
    r = _client(dbfile).post("/admin/tenants/acme/pause",
                             headers=_auth(keys["admin"]))
    # Confirm the pause itself succeeded; otherwise a rejected request
    # would surface as a misleading "len(rows) == 0" failure below.
    assert r.status_code == 200

    svc = OrgStateService(dbfile)
    try:
        rows = svc.audit.list(action="set_tenant_status")
        assert len(rows) == 1
        assert rows[0]["tenant_id"] == "acme"
        assert rows[0]["payload"]["status"] == "paused"
        # The actor is the admin identity (the helper resolves the raw
        # token to "admin:<key_id>" or similar), so only check that it
        # is non-empty.
        assert rows[0]["actor"]
    finally:
        svc.close()
def test_pause_unknown_tenant_404(tmp_path):
    """Pausing a tenant id that was never registered yields 404."""
    db_path, api_keys = _bootstrap(tmp_path)
    admin_headers = _auth(api_keys["admin"])
    response = _client(db_path).post(
        "/admin/tenants/ghost/pause", headers=admin_headers
    )
    assert response.status_code == 404
def test_pause_401_without_key(tmp_path):
    """POST /admin/tenants/acme/pause without credentials returns 401."""
    db_path, _unused_keys = _bootstrap(tmp_path)
    response = _client(db_path).post("/admin/tenants/acme/pause")
    assert response.status_code == 401
def test_pause_tenant_key_forbidden(tmp_path):
    """A tenant key cannot pause its own tenant, even with role=admin.

    If tenant keys could flip their own status, a tenant could un-pause
    itself out of a freeze, so the request must get 401 or 403.
    """
    db_path, api_keys = _bootstrap(tmp_path)
    tenant_headers = _auth(api_keys["acme_admin"])
    response = _client(db_path).post(
        "/admin/tenants/acme/pause", headers=tenant_headers
    )
    rejected_codes = (401, 403)
    assert response.status_code in rejected_codes
def test_resume_tenant_key_forbidden(tmp_path):
    """Same defense on resume — the freeze sticks until an admin lifts it.

    The tenant is paused with the admin key first, so the tenant-key
    resume attempt is made against a tenant that is actually frozen.
    The attempt must be rejected (401 or 403) and the tenant must still
    be listed as paused afterwards.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    admin_headers = _auth(keys["admin"])

    # Establish the freeze; without it the resume attempt below would be
    # tested against an already-active tenant.
    paused = client.post("/admin/tenants/acme/pause", headers=admin_headers)
    assert paused.status_code == 200

    r = client.post("/admin/tenants/acme/resume",
                    headers=_auth(keys["acme_admin"]))
    assert r.status_code in (401, 403)

    # The rejected request must not have changed the tenant's status.
    listing = client.get("/admin/tenants", headers=admin_headers)
    assert listing.status_code == 200
    by_id = {t["tenant_id"]: t for t in listing.json()["tenants"]}
    assert by_id["acme"]["status"] == "paused"
def test_pause_then_list_shows_paused_status(tmp_path):
    """After a pause, GET /admin/tenants reports the new status.

    Shows the list endpoint reflects the state written by the pause:
    ``acme`` is listed as paused while the untouched ``globex`` stays
    active.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    admin_headers = _auth(keys["admin"])

    # Check both responses succeed so a rejected request is reported as
    # such, not as a KeyError or a wrong-status assertion further down.
    paused = client.post("/admin/tenants/acme/pause", headers=admin_headers)
    assert paused.status_code == 200

    listing = client.get("/admin/tenants", headers=admin_headers)
    assert listing.status_code == 200

    by_id = {t["tenant_id"]: t for t in listing.json()["tenants"]}
    assert by_id["acme"]["status"] == "paused"
    assert by_id["globex"]["status"] == "active"