orgstate / tests /test_infra_api_decisions_csv.py
Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
Tests for Stage 48 — GET /runs/{rid}/decisions.csv.
Sibling of the Stage 47 issues CSV — same pattern, same audience
(management, who wants 'what to do' not 'what is broken'). One
addition over Stage 47: the CSV joins the decision snapshot
(title, urgency, recommendation) with the triage state (status,
owner, triage_notes), so a forwarded sheet is self-sufficient.
"""
import csv
import io
import json
import pytest
from infra import OrgStateService
from infra.api import handlers
def _insert_run_with_decisions(svc, tenant_id, run_id, decisions):
"""Insert a run + decisions. `decisions` is a list of dicts."""
svc.db.execute(
"""INSERT INTO runs
(run_id, tenant_id, entity_type, vertical, status,
n_states, n_issues, n_decisions, summary_json,
started_at, finished_at)
VALUES (?, ?, 'widget', 'logistics', 'completed',
0, 0, ?, '{}',
'2026-05-16T08:00:00+00:00',
'2026-05-16T08:00:00+00:00')""",
(run_id, tenant_id, len(decisions)),
)
for i, d in enumerate(decisions):
full = {
"decision_id": d.get("decision_id", f"d_{i}"),
"issue_id": d.get("issue_id", f"i_{i}"),
"entity_id": d["entity_id"],
"urgency": d["urgency"],
"title": d.get("title", "default title"),
"recommendation": d.get("recommendation", "do X"),
"confidence": d.get("confidence", 0.8),
"created_at": d.get("created_at", "2026-05-16"),
}
svc.db.execute(
"""INSERT INTO run_decisions
(run_id, decision_id, issue_id, tenant_id, entity_id,
urgency, recommendation, confidence, decision_json,
status, owner, triage_notes, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(run_id, full["decision_id"], full["issue_id"], tenant_id,
full["entity_id"], full["urgency"],
full["recommendation"], full["confidence"],
json.dumps(full),
d.get("status", "open"), d.get("owner"),
d.get("triage_notes"), d.get("updated_at")),
)
# --- pure handler -----------------------------------------------------
@pytest.fixture
def svc():
s = OrgStateService(":memory:")
yield s
s.close()
def test_handler_header_row(svc):
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [])
text = handlers.decisions_csv(svc, "r1")
rows = list(csv.reader(io.StringIO(text)))
assert rows[0] == [
"decision_id", "issue_id", "entity_id",
"urgency", "title", "recommendation", "confidence",
"status", "owner", "triage_notes",
"created_at", "updated_at",
]
def test_handler_body_sorted_by_urgency_desc(svc):
"""Most pressing decision at the top of the sheet — management's
eye lands on it first."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "low_pri", "urgency": 0.2},
{"entity_id": "high_pri", "urgency": 0.9},
{"entity_id": "med_pri", "urgency": 0.5},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
body = rows[1:]
assert [r[2] for r in body] == ["high_pri", "med_pri", "low_pri"]
def test_handler_combines_snapshot_with_triage(svc):
"""The headline addition over Stage 47's issues CSV: the row
carries both 'recommendation' (from the snapshot) AND
'status'/'owner'/'triage_notes' (from the triage queue).
A forwarded sheet is self-sufficient for a status review."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "wh_n", "urgency": 0.9,
"recommendation": "Audit replenishment in 7 days",
"status": "in_progress", "owner": "alice",
"triage_notes": "ordered investigation"},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
[row] = rows[1:]
by_col = dict(zip(rows[0], row))
assert by_col["entity_id"] == "wh_n"
assert by_col["recommendation"] == "Audit replenishment in 7 days"
assert by_col["status"] == "in_progress"
assert by_col["owner"] == "alice"
assert by_col["triage_notes"] == "ordered investigation"
def test_handler_escapes_funky_recommendation(svc):
"""RFC 4180 escaping must round-trip through csv.reader even
when recommendations contain commas/quotes/newlines — operations
text often does."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "a", "urgency": 0.7,
"recommendation": 'do X, then "verify" with\nthe team'},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
[row] = rows[1:]
assert row[5] == 'do X, then "verify" with\nthe team'
def test_handler_unknown_run_raises_404(svc):
from infra.api.errors import ApiError
with pytest.raises(ApiError):
handlers.decisions_csv(svc, "r_ghost")
def test_handler_empty_run_returns_only_header(svc):
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
assert len(rows) == 1
def test_handler_treats_null_triage_fields_as_empty(svc):
"""A fresh decision has status='open' but owner=NULL and
triage_notes=NULL. Those NULLs render as empty strings in the
CSV (not the literal "None" or "null"), so Excel cells are blank."""
svc.register_tenant("acme", "ACME")
_insert_run_with_decisions(svc, "acme", "r1", [
{"entity_id": "a", "urgency": 0.5},
])
rows = list(csv.reader(io.StringIO(handlers.decisions_csv(svc, "r1"))))
by_col = dict(zip(rows[0], rows[1]))
assert by_col["owner"] == ""
assert by_col["triage_notes"] == ""
# --- HTTP route smoke -------------------------------------------------
def _client(dbfile):
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
from fastapi.testclient import TestClient
from infra.api import create_app
return TestClient(create_app(dbfile))
def _bootstrap(tmp_path):
dbfile = str(tmp_path / "dcsv.sqlite3")
svc = OrgStateService(dbfile)
try:
svc.register_tenant("acme", "ACME")
svc.register_tenant("globex", "Globex")
_insert_run_with_decisions(svc, "acme", "r_acme", [
{"entity_id": "wh", "urgency": 0.9,
"recommendation": "Audit X",
"status": "open"},
])
_insert_run_with_decisions(svc, "globex", "r_globex", [])
keys = {
"acme": svc.create_api_key("acme", role="readonly").raw,
"globex": svc.create_api_key("globex", role="readonly").raw,
}
finally:
svc.close()
return dbfile, keys
def test_csv_route_returns_text_csv(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/csv")
assert "utf-8" in r.headers["content-type"]
def test_csv_route_attachment_disposition_has_run_id(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
cd = r.headers.get("content-disposition", "")
assert "attachment" in cd
assert "r_acme" in cd
assert "decisions" in cd # so two attachments don't collide on disk
def test_csv_route_no_key_401(tmp_path):
dbfile, _ = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_acme/decisions.csv")
assert r.status_code == 401
def test_csv_route_cross_tenant_404(tmp_path):
"""Same require_run_access pattern as the JSON variant — 404 not
403 on cross-tenant probe."""
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_globex/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 404
def test_csv_route_unknown_run_404(tmp_path):
dbfile, keys = _bootstrap(tmp_path)
client = _client(dbfile)
r = client.get("/runs/r_ghost/decisions.csv",
headers={"Authorization": f"Bearer {keys['acme']}"})
assert r.status_code == 404