""" Tests for Stage 50 — GET /tenants/{tid}/runs.csv. Mirrors the JSON list_runs (Stage 49) but emits CSV. Covers filter parity (the same entity_type / since narrowing applies), auth (tenant readonly), content-type, attachment disposition with a filename that hints at the filter scope, and the hard row cap. """ import csv import io import pytest from infra import OrgStateService from infra.api import handlers def _insert_run(svc, tenant_id, run_id, entity_type, started_at): svc.db.execute( """INSERT INTO runs (run_id, tenant_id, entity_type, vertical, status, n_states, n_issues, n_decisions, summary_json, started_at, finished_at) VALUES (?, ?, ?, 'logistics', 'completed', 3, 1, 1, '{}', ?, ?)""", (run_id, tenant_id, entity_type, started_at, started_at), ) @pytest.fixture def svc(tmp_path): s = OrgStateService(str(tmp_path / "runs_csv.sqlite3")) s.register_tenant("acme", "ACME") _insert_run(s, "acme", "r_a", "warehouse", "2026-04-01T00:00:00+00:00") _insert_run(s, "acme", "r_b", "warehouse", "2026-05-10T00:00:00+00:00") _insert_run(s, "acme", "r_c", "team", "2026-05-12T00:00:00+00:00") yield s s.close() # --- pure handler ----------------------------------------------------- def test_handler_header_row(svc): rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "acme")))) assert rows[0] == [ "run_id", "entity_type", "vertical", "status", "n_states", "n_issues", "n_decisions", "started_at", "finished_at", ] def test_handler_body_sorted_newest_first(svc): rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "acme")))) body = rows[1:] assert [r[0] for r in body] == ["r_c", "r_b", "r_a"] def test_handler_entity_type_filter(svc): text = handlers.runs_csv(svc, "acme", entity_type="warehouse") rows = list(csv.reader(io.StringIO(text))) body = rows[1:] assert [r[0] for r in body] == ["r_b", "r_a"] def test_handler_since_filter_inclusive(svc): text = handlers.runs_csv(svc, "acme", since="2026-05-10T00:00:00+00:00") rows = list(csv.reader(io.StringIO(text))) body = rows[1:] assert {r[0] for r in body} == {"r_b", "r_c"} def test_handler_combined_filters(svc): text = handlers.runs_csv(svc, "acme", entity_type="warehouse", since="2026-05-01T00:00:00+00:00") rows = list(csv.reader(io.StringIO(text))) body = rows[1:] assert [r[0] for r in body] == ["r_b"] def test_handler_empty_tenant_returns_only_header(svc): svc.register_tenant("globex", "Globex") rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "globex")))) assert len(rows) == 1 def test_handler_unknown_tenant_raises_404(svc): from infra.api.errors import ApiError with pytest.raises(ApiError): handlers.runs_csv(svc, "ghost") # --- HTTP route ------------------------------------------------------- def _client(dbfile): pytest.importorskip("fastapi") pytest.importorskip("httpx") from fastapi.testclient import TestClient from infra.api import create_app return TestClient(create_app(dbfile)) def _bootstrap(tmp_path): dbfile = str(tmp_path / "http.sqlite3") boot = OrgStateService(dbfile) try: boot.register_tenant("acme", "ACME") boot.register_tenant("globex", "Globex") _insert_run(boot, "acme", "r_a", "warehouse", "2026-04-01T00:00:00+00:00") _insert_run(boot, "acme", "r_b", "warehouse", "2026-05-10T00:00:00+00:00") _insert_run(boot, "acme", "r_c", "team", "2026-05-12T00:00:00+00:00") keys = { "acme": boot.create_api_key("acme", role="readonly").raw, "globex": boot.create_api_key("globex", role="readonly").raw, } finally: boot.close() return dbfile, keys def test_route_returns_text_csv(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv", headers={"Authorization": f"Bearer {keys['acme']}"}) assert r.status_code == 200 assert r.headers["content-type"].startswith("text/csv") assert "utf-8" in r.headers["content-type"] def test_route_attachment_filename_carries_tenant_and_scope(tmp_path): """The filename includes the tenant id and the scope so two parallel exports (warehouse vs team) don't collide in Downloads.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv?entity_type=warehouse", headers={"Authorization": f"Bearer {keys['acme']}"}) cd = r.headers.get("content-disposition", "") assert "attachment" in cd assert "acme" in cd assert "warehouse" in cd def test_route_no_filter_scope_says_all(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv", headers={"Authorization": f"Bearer {keys['acme']}"}) cd = r.headers.get("content-disposition", "") assert "all" in cd def test_route_body_parses_as_csv(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv", headers={"Authorization": f"Bearer {keys['acme']}"}) rows = list(csv.reader(io.StringIO(r.text))) assert rows[0][0] == "run_id" assert len(rows) == 4 # header + 3 runs def test_route_no_key_401(tmp_path): dbfile, _ = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv") assert r.status_code == 401 def test_route_cross_tenant_403(tmp_path): dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/globex/runs.csv", headers={"Authorization": f"Bearer {keys['acme']}"}) assert r.status_code == 403 def test_route_unknown_tenant_403(tmp_path): """Authenticated caller probing a non-existent tenant — 403 not 404, consistent with the rest of the per-tenant routes.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/ghost/runs.csv", headers={"Authorization": f"Bearer {keys['acme']}"}) assert r.status_code == 403 def test_route_filters_propagate_to_body(tmp_path): """The HTTP-level entity_type filter actually narrows the CSV output, not just echoes in some header.""" dbfile, keys = _bootstrap(tmp_path) client = _client(dbfile) r = client.get("/tenants/acme/runs.csv?entity_type=team", headers={"Authorization": f"Bearer {keys['acme']}"}) rows = list(csv.reader(io.StringIO(r.text))) body = rows[1:] assert len(body) == 1 assert body[0][0] == "r_c" assert body[0][1] == "team"