| """ |
| Tests for Stage 47 — GET /runs/{rid}/issues.csv. |
| |
| CSV download for non-technical consumers — finance/ops teams who |
| live in Excel. Two layers: |
| |
| - handlers.issues_csv — pure CSV generation with stdlib csv.writer |
| so RFC 4180 escaping (embedded quotes, commas, newlines in |
| titles) is handled correctly. |
| - HTTP route — same auth as the JSON variant; correct content-type; |
| Content-Disposition with a sensible filename. |
| """ |
| import csv |
| import io |
| import json |
|
|
| import pytest |
|
|
| from infra import OrgStateService |
| from infra.api import handlers |
|
|
|
|
| def _insert_run_with_issues(svc, tenant_id, run_id, issues): |
| """Insert a run + a list of issues directly. ``issues`` is a list |
| of dicts with at least entity_id/severity/score/title.""" |
| svc.db.execute( |
| """INSERT INTO runs |
| (run_id, tenant_id, entity_type, vertical, status, |
| n_states, n_issues, n_decisions, summary_json, |
| started_at, finished_at) |
| VALUES (?, ?, 'widget', 'logistics', 'completed', |
| 0, ?, 0, '{}', |
| '2026-05-16T08:00:00+00:00', |
| '2026-05-16T08:00:00+00:00')""", |
| (run_id, tenant_id, len(issues)), |
| ) |
| for i, issue in enumerate(issues): |
| full = { |
| "issue_id": issue.get("issue_id", f"i_{i}"), |
| "entity_id": issue["entity_id"], |
| "entity_type": issue.get("entity_type", "widget"), |
| "severity": issue["severity"], |
| "score": issue["score"], |
| "title": issue.get("title", "default title"), |
| "detected_at": issue.get("detected_at", "2026-05-16"), |
| } |
| svc.db.execute( |
| """INSERT INTO run_issues |
| (run_id, issue_id, tenant_id, entity_id, entity_type, |
| severity, score, detected_at, issue_json) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", |
| (run_id, full["issue_id"], tenant_id, full["entity_id"], |
| full["entity_type"], full["severity"], full["score"], |
| full["detected_at"], json.dumps(full)), |
| ) |
|
|
|
|
| |
|
|
| @pytest.fixture |
| def svc(): |
| s = OrgStateService(":memory:") |
| yield s |
| s.close() |
|
|
|
|
| def test_handler_emits_header_row(svc): |
| svc.register_tenant("acme", "ACME") |
| _insert_run_with_issues(svc, "acme", "r1", []) |
| text = handlers.issues_csv(svc, "r1") |
| rows = list(csv.reader(io.StringIO(text))) |
| assert rows[0] == ["issue_id", "entity_id", "entity_type", |
| "severity", "score", "title", "detected_at"] |
|
|
|
|
| def test_handler_emits_body_rows_in_score_order(svc): |
| """get_run_issues already sorts by score desc; the CSV preserves |
| that — worst row at the top of the sheet.""" |
| svc.register_tenant("acme", "ACME") |
| _insert_run_with_issues(svc, "acme", "r1", [ |
| {"entity_id": "low", "severity": "low", "score": 0.1, |
| "title": "minor"}, |
| {"entity_id": "crit", "severity": "critical", "score": 0.9, |
| "title": "very bad"}, |
| {"entity_id": "med", "severity": "medium", "score": 0.5, |
| "title": "ok"}, |
| ]) |
| rows = list(csv.reader(io.StringIO(handlers.issues_csv(svc, "r1")))) |
| |
| body = rows[1:] |
| assert [r[1] for r in body] == ["crit", "med", "low"] |
|
|
|
|
| def test_handler_escapes_funky_titles(svc): |
| """RFC 4180: embedded commas, quotes, and newlines must round-trip |
| through csv.reader without breaking the row structure.""" |
| svc.register_tenant("acme", "ACME") |
| _insert_run_with_issues(svc, "acme", "r1", [ |
| {"entity_id": "a", "severity": "high", "score": 0.7, |
| "title": 'title with, comma and "quote" and\nnewline'}, |
| ]) |
| text = handlers.issues_csv(svc, "r1") |
| rows = list(csv.reader(io.StringIO(text))) |
| assert len(rows) == 2 |
| assert rows[1][5] == 'title with, comma and "quote" and\nnewline' |
|
|
|
|
| def test_handler_unknown_run_raises_404(svc): |
| from infra.api.errors import ApiError |
| with pytest.raises(ApiError): |
| handlers.issues_csv(svc, "r_ghost") |
|
|
|
|
| def test_handler_empty_run_returns_only_header(svc): |
| svc.register_tenant("acme", "ACME") |
| _insert_run_with_issues(svc, "acme", "r1", []) |
| rows = list(csv.reader(io.StringIO(handlers.issues_csv(svc, "r1")))) |
| assert len(rows) == 1 |
|
|
|
|
| |
|
|
| def _client(dbfile=":memory:"): |
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| from fastapi.testclient import TestClient |
|
|
| from infra.api import create_app |
| return TestClient(create_app(dbfile)) |
|
|
|
|
| def _bootstrap(tmp_path): |
| dbfile = str(tmp_path / "csv.sqlite3") |
| svc = OrgStateService(dbfile) |
| try: |
| svc.register_tenant("acme", "ACME") |
| svc.register_tenant("globex", "Globex") |
| _insert_run_with_issues(svc, "acme", "r_acme", [ |
| {"entity_id": "wh_north", "severity": "critical", |
| "score": 0.91, "title": "drift severe"}, |
| ]) |
| _insert_run_with_issues(svc, "globex", "r_globex", []) |
| keys = { |
| "acme_ro": svc.create_api_key("acme", role="readonly").raw, |
| "globex_ro": svc.create_api_key("globex", role="readonly").raw, |
| } |
| finally: |
| svc.close() |
| return dbfile, keys |
|
|
|
|
| def test_csv_route_returns_text_csv_content_type(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_acme/issues.csv", |
| headers={"Authorization": f"Bearer {keys['acme_ro']}"}) |
| assert r.status_code == 200 |
| |
| assert r.headers["content-type"].startswith("text/csv") |
| assert "utf-8" in r.headers["content-type"] |
|
|
|
|
| def test_csv_route_includes_content_disposition_attachment(tmp_path): |
| """Browsers download instead of inline-display when the header |
| includes 'attachment; filename=...'. Excel + Sheets respect this.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_acme/issues.csv", |
| headers={"Authorization": f"Bearer {keys['acme_ro']}"}) |
| cd = r.headers.get("content-disposition", "") |
| assert "attachment" in cd |
| assert "r_acme" in cd |
|
|
|
|
| def test_csv_route_body_parses_as_csv(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_acme/issues.csv", |
| headers={"Authorization": f"Bearer {keys['acme_ro']}"}) |
| rows = list(csv.reader(io.StringIO(r.text))) |
| assert rows[0][0] == "issue_id" |
| assert rows[1][1] == "wh_north" |
| assert rows[1][5] == "drift severe" |
|
|
|
|
| def test_csv_route_no_key_returns_401(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_acme/issues.csv") |
| assert r.status_code == 401 |
|
|
|
|
| def test_csv_route_cross_tenant_returns_404(tmp_path): |
| """Cross-tenant access goes through require_run_access, which |
| intentionally returns 404 (not 403) for runs on another tenant — |
| so existence isn't disclosed via the distinction.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_globex/issues.csv", |
| headers={"Authorization": f"Bearer {keys['acme_ro']}"}) |
| assert r.status_code == 404 |
|
|
|
|
| def test_csv_route_unknown_run_returns_404(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/runs/r_ghost/issues.csv", |
| headers={"Authorization": f"Bearer {keys['acme_ro']}"}) |
| assert r.status_code == 404 |
|
|