orgstate / tests /test_infra_api_audit.py
Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
Tests for Stage 53 β€” GET /admin/audit + GET /admin/audit.csv.
HTTP mirror of the Stage 12 CLI `audit list` with the Stage 51
time filters. JSON for ops dashboards, CSV for compliance pulls.
Both admin-only β€” audit log is platform-wide enumeration.
"""
import csv
import io
import json
import pytest
from infra import OrgStateService
from infra.api import handlers
def _insert(svc, log_id, created_at, *, actor="op", action="x",
tenant_id=None, payload=None):
svc.db.execute(
"""INSERT INTO audit_logs
(log_id, actor, action, target_id, tenant_id,
payload_json, created_at)
VALUES (?, ?, ?, NULL, ?, ?, ?)""",
(log_id, actor, action, tenant_id,
json.dumps(payload or {}), created_at),
)
# --- pure handlers ----------------------------------------------------
@pytest.fixture
def svc(tmp_path):
s = OrgStateService(str(tmp_path / "audit_http.sqlite3"))
_insert(s, 1, "2026-01-15T00:00:00+00:00",
actor="alice", action="register_tenant",
tenant_id="acme")
_insert(s, 2, "2026-02-20T00:00:00+00:00",
actor="bob", action="recalibrate",
tenant_id="acme", payload={"reason": "rotation"})
_insert(s, 3, "2026-05-10T00:00:00+00:00",
actor="alice", action="recalibrate",
tenant_id="globex")
yield s
s.close()
def test_list_audit_no_filters_returns_all_newest_first(svc):
out = handlers.list_audit(svc)
assert out["count"] == 3
# newest first by log_id DESC
assert [r["log_id"] for r in out["audit"]] == [3, 2, 1]
def test_list_audit_filter_by_actor(svc):
out = handlers.list_audit(svc, actor="alice")
assert out["count"] == 2
assert all(r["actor"] == "alice" for r in out["audit"])
# echo back so caller can verify what they asked
assert out["actor"] == "alice"
def test_list_audit_filter_by_time_range(svc):
"""Q1 pattern via the HTTP handler β€” same since-inclusive,
until-exclusive semantics as the CLI."""
out = handlers.list_audit(
svc,
since="2026-01-01T00:00:00+00:00",
until="2026-04-01T00:00:00+00:00",
)
actions = {r["action"] for r in out["audit"]}
assert actions == {"register_tenant", "recalibrate"}
assert out["since"] == "2026-01-01T00:00:00+00:00"
assert out["until"] == "2026-04-01T00:00:00+00:00"
def test_list_audit_no_filter_echo_keys_absent(svc):
out = handlers.list_audit(svc)
for k in ("actor", "action", "tenant_id", "since", "until"):
assert k not in out
def test_audit_csv_header_row(svc):
rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc))))
assert rows[0] == [
"log_id", "created_at", "actor", "action",
"tenant_id", "target_id", "payload_json",
]
def test_audit_csv_body_carries_compact_payload_json(svc):
"""payload column is a JSON string β€” survives round-trip through
Excel as a single cell rather than expanding into columns."""
rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc))))
by_id = {r[0]: r for r in rows[1:]}
# the recalibrate row had payload {"reason": "rotation"}
cell = by_id["2"][6]
assert json.loads(cell) == {"reason": "rotation"}
def test_audit_csv_null_tenant_renders_as_empty(svc):
"""tenant_id NULL β†’ blank cell, not 'None'."""
_insert(svc, 99, "2026-05-15T00:00:00+00:00",
actor="admin", action="mint_admin_key",
tenant_id=None)
rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc))))
by_id = {r[0]: r for r in rows[1:]}
assert by_id["99"][4] == "" # tenant_id cell empty
# --- HTTP route smoke -------------------------------------------------
def _client(dbfile):
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
from fastapi.testclient import TestClient
from infra.api import create_app
return TestClient(create_app(dbfile))
def _bootstrap(tmp_path):
dbfile = str(tmp_path / "http.sqlite3")
svc = OrgStateService(dbfile)
try:
_insert(svc, 1, "2026-01-15T00:00:00+00:00",
actor="alice", action="register_tenant")
_insert(svc, 2, "2026-02-20T00:00:00+00:00",
actor="bob", action="recalibrate")
admin = svc.create_admin_key()
# also a tenant key β€” for the negative auth test
svc.register_tenant("acme", "ACME")
tk = svc.create_api_key("acme", role="admin")
finally:
svc.close()
return dbfile, {"admin": admin.raw, "tenant": tk.raw}
def test_json_route_with_admin_returns_200(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit",
headers={"Authorization": f"Bearer {keys['admin']}"})
assert r.status_code == 200
body = r.json()
assert "audit" in body
assert body["count"] >= 2
def test_json_route_401_when_admin_configured_and_no_key(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
# an admin key exists in the DB β†’ route locks; no header β†’ 401
client = _client(dbfile)
r = client.get("/admin/audit")
assert r.status_code == 401
def test_json_route_tenant_key_forbidden(tmp_path):
"""Even a tenant key with role=admin must NOT be able to read
the audit log β€” it enumerates across tenants."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit",
headers={"Authorization": f"Bearer {keys['tenant']}"})
assert r.status_code in (401, 403)
def test_json_route_filters_propagate_to_body(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit?actor=alice",
headers={"Authorization": f"Bearer {keys['admin']}"})
body = r.json()
assert body["count"] == 1
assert body["audit"][0]["actor"] == "alice"
assert body["actor"] == "alice"
def test_csv_route_returns_text_csv(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit.csv",
headers={"Authorization": f"Bearer {keys['admin']}"})
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/csv")
def test_csv_route_attachment_disposition_carries_scope(tmp_path):
"""No filters β†’ filename has 'all'; actor filter β†’ filename has
'actor-alice' so two parallel exports don't collide."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit.csv?actor=alice",
headers={"Authorization": f"Bearer {keys['admin']}"})
cd = r.headers["content-disposition"]
assert "attachment" in cd
assert "actor-alice" in cd
def test_csv_route_no_filter_scope_says_all(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit.csv",
headers={"Authorization": f"Bearer {keys['admin']}"})
assert "all" in r.headers["content-disposition"]
def test_csv_route_401_when_admin_configured_and_no_key(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit.csv")
assert r.status_code == 401
def test_csv_route_body_parses_and_carries_filtered_rows(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/admin/audit.csv?actor=alice",
headers={"Authorization": f"Bearer {keys['admin']}"})
rows = list(csv.reader(io.StringIO(r.text)))
body = rows[1:]
assert len(body) == 1
assert body[0][2] == "alice" # actor column