"""Tests for Stage 48 — GET /runs/{rid}/decisions.csv.

Sibling of the Stage 47 issues CSV — same pattern, same audience
(management, who wants 'what to do' not 'what is broken').

One addition over Stage 47: the CSV joins the decision snapshot (title,
urgency, recommendation) with the triage state (status, owner,
triage_notes), so a forwarded sheet is self-sufficient.
"""

import csv
import io
import json

import pytest

from infra import OrgStateService
from infra.api import handlers

# Statements used by the seeding helper. They are assembled from adjacent
# literals so each one stays a single-line SQL string.
_RUN_INSERT_SQL = (
    "INSERT INTO runs (run_id, tenant_id, entity_type, vertical, status, "
    "n_states, n_issues, n_decisions, summary_json, started_at, finished_at) "
    "VALUES (?, ?, 'widget', 'logistics', 'completed', 0, 0, ?, '{}', "
    "'2026-05-16T08:00:00+00:00', '2026-05-16T08:00:00+00:00')"
)
_DECISION_INSERT_SQL = (
    "INSERT INTO run_decisions (run_id, decision_id, issue_id, tenant_id, "
    "entity_id, urgency, recommendation, confidence, decision_json, status, "
    "owner, triage_notes, updated_at) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)

# Column order the handler is expected to emit in the CSV header row.
_EXPECTED_HEADER = [
    "decision_id",
    "issue_id",
    "entity_id",
    "urgency",
    "title",
    "recommendation",
    "confidence",
    "status",
    "owner",
    "triage_notes",
    "created_at",
    "updated_at",
]

# Positional indices into a CSV body row, per _EXPECTED_HEADER.
_ENTITY_ID_COL = 2
_RECOMMENDATION_COL = 5


def _insert_run_with_decisions(svc, tenant_id, run_id, decisions):
    """Insert one run row plus one run_decisions row per entry of `decisions`.

    `decisions` is a list of dicts. Each dict must carry "entity_id" and
    "urgency"; every other snapshot field falls back to a default, and the
    triage fields default to status "open" with owner, triage_notes and
    updated_at left as None.
    """
    svc.db.execute(_RUN_INSERT_SQL, (run_id, tenant_id, len(decisions)))

    for index, spec in enumerate(decisions):
        # The snapshot is what gets serialised into decision_json, so the
        # key order here is the key order of the stored JSON document.
        snapshot = {
            "decision_id": spec.get("decision_id", f"d_{index}"),
            "issue_id": spec.get("issue_id", f"i_{index}"),
            "entity_id": spec["entity_id"],
            "urgency": spec["urgency"],
            "title": spec.get("title", "default title"),
            "recommendation": spec.get("recommendation", "do X"),
            "confidence": spec.get("confidence", 0.8),
            "created_at": spec.get("created_at", "2026-05-16"),
        }
        params = (
            run_id,
            snapshot["decision_id"],
            snapshot["issue_id"],
            tenant_id,
            snapshot["entity_id"],
            snapshot["urgency"],
            snapshot["recommendation"],
            snapshot["confidence"],
            json.dumps(snapshot),
            spec.get("status", "open"),
            spec.get("owner"),
            spec.get("triage_notes"),
            spec.get("updated_at"),
        )
        svc.db.execute(_DECISION_INSERT_SQL, params)


def _seed_acme_run(svc, decisions):
    """Register tenant "acme" and insert run "r1" holding `decisions`."""
    svc.register_tenant("acme", "ACME")
    _insert_run_with_decisions(svc, "acme", "r1", decisions)


def _csv_rows(svc, run_id):
    """Call the decisions CSV handler and parse its text into a list of rows."""
    text = handlers.decisions_csv(svc, run_id)
    return list(csv.reader(io.StringIO(text)))


# --- pure handler -----------------------------------------------------


@pytest.fixture
def svc():
    """Yield an OrgStateService opened on ":memory:", closed after the test."""
    service = OrgStateService(":memory:")
    yield service
    service.close()


def test_handler_header_row(svc):
    """The first CSV row is the fixed twelve-column header."""
    _seed_acme_run(svc, [])
    parsed = _csv_rows(svc, "r1")
    assert parsed[0] == _EXPECTED_HEADER


def test_handler_body_sorted_by_urgency_desc(svc):
    """Most pressing decision at the top of the sheet — management's eye
    lands on it first."""
    _seed_acme_run(
        svc,
        [
            {"entity_id": "low_pri", "urgency": 0.2},
            {"entity_id": "high_pri", "urgency": 0.9},
            {"entity_id": "med_pri", "urgency": 0.5},
        ],
    )
    parsed = _csv_rows(svc, "r1")
    entity_ids = [record[_ENTITY_ID_COL] for record in parsed[1:]]
    assert entity_ids == ["high_pri", "med_pri", "low_pri"]


def test_handler_combines_snapshot_with_triage(svc):
    """The headline addition over Stage 47's issues CSV: the row carries
    both 'recommendation' (from the snapshot) AND
    'status'/'owner'/'triage_notes' (from the triage queue). A forwarded
    sheet is self-sufficient for a status review."""
    _seed_acme_run(
        svc,
        [
            {
                "entity_id": "wh_n",
                "urgency": 0.9,
                "recommendation": "Audit replenishment in 7 days",
                "status": "in_progress",
                "owner": "alice",
                "triage_notes": "ordered investigation",
            },
        ],
    )
    parsed = _csv_rows(svc, "r1")
    (only_row,) = parsed[1:]
    by_col = dict(zip(parsed[0], only_row))

    expected_cells = {
        "entity_id": "wh_n",
        "recommendation": "Audit replenishment in 7 days",
        "status": "in_progress",
        "owner": "alice",
        "triage_notes": "ordered investigation",
    }
    for column, expected in expected_cells.items():
        assert by_col[column] == expected


def test_handler_escapes_funky_recommendation(svc):
    """RFC 4180 escaping must round-trip through csv.reader even when
    recommendations contain commas/quotes/newlines — operations text
    often does."""
    tricky = 'do X, then "verify" with\nthe team'
    _seed_acme_run(
        svc,
        [{"entity_id": "a", "urgency": 0.7, "recommendation": tricky}],
    )
    parsed = _csv_rows(svc, "r1")
    (only_row,) = parsed[1:]
    assert only_row[_RECOMMENDATION_COL] == tricky


def test_handler_unknown_run_raises_404(svc):
    """Asking for a run that was never inserted raises ApiError."""
    from infra.api.errors import ApiError

    with pytest.raises(ApiError):
        handlers.decisions_csv(svc, "r_ghost")


def test_handler_empty_run_returns_only_header(svc):
    """A run with zero decisions yields exactly one row: the header."""
    _seed_acme_run(svc, [])
    parsed = _csv_rows(svc, "r1")
    assert len(parsed) == 1


def test_handler_treats_null_triage_fields_as_empty(svc):
    """A fresh decision has status='open' but owner=NULL and
    triage_notes=NULL. Those NULLs render as empty strings in the CSV (not
    the literal "None" or "null"), so Excel cells are blank."""
    _seed_acme_run(svc, [{"entity_id": "a", "urgency": 0.5}])
    parsed = _csv_rows(svc, "r1")
    by_col = dict(zip(parsed[0], parsed[1]))
    assert by_col["owner"] == ""
    assert by_col["triage_notes"] == ""


# --- HTTP route smoke -------------------------------------------------


def _client(dbfile):
    """Build a TestClient for the app on `dbfile`; skip if deps are missing."""
    for optional_dep in ("fastapi", "httpx"):
        pytest.importorskip(optional_dep)

    from fastapi.testclient import TestClient
    from infra.api import create_app

    app = create_app(dbfile)
    return TestClient(app)


def _bootstrap(tmp_path):
    """Create an on-disk database with two tenants, one run each, and keys.

    Returns `(dbfile, keys)` where `keys` maps tenant id to the raw value
    of a readonly API key created for that tenant.
    """
    dbfile = str(tmp_path / "dcsv.sqlite3")
    service = OrgStateService(dbfile)
    try:
        service.register_tenant("acme", "ACME")
        service.register_tenant("globex", "Globex")
        _insert_run_with_decisions(
            service,
            "acme",
            "r_acme",
            [
                {
                    "entity_id": "wh",
                    "urgency": 0.9,
                    "recommendation": "Audit X",
                    "status": "open",
                },
            ],
        )
        _insert_run_with_decisions(service, "globex", "r_globex", [])
        keys = {
            tenant: service.create_api_key(tenant, role="readonly").raw
            for tenant in ("acme", "globex")
        }
    finally:
        service.close()
    return dbfile, keys


def _fetch_decisions_csv(tmp_path, run_id, as_tenant=None):
    """Bootstrap a database, then GET the decisions CSV route for `run_id`.

    When `as_tenant` is given, that tenant's key is sent as a Bearer token;
    otherwise the request carries no Authorization header.
    """
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    request_kwargs = {}
    if as_tenant is not None:
        request_kwargs["headers"] = {
            "Authorization": f"Bearer {keys[as_tenant]}",
        }
    return client.get(f"/runs/{run_id}/decisions.csv", **request_kwargs)


def test_csv_route_returns_text_csv(tmp_path):
    """An authorised request answers 200 with a utf-8 text/csv content type."""
    response = _fetch_decisions_csv(tmp_path, "r_acme", as_tenant="acme")
    assert response.status_code == 200
    content_type = response.headers["content-type"]
    assert content_type.startswith("text/csv")
    assert "utf-8" in content_type


def test_csv_route_attachment_disposition_has_run_id(tmp_path):
    """Content-Disposition marks an attachment named after the run."""
    response = _fetch_decisions_csv(tmp_path, "r_acme", as_tenant="acme")
    disposition = response.headers.get("content-disposition", "")
    assert "attachment" in disposition
    assert "r_acme" in disposition
    assert "decisions" in disposition  # so two attachments don't collide on disk


def test_csv_route_no_key_401(tmp_path):
    """A request without an Authorization header is answered with 401."""
    response = _fetch_decisions_csv(tmp_path, "r_acme")
    assert response.status_code == 401


def test_csv_route_cross_tenant_404(tmp_path):
    """Same require_run_access pattern as the JSON variant — 404 not 403
    on cross-tenant probe."""
    response = _fetch_decisions_csv(tmp_path, "r_globex", as_tenant="acme")
    assert response.status_code == 404


def test_csv_route_unknown_run_404(tmp_path):
    """A run id that exists for no tenant is answered with 404."""
    response = _fetch_decisions_csv(tmp_path, "r_ghost", as_tenant="acme")
    assert response.status_code == 404