""" Tests for Stage 53 — GET /admin/audit + GET /admin/audit.csv. HTTP mirror of the Stage 12 CLI `audit list` with the Stage 51 time filters. JSON for ops dashboards, CSV for compliance pulls. Both admin-only — audit log is platform-wide enumeration. """ import csv import io import json import pytest from infra import OrgStateService from infra.api import handlers def _insert(svc, log_id, created_at, *, actor="op", action="x", tenant_id=None, payload=None): svc.db.execute( """INSERT INTO audit_logs (log_id, actor, action, target_id, tenant_id, payload_json, created_at) VALUES (?, ?, ?, NULL, ?, ?, ?)""", (log_id, actor, action, tenant_id, json.dumps(payload or {}), created_at), ) # --- pure handlers ---------------------------------------------------- @pytest.fixture def svc(tmp_path): s = OrgStateService(str(tmp_path / "audit_http.sqlite3")) _insert(s, 1, "2026-01-15T00:00:00+00:00", actor="alice", action="register_tenant", tenant_id="acme") _insert(s, 2, "2026-02-20T00:00:00+00:00", actor="bob", action="recalibrate", tenant_id="acme", payload={"reason": "rotation"}) _insert(s, 3, "2026-05-10T00:00:00+00:00", actor="alice", action="recalibrate", tenant_id="globex") yield s s.close() def test_list_audit_no_filters_returns_all_newest_first(svc): out = handlers.list_audit(svc) assert out["count"] == 3 # newest first by log_id DESC assert [r["log_id"] for r in out["audit"]] == [3, 2, 1] def test_list_audit_filter_by_actor(svc): out = handlers.list_audit(svc, actor="alice") assert out["count"] == 2 assert all(r["actor"] == "alice" for r in out["audit"]) # echo back so caller can verify what they asked assert out["actor"] == "alice" def test_list_audit_filter_by_time_range(svc): """Q1 pattern via the HTTP handler — same since-inclusive, until-exclusive semantics as the CLI.""" out = handlers.list_audit( svc, since="2026-01-01T00:00:00+00:00", until="2026-04-01T00:00:00+00:00", ) actions = {r["action"] for r in out["audit"]} assert actions == {"register_tenant", "recalibrate"} assert out["since"] == "2026-01-01T00:00:00+00:00" assert out["until"] == "2026-04-01T00:00:00+00:00" def test_list_audit_no_filter_echo_keys_absent(svc): out = handlers.list_audit(svc) for k in ("actor", "action", "tenant_id", "since", "until"): assert k not in out def test_audit_csv_header_row(svc): rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) assert rows[0] == [ "log_id", "created_at", "actor", "action", "tenant_id", "target_id", "payload_json", ] def test_audit_csv_body_carries_compact_payload_json(svc): """payload column is a JSON string — survives round-trip through Excel as a single cell rather than expanding into columns.""" rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) by_id = {r[0]: r for r in rows[1:]} # the recalibrate row had payload {"reason": "rotation"} cell = by_id["2"][6] assert json.loads(cell) == {"reason": "rotation"} def test_audit_csv_null_tenant_renders_as_empty(svc): """tenant_id NULL → blank cell, not 'None'.""" _insert(svc, 99, "2026-05-15T00:00:00+00:00", actor="admin", action="mint_admin_key", tenant_id=None) rows = list(csv.reader(io.StringIO(handlers.audit_csv(svc)))) by_id = {r[0]: r for r in rows[1:]} assert by_id["99"][4] == "" # tenant_id cell empty # --- HTTP route smoke ------------------------------------------------- def _client(dbfile): pytest.importorskip("fastapi") pytest.importorskip("httpx") from fastapi.testclient import TestClient from infra.api import create_app return TestClient(create_app(dbfile)) def _bootstrap(tmp_path): dbfile = str(tmp_path / "http.sqlite3") svc = OrgStateService(dbfile) try: _insert(svc, 1, "2026-01-15T00:00:00+00:00", actor="alice", action="register_tenant") _insert(svc, 2, "2026-02-20T00:00:00+00:00", actor="bob", action="recalibrate") admin = svc.create_admin_key() # also a tenant key — for the negative auth test svc.register_tenant("acme", "ACME") tk = svc.create_api_key("acme", role="admin") finally: svc.close() return dbfile, {"admin": admin.raw, "tenant": tk.raw} def test_json_route_with_admin_returns_200(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit", headers={"Authorization": f"Bearer {keys['admin']}"}) assert r.status_code == 200 body = r.json() assert "audit" in body assert body["count"] >= 2 def test_json_route_401_when_admin_configured_and_no_key(tmp_path): dbfile, _ = _bootstrap(tmp_path) # an admin key exists in the DB → route locks; no header → 401 client = _client(dbfile) r = client.get("/admin/audit") assert r.status_code == 401 def test_json_route_tenant_key_forbidden(tmp_path): """Even a tenant key with role=admin must NOT be able to read the audit log — it enumerates across tenants.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit", headers={"Authorization": f"Bearer {keys['tenant']}"}) assert r.status_code in (401, 403) def test_json_route_filters_propagate_to_body(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit?actor=alice", headers={"Authorization": f"Bearer {keys['admin']}"}) body = r.json() assert body["count"] == 1 assert body["audit"][0]["actor"] == "alice" assert body["actor"] == "alice" def test_csv_route_returns_text_csv(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit.csv", headers={"Authorization": f"Bearer {keys['admin']}"}) assert r.status_code == 200 assert r.headers["content-type"].startswith("text/csv") def test_csv_route_attachment_disposition_carries_scope(tmp_path): """No filters → filename has 'all'; actor filter → filename has 'actor-alice' so two parallel exports don't collide.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit.csv?actor=alice", headers={"Authorization": f"Bearer {keys['admin']}"}) cd = r.headers["content-disposition"] assert "attachment" in cd assert "actor-alice" in cd def test_csv_route_no_filter_scope_says_all(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit.csv", headers={"Authorization": f"Bearer {keys['admin']}"}) assert "all" in r.headers["content-disposition"] def test_csv_route_401_when_admin_configured_and_no_key(tmp_path): dbfile, _ = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit.csv") assert r.status_code == 401 def test_csv_route_body_parses_and_carries_filtered_rows(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/admin/audit.csv?actor=alice", headers={"Authorization": f"Bearer {keys['admin']}"}) rows = list(csv.reader(io.StringIO(r.text))) body = rows[1:] assert len(body) == 1 assert body[0][2] == "alice" # actor column