File size: 7,768 Bytes
d2d1903
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Tests for Stage 47 — GET /runs/{rid}/issues.csv.

CSV download for non-technical consumers — finance/ops teams who
live in Excel. Two layers:

  - handlers.issues_csv — pure CSV generation with stdlib csv.writer
    so RFC 4180 escaping (embedded quotes, commas, newlines in
    titles) is handled correctly.
  - HTTP route — same auth as the JSON variant; correct content-type;
    Content-Disposition with a sensible filename.
"""
import csv
import io
import json

import pytest

from infra import OrgStateService
from infra.api import handlers


def _insert_run_with_issues(svc, tenant_id, run_id, issues):
    """Seed one completed run and its issue rows straight into the DB.

    Writes go through ``svc.db.execute`` directly. ``issues`` is a list
    of dicts: ``entity_id``, ``severity`` and ``score`` are required,
    while ``issue_id``, ``entity_type``, ``title`` and ``detected_at``
    fall back to defaults. The run's ``n_issues`` column is set to
    ``len(issues)``.
    """
    run_sql = """INSERT INTO runs
               (run_id, tenant_id, entity_type, vertical, status,
                n_states, n_issues, n_decisions, summary_json,
                started_at, finished_at)
           VALUES (?, ?, 'widget', 'logistics', 'completed',
                   0, ?, 0, '{}',
                   '2026-05-16T08:00:00+00:00',
                   '2026-05-16T08:00:00+00:00')"""
    issue_sql = """INSERT INTO run_issues
                   (run_id, issue_id, tenant_id, entity_id, entity_type,
                    severity, score, detected_at, issue_json)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"""

    svc.db.execute(run_sql, (run_id, tenant_id, len(issues)))

    for position, raw in enumerate(issues):
        # Insertion order below is the key order of the stored issue_json.
        record = {"issue_id": raw.get("issue_id", f"i_{position}")}
        record["entity_id"] = raw["entity_id"]
        record["entity_type"] = raw.get("entity_type", "widget")
        record["severity"] = raw["severity"]
        record["score"] = raw["score"]
        record["title"] = raw.get("title", "default title")
        record["detected_at"] = raw.get("detected_at", "2026-05-16")

        row = (
            run_id, record["issue_id"], tenant_id, record["entity_id"],
            record["entity_type"], record["severity"], record["score"],
            record["detected_at"], json.dumps(record),
        )
        svc.db.execute(issue_sql, row)


# --- pure handler -----------------------------------------------------

@pytest.fixture
def svc():
    """Yield an in-memory ``OrgStateService``; close it after the test."""
    service = OrgStateService(":memory:")
    yield service
    # Teardown: runs once the requesting test has finished.
    service.close()


def test_handler_emits_header_row(svc):
    """The first CSV row is the fixed seven-column header."""
    expected_header = [
        "issue_id", "entity_id", "entity_type",
        "severity", "score", "title", "detected_at",
    ]
    svc.register_tenant("acme", "ACME")
    _insert_run_with_issues(svc, "acme", "r1", [])

    csv_text = handlers.issues_csv(svc, "r1")
    parsed = list(csv.reader(io.StringIO(csv_text)))

    assert parsed[0] == expected_header


def test_handler_emits_body_rows_in_score_order(svc):
    """Body rows come out highest score first, whatever the insert order.

    NOTE(review): the original note attributes the ordering to
    ``get_run_issues`` sorting by score descending; that code is not
    visible in this file, so confirm against the service.
    """
    unordered_issues = [
        {"entity_id": "low", "severity": "low", "score": 0.1,
         "title": "minor"},
        {"entity_id": "crit", "severity": "critical", "score": 0.9,
         "title": "very bad"},
        {"entity_id": "med", "severity": "medium", "score": 0.5,
         "title": "ok"},
    ]
    svc.register_tenant("acme", "ACME")
    _insert_run_with_issues(svc, "acme", "r1", unordered_issues)

    csv_text = handlers.issues_csv(svc, "r1")
    parsed = list(csv.reader(io.StringIO(csv_text)))

    # Skip the header row; column 1 is entity_id.
    entity_ids = [row[1] for row in parsed[1:]]
    assert entity_ids == ["crit", "med", "low"]


def test_handler_escapes_funky_titles(svc):
    """A title with a comma, quotes and a newline survives a CSV round trip.

    Parsing the handler output with ``csv.reader`` must still yield
    exactly one body row whose title cell equals the original string.
    """
    tricky_title = 'title with, comma and "quote" and\nnewline'
    svc.register_tenant("acme", "ACME")
    _insert_run_with_issues(svc, "acme", "r1", [
        {"entity_id": "a", "severity": "high", "score": 0.7,
         "title": tricky_title},
    ])

    csv_text = handlers.issues_csv(svc, "r1")
    parsed = list(csv.reader(io.StringIO(csv_text)))

    # Header plus one body row: the embedded newline must not split it.
    assert len(parsed) == 2
    # Column 5 is the title.
    assert parsed[1][5] == tricky_title


def test_handler_unknown_run_raises_404(svc):
    """Asking for the CSV of a run that was never inserted raises ApiError.

    NOTE(review): only the exception type is checked here; the 404
    status in the test name is not asserted. Consider checking it once
    the ApiError attribute carrying the status is confirmed.
    """
    from infra.api.errors import ApiError

    missing_run_id = "r_ghost"
    with pytest.raises(ApiError):
        handlers.issues_csv(svc, missing_run_id)


def test_handler_empty_run_returns_only_header(svc):
    """A run with zero issues produces a CSV with a single (header) row."""
    svc.register_tenant("acme", "ACME")
    _insert_run_with_issues(svc, "acme", "r1", [])

    csv_text = handlers.issues_csv(svc, "r1")
    parsed = list(csv.reader(io.StringIO(csv_text)))

    # Nothing but the header line.
    assert len(parsed) == 1


# --- HTTP route smoke -------------------------------------------------

def _client(dbfile=":memory:"):
    """Build a FastAPI ``TestClient`` for the app backed by ``dbfile``.

    The calling test is skipped when ``fastapi`` or ``httpx`` is not
    importable.
    """
    for optional_dep in ("fastapi", "httpx"):
        pytest.importorskip(optional_dep)

    # Imported only after the skip checks so a missing dependency
    # skips the test instead of raising ImportError.
    from fastapi.testclient import TestClient

    from infra.api import create_app

    app = create_app(dbfile)
    return TestClient(app)


def _bootstrap(tmp_path):
    """Create an on-disk database seeded for the HTTP route tests.

    Registers tenants ``acme`` and ``globex``, inserts run ``r_acme``
    (one critical issue) and run ``r_globex`` (no issues), and creates
    one readonly API key per tenant. The service is closed before
    returning.

    Returns:
        ``(dbfile, keys)`` where ``dbfile`` is the database path as a
        string and ``keys`` maps ``"acme_ro"`` / ``"globex_ro"`` to the
        raw key values.
    """
    db_path = tmp_path / "csv.sqlite3"
    dbfile = str(db_path)
    service = OrgStateService(dbfile)
    try:
        service.register_tenant("acme", "ACME")
        service.register_tenant("globex", "Globex")

        acme_issues = [
            {"entity_id": "wh_north", "severity": "critical",
             "score": 0.91, "title": "drift severe"},
        ]
        _insert_run_with_issues(service, "acme", "r_acme", acme_issues)
        _insert_run_with_issues(service, "globex", "r_globex", [])

        acme_raw = service.create_api_key("acme", role="readonly").raw
        globex_raw = service.create_api_key("globex", role="readonly").raw
        keys = {"acme_ro": acme_raw, "globex_ro": globex_raw}
    finally:
        # Always release the handle, even if seeding fails.
        service.close()
    return dbfile, keys


def test_csv_route_returns_text_csv_content_type(tmp_path):
    """The CSV route answers 200 with a ``text/csv`` utf-8 content type."""
    dbfile, keys = _bootstrap(tmp_path)
    auth = {"Authorization": f"Bearer {keys['acme_ro']}"}

    response = _client(dbfile).get("/runs/r_acme/issues.csv", headers=auth)

    assert response.status_code == 200
    # text/csv is the canonical type, and the charset must be explicit.
    content_type = response.headers["content-type"]
    assert content_type.startswith("text/csv")
    assert "utf-8" in content_type


def test_csv_route_includes_content_disposition_attachment(tmp_path):
    """The response is served as a download named after the run.

    Browsers download instead of inline-display when the header
    includes 'attachment; filename=...'. The test checks that
    Content-Disposition contains ``attachment`` and the run id.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    r = client.get("/runs/r_acme/issues.csv",
                    headers={"Authorization": f"Bearer {keys['acme_ro']}"})
    # Check the status first so an error response fails here, clearly,
    # rather than as a confusing missing-header assertion below.
    assert r.status_code == 200
    cd = r.headers.get("content-disposition", "")
    assert "attachment" in cd
    assert "r_acme" in cd  # filename carries the run id


def test_csv_route_body_parses_as_csv(tmp_path):
    """The HTTP body parses as CSV and carries the seeded issue row."""
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    r = client.get("/runs/r_acme/issues.csv",
                    headers={"Authorization": f"Bearer {keys['acme_ro']}"})
    # Check the status first: an error body would otherwise be fed to
    # csv.reader and fail later on row contents or with an IndexError.
    assert r.status_code == 200
    rows = list(csv.reader(io.StringIO(r.text)))
    assert rows[0][0] == "issue_id"      # header row, first column
    assert rows[1][1] == "wh_north"      # entity_id of the seeded issue
    assert rows[1][5] == "drift severe"  # title of the seeded issue


def test_csv_route_no_key_returns_401(tmp_path):
    """A request with no Authorization header is rejected with 401."""
    dbfile, _unused_keys = _bootstrap(tmp_path)

    response = _client(dbfile).get("/runs/r_acme/issues.csv")

    assert response.status_code == 401


def test_csv_route_cross_tenant_returns_404(tmp_path):
    """An acme key asking for a globex run gets 404, not 403.

    NOTE(review): per the original note this goes through
    ``require_run_access``, which deliberately answers 404 so that a
    run's existence on another tenant is not disclosed. That helper is
    not visible in this file; confirm there.
    """
    dbfile, keys = _bootstrap(tmp_path)
    acme_auth = {"Authorization": f"Bearer {keys['acme_ro']}"}

    response = _client(dbfile).get("/runs/r_globex/issues.csv",
                                   headers=acme_auth)

    assert response.status_code == 404


def test_csv_route_unknown_run_returns_404(tmp_path):
    """A valid key asking for a run id that was never inserted gets 404."""
    dbfile, keys = _bootstrap(tmp_path)
    auth = {"Authorization": f"Bearer {keys['acme_ro']}"}

    response = _client(dbfile).get("/runs/r_ghost/issues.csv", headers=auth)

    assert response.status_code == 404