| """ |
| Tests for Stage 50 — GET /tenants/{tid}/runs.csv. |
| |
| Mirrors the JSON list_runs (Stage 49) but emits CSV. Covers |
| filter parity (the same entity_type / since narrowing applies), |
| auth (tenant readonly), content-type, attachment disposition |
| with a filename that hints at the filter scope, and the hard |
| row cap. |
| """ |
| import csv |
| import io |
|
|
| import pytest |
|
|
| from infra import OrgStateService |
| from infra.api import handlers |
|
|
|
|
| def _insert_run(svc, tenant_id, run_id, entity_type, started_at): |
| svc.db.execute( |
| """INSERT INTO runs |
| (run_id, tenant_id, entity_type, vertical, status, |
| n_states, n_issues, n_decisions, summary_json, |
| started_at, finished_at) |
| VALUES (?, ?, ?, 'logistics', 'completed', |
| 3, 1, 1, '{}', ?, ?)""", |
| (run_id, tenant_id, entity_type, started_at, started_at), |
| ) |
|
|
|
|
| @pytest.fixture |
| def svc(tmp_path): |
| s = OrgStateService(str(tmp_path / "runs_csv.sqlite3")) |
| s.register_tenant("acme", "ACME") |
| _insert_run(s, "acme", "r_a", "warehouse", |
| "2026-04-01T00:00:00+00:00") |
| _insert_run(s, "acme", "r_b", "warehouse", |
| "2026-05-10T00:00:00+00:00") |
| _insert_run(s, "acme", "r_c", "team", |
| "2026-05-12T00:00:00+00:00") |
| yield s |
| s.close() |
|
|
|
|
| |
|
|
| def test_handler_header_row(svc): |
| rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "acme")))) |
| assert rows[0] == [ |
| "run_id", "entity_type", "vertical", "status", |
| "n_states", "n_issues", "n_decisions", |
| "started_at", "finished_at", |
| ] |
|
|
|
|
| def test_handler_body_sorted_newest_first(svc): |
| rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "acme")))) |
| body = rows[1:] |
| assert [r[0] for r in body] == ["r_c", "r_b", "r_a"] |
|
|
|
|
| def test_handler_entity_type_filter(svc): |
| text = handlers.runs_csv(svc, "acme", entity_type="warehouse") |
| rows = list(csv.reader(io.StringIO(text))) |
| body = rows[1:] |
| assert [r[0] for r in body] == ["r_b", "r_a"] |
|
|
|
|
| def test_handler_since_filter_inclusive(svc): |
| text = handlers.runs_csv(svc, "acme", |
| since="2026-05-10T00:00:00+00:00") |
| rows = list(csv.reader(io.StringIO(text))) |
| body = rows[1:] |
| assert {r[0] for r in body} == {"r_b", "r_c"} |
|
|
|
|
| def test_handler_combined_filters(svc): |
| text = handlers.runs_csv(svc, "acme", |
| entity_type="warehouse", |
| since="2026-05-01T00:00:00+00:00") |
| rows = list(csv.reader(io.StringIO(text))) |
| body = rows[1:] |
| assert [r[0] for r in body] == ["r_b"] |
|
|
|
|
| def test_handler_empty_tenant_returns_only_header(svc): |
| svc.register_tenant("globex", "Globex") |
| rows = list(csv.reader(io.StringIO(handlers.runs_csv(svc, "globex")))) |
| assert len(rows) == 1 |
|
|
|
|
| def test_handler_unknown_tenant_raises_404(svc): |
| from infra.api.errors import ApiError |
| with pytest.raises(ApiError): |
| handlers.runs_csv(svc, "ghost") |
|
|
|
|
| |
|
|
| def _client(dbfile): |
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| from fastapi.testclient import TestClient |
|
|
| from infra.api import create_app |
| return TestClient(create_app(dbfile)) |
|
|
|
|
| def _bootstrap(tmp_path): |
| dbfile = str(tmp_path / "http.sqlite3") |
| boot = OrgStateService(dbfile) |
| try: |
| boot.register_tenant("acme", "ACME") |
| boot.register_tenant("globex", "Globex") |
| _insert_run(boot, "acme", "r_a", "warehouse", |
| "2026-04-01T00:00:00+00:00") |
| _insert_run(boot, "acme", "r_b", "warehouse", |
| "2026-05-10T00:00:00+00:00") |
| _insert_run(boot, "acme", "r_c", "team", |
| "2026-05-12T00:00:00+00:00") |
| keys = { |
| "acme": boot.create_api_key("acme", role="readonly").raw, |
| "globex": boot.create_api_key("globex", role="readonly").raw, |
| } |
| finally: |
| boot.close() |
| return dbfile, keys |
|
|
|
|
| def test_route_returns_text_csv(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| assert r.status_code == 200 |
| assert r.headers["content-type"].startswith("text/csv") |
| assert "utf-8" in r.headers["content-type"] |
|
|
|
|
| def test_route_attachment_filename_carries_tenant_and_scope(tmp_path): |
| """The filename includes the tenant id and the scope so two |
| parallel exports (warehouse vs team) don't collide in Downloads.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv?entity_type=warehouse", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| cd = r.headers.get("content-disposition", "") |
| assert "attachment" in cd |
| assert "acme" in cd |
| assert "warehouse" in cd |
|
|
|
|
| def test_route_no_filter_scope_says_all(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| cd = r.headers.get("content-disposition", "") |
| assert "all" in cd |
|
|
|
|
| def test_route_body_parses_as_csv(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| rows = list(csv.reader(io.StringIO(r.text))) |
| assert rows[0][0] == "run_id" |
| assert len(rows) == 4 |
|
|
|
|
| def test_route_no_key_401(tmp_path): |
| dbfile, _ = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv") |
| assert r.status_code == 401 |
|
|
|
|
| def test_route_cross_tenant_403(tmp_path): |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/globex/runs.csv", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| assert r.status_code == 403 |
|
|
|
|
| def test_route_unknown_tenant_403(tmp_path): |
| """Authenticated caller probing a non-existent tenant — 403 not |
| 404, consistent with the rest of the per-tenant routes.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/ghost/runs.csv", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| assert r.status_code == 403 |
|
|
|
|
| def test_route_filters_propagate_to_body(tmp_path): |
| """The HTTP-level entity_type filter actually narrows the CSV |
| output, not just echoes in some header.""" |
| dbfile, keys = _bootstrap(tmp_path) |
| client = _client(dbfile) |
| r = client.get("/tenants/acme/runs.csv?entity_type=team", |
| headers={"Authorization": f"Bearer {keys['acme']}"}) |
| rows = list(csv.reader(io.StringIO(r.text))) |
| body = rows[1:] |
| assert len(body) == 1 |
| assert body[0][0] == "r_c" |
| assert body[0][1] == "team" |
|
|