File size: 8,655 Bytes
d2d1903 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | """
Tests for Stage 48 β GET /runs/{rid}/decisions.csv.
Sibling of the Stage 47 issues CSV β same pattern, same audience
(management, who wants 'what to do' not 'what is broken'). One
addition over Stage 47: the CSV joins the decision snapshot
(title, urgency, recommendation) with the triage state (status,
owner, triage_notes), so a forwarded sheet is self-sufficient.
"""
import csv
import io
import json
import pytest
from infra import OrgStateService
from infra.api import handlers
def _insert_run_with_decisions(svc, tenant_id, run_id, decisions):
"""Insert a run + decisions. `decisions` is a list of dicts."""
svc.db.execute(
"""INSERT INTO runs
(run_id, tenant_id, entity_type, vertical, status,
n_states, n_issues, n_decisions, summary_json,
started_at, finished_at)
VALUES (?, ?, 'widget', 'logistics', 'completed',
0, 0, ?, '{}',
'2026-05-16T08:00:00+00:00',
'2026-05-16T08:00:00+00:00')""",
(run_id, tenant_id, len(decisions)),
)
for i, d in enumerate(decisions):
full = {
"decision_id": d.get("decision_id", f"d_{i}"),
"issue_id": d.get("issue_id", f"i_{i}"),
"entity_id": d["entity_id"],
"urgency": d["urgency"],
"title": d.get("title", "default title"),
"recommendation": d.get("recommendation", "do X"),
"confidence": d.get("confidence", 0.8),
"created_at": d.get("created_at", "2026-05-16"),
}
svc.db.execute(
"""INSERT INTO run_decisions
(run_id, decision_id, issue_id, tenant_id, entity_id,
urgency, recommendation, confidence, decision_json,
status, owner, triage_notes, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(run_id, full["decision_id"], full["issue_id"], tenant_id,
full["entity_id"], full["urgency"],
full["recommendation"], full["confidence"],
json.dumps(full),
d.get("status", "open"), d.get("owner"),
d.get("triage_notes"), d.get("updated_at")),
)
# --- pure handler -----------------------------------------------------
@pytest.fixture
def svc():
s = OrgStateService(":memory:")
yield s
s.close()
def test_handler_header_row(svc):
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [])
text = handlers.decisions_csv(svc, "r1")
rows = list(csv.reader(io.StringIO(text)))
assert rows[0] == [
"decision_id", "issue_id", "entity_id",
"urgency", "title", "recommendation", "confidence",
"status", "owner", "triage_notes",
"created_at", "updated_at",
]
def test_handler_body_sorted_by_urgency_desc(svc):
"""Most pressing decision at the top of the sheet β management's
eye lands on it first."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "low_pri", "urgency": 0.2},
{"entity_id": "high_pri", "urgency": 0.9},
{"entity_id": "med_pri", "urgency": 0.5},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
body = rows[1:]
assert [r[2] for r in body] == ["high_pri", "med_pri", "low_pri"]
def test_handler_combines_snapshot_with_triage(svc):
"""The headline addition over Stage 47's issues CSV: the row
carries both 'recommendation' (from the snapshot) AND
'status'/'owner'/'triage_notes' (from the triage queue).
A forwarded sheet is self-sufficient for a status review."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "wh_n", "urgency": 0.9,
"recommendation": "Audit replenishment in 7 days",
"status": "in_progress", "owner": "alice",
"triage_notes": "ordered investigation"},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
[row] = rows[1:]
by_col = dict(zip(rows[0], row))
assert by_col["entity_id"] == "wh_n"
assert by_col["recommendation"] == "Audit replenishment in 7 days"
assert by_col["status"] == "in_progress"
assert by_col["owner"] == "alice"
assert by_col["triage_notes"] == "ordered investigation"
def test_handler_escapes_funky_recommendation(svc):
"""RFC 4180 escaping must round-trip through csv.reader even
when recommendations contain commas/quotes/newlines β operations
text often does."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "a", "urgency": 0.7,
"recommendation": 'do X, then "verify" with\nthe team'},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
[row] = rows[1:]
assert row[5] == 'do X, then "verify" with\nthe team'
def test_handler_unknown_run_raises_404(svc):
from infra.api.errors import ApiError
with pytest.raises(ApiError):
handlers.decisions_csv(svc, "r_ghost")
def test_handler_empty_run_returns_only_header(svc):
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
assert len(rows) == 1
def test_handler_treats_null_triage_fields_as_empty(svc):
"""A fresh decision has status='open' but owner=NULL and
triage_notes=NULL. Those NULLs render as empty strings in the
CSV (not the literal "None" or "null"), so Excel cells are blank."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "a", "urgency": 0.5},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
by_col = dict(zip(rows[0], rows[1]))
assert by_col["owner"] == ""
assert by_col["triage_notes"] == ""
# --- HTTP route smoke -------------------------------------------------
def _client(dbfile):
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
from fastapi.testclient import TestClient
from infra.api import create_app
return TestClient(create_app(dbfile))
def _bootstrap(tmp_path):
dbfile = str(tmp_path / "dcsv.sqlite3")
svc = OrgStateService(dbfile)
try:
svc.register_tenant("acme", "ACME")
svc.register_tenant("globex", "Globex")
_insert_run_with_decisions(svc, "acme", "r_acme", [
{"entity_id": "wh", "urgency": 0.9,
"recommendation": "Audit X",
"status": "open"},
])
_insert_run_with_decisions(svc, "globex", "r_globex", [])
keys = {
"acme": svc.create_api_key("acme", role="readonly").raw,
"globex": svc.create_api_key("globex", role="readonly").raw,
}
finally:
svc.close()
return dbfile, keys
def test_csv_route_returns_text_csv(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/csv")
assert "utf-8" in r.headers["content-type"]
def test_csv_route_attachment_disposition_has_run_id(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
cd = r.headers.get("content-disposition", "")
assert "attachment" in cd
assert "r_acme" in cd
assert "decisions" in cd # so two attachments don't collide on disk
def test_csv_route_no_key_401(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv")
assert r.status_code == 401
def test_csv_route_cross_tenant_404(tmp_path):
"""Same require_run_access pattern as the JSON variant β 404 not
403 on cross-tenant probe."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_globex/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 404
def test_csv_route_unknown_run_404(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_ghost/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 404
|