| """ |
| Tests for Stage 53 β GET /admin/audit + GET /admin/audit.csv. |
| |
| HTTP mirror of the Stage 12 CLI `audit list` with the Stage 51 |
| time filters. JSON for ops dashboards, CSV for compliance pulls. |
| Both admin-only β audit log is platform-wide enumeration. |
| """ |
| import csv |
| import io |
| import json |
|
|
| import pytest |
|
|
| from infra import OrgStateService |
| from infra.api import handlers |
|
|
|
|
| def _insert(svc, log_id, created_at, *, actor="op", action="x", |
| tenant_id=None, payload=None): |
| svc.db.execute( |
| """INSERT INTO audit_logs |
| (log_id, actor, action, target_id, tenant_id, |
| payload_json, created_at) |
| VALUES (?, ?, ?, NULL, ?, ?, ?)""", |
| (log_id, actor, action, tenant_id, |
| json.dumps(payload or {}), created_at), |
| ) |
|
|
|
|
| |
|
|
| @pytest.fixture |
| def svc(tmp_path): |
| s = OrgStateService(str(tmp_path / "audit_http.sqlite3")) |
| _insert(s, 1, "2026-01-15T00:00:00+00:00", |
| actor="alice", action="register_tenant", |
| tenant_id="acme") |
| _insert(s, 2, "2026-02-20T00:00:00+00:00", |
| actor="bob", action="recalibrate", |
| tenant_id="acme", payload={"reason": "rotation"}) |
| _insert(s, 3, "2026-05-10T00:00:00+00:00", |
| actor="alice", action="recalibrate", |
| tenant_id="globex") |
| yield s |
| s.close() |
|
|
|
|
| def test_list_audit_no_filters_returns_all_newest_first(svc): |
| out = handlers.list_audit(svc) |
| assert out["count"] == 3 |
| |
| assert [r["log_id"] for r in out["audit"]] == [3, 2, 1] |
|
|
|
|
| def test_list_audit_filter_by_actor(svc): |
| out = handlers.list_audit(svc, actor="alice") |
| assert out["count"] == 2 |
| assert all(r["actor"] == "alice" for r in out["audit"]) |
| |
| assert out["actor"] == "alice" |
|
|
|
|
| def test_list_audit_filter_by_time_range(svc): |
| """Q1 pattern via the HTTP handler β same since-inclusive, |
| until-exclusive semantics as the CLI.""" |
| out = handlers.list_audit( |
| svc, |
| since="2026-01-01T00:00:00+00:00", |
| until="2026-04-01T00:00:00+00:00", |
| ) |
| actions = {r["action"] for r in out["audit"]} |
| assert actions == {"register_tenant", "recalibrate"} |
| assert out["since"] == "2026-01-01T00:00:00+00:00" |
| assert out["until"] == "2026-04-01T00:00:00+00:00" |
|
|
|
|
| def test_list_audit_no_filter_echo_keys_absent(svc): |
| out = handlers.list_audit(svc) |
| for k in ("actor", "action", "tenant_id", "since", "until"): |
| assert k not in out |
|
|
|
|
| def test_audit_csv_header_row(svc): |
| rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) |
| assert rows[0] == [ |
| "log_id", "created_at", "actor", "action", |
| "tenant_id", "target_id", "payload_json", |
| ] |
|
|
|
|
| def test_audit_csv_body_carries_compact_payload_json(svc): |
| """payload column is a JSON string β survives round-trip through |
| Excel as a single cell rather than expanding into columns.""" |
| rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) |
| by_id = {r[0]: r for r in rows[1:]} |
| |
| cell = by_id["2"][6] |
| assert json.loads(cell) == {"reason": "rotation"} |
|
|
|
|
| def test_audit_csv_null_tenant_renders_as_empty(svc): |
| """tenant_id NULL β blank cell, not 'None'.""" |
| _insert(svc, 99, "2026-05-15T00:00:00+00:00", |
| actor="admin", action="mint_admin_key", |
| tenant_id=None) |
| rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) |
| by_id = {r[0]: r for r in rows[1:]} |
| assert by_id["99"][4] == "" |
|
|
|
|
| |
|
|
| def _client(dbfile): |
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| from fastapi.testclient import TestClient |
|
|
| from infra.api import create_app |
| return TestClient(create_app(dbfile)) |
|
|
|
|
| def _bootstrap(tmp_path): |
| dbfile = str(tmp_path / "http.sqlite3") |
| svc = OrgStateService(dbfile) |
| try: |
| _insert(svc, 1, "2026-01-15T00:00:00+00:00", |
| actor="alice", action="register_tenant") |
| _insert(svc, 2, "2026-02-20T00:00:00+00:00", |
| actor="bob", action="recalibrate") |
| admin = svc.create_admin_key() |
| |
| svc.register_tenant("acme", "ACME") |
| tk = svc.create_api_key("acme", role="admin") |
| finally: |
| svc.close() |
| return dbfile, {"admin": admin.raw, "tenant": tk.raw} |
|
|
|
|
| def test_json_route_with_admin_returns_200(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| assert r.status_code == 200 |
| body = r.json() |
| assert "audit" in body |
| assert body["count"] >= 2 |
|
|
|
|
| def test_json_route_401_when_admin_configured_and_no_key(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| |
| client = _client(dbfile) |
| r = client.get("/admin/audit") |
| assert r.status_code == 401 |
|
|
|
|
| def test_json_route_tenant_key_forbidden(tmp_path): |
| """Even a tenant key with role=admin must NOT be able to read |
| the audit log β it enumerates across tenants.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit", |
| headers={"Authorization": f"Bearer {keys['tenant']}"}) |
| assert r.status_code in (401, 403) |
|
|
|
|
| def test_json_route_filters_propagate_to_body(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit?actor=alice", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| body = r.json() |
| assert body["count"] == 1 |
| assert body["audit"][0]["actor"] == "alice" |
| assert body["actor"] == "alice" |
|
|
|
|
| def test_csv_route_returns_text_csv(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit.csv", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| assert r.status_code == 200 |
| assert r.headers["content-type"].startswith("text/csv") |
|
|
|
|
| def test_csv_route_attachment_disposition_carries_scope(tmp_path): |
| """No filters β filename has 'all'; actor filter β filename has |
| 'actor-alice' so two parallel exports don't collide.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit.csv?actor=alice", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| cd = r.headers["content-disposition"] |
| assert "attachment" in cd |
| assert "actor-alice" in cd |
|
|
|
|
| def test_csv_route_no_filter_scope_says_all(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit.csv", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| assert "all" in r.headers["content-disposition"] |
|
|
|
|
| def test_csv_route_401_when_admin_configured_and_no_key(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit.csv") |
| assert r.status_code == 401 |
|
|
|
|
| def test_csv_route_body_parses_and_carries_filtered_rows(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/admin/audit.csv?actor=alice", |
| headers={"Authorization": f"Bearer {keys['admin']}"}) |
| rows = list(csv.reader(io.StringIO(r.text))) |
| body = rows[1:] |
| assert len(body) == 1 |
| assert body[0][2] == "alice" |
|
|