| """ |
| Tests for Stage 67 — CSV variant of backfill. |
| |
| GET /tenants/{tid}/backfill.csv?entity_type&from&until&step_days |
| |
| Wide table, sorted as_of_day ASC so a chart in Excel reads |
| left-to-right. Each row carries per-cutoff counts + the single |
| top issue flattened to three columns (top_entity_id, top_score, |
| top_title) — Excel doesn't love nested arrays. |
| """ |
| import csv |
| import io |
|
|
| import pytest |
|
|
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| pytest.importorskip("yaml") |
| from fastapi.testclient import TestClient |
|
|
| from infra import OrgStateService |
| from infra.api import create_app, handlers |
| from orgstate_client import Client |
|
|
|
|
def _seed_calibrated(dbfile, tenant_id="acme"):
    """Register ``tenant_id`` in the database at ``dbfile`` and calibrate it.

    The tenant is registered under the "logistics" vertical, that vertical's
    "warehouse" observations are ingested, and ``calibrate_and_store`` is then
    run on the same observations.  The service is closed even if a step fails.
    """
    from verticals import get_vertical_config, get_vertical_observation_loader

    vertical = "logistics"
    entity_type = "warehouse"
    service = OrgStateService(dbfile)
    try:
        # The tenant id is passed twice: it is also the second positional
        # argument of register_tenant.
        service.register_tenant(tenant_id, tenant_id, vertical=vertical)
        entity_cfg = get_vertical_config(vertical).entity_type(entity_type)
        load_observations = get_vertical_observation_loader(vertical)
        observations = load_observations()[entity_type]
        # Ingestion takes plain dicts, so each observation is flattened first.
        payload = [
            {
                "entity_id": item.entity_id,
                "day": item.day,
                "values": item.values,
            }
            for item in observations
        ]
        service.ingest_observations(tenant_id, entity_type, payload)
        service.calibrate_and_store(
            tenant_id, observations, entity_cfg, vertical=vertical
        )
    finally:
        service.close()
|
|
|
|
def _bootstrap(tmp_path):
    """Create a database seeded with tenants "acme" and "globex" plus API keys.

    Returns a ``(dbfile, keys)`` pair.  ``keys`` maps "ro" to the raw readonly
    key issued for "acme" and "admin" to the raw admin key.
    """
    db_path = str(tmp_path / "bf_csv.sqlite3")
    for tenant in ("acme", "globex"):
        _seed_calibrated(db_path, tenant)

    service = OrgStateService(db_path)
    try:
        readonly_key = service.create_api_key("acme", role="readonly").raw
        admin_key = service.create_admin_key().raw
    finally:
        service.close()
    return db_path, {"ro": readonly_key, "admin": admin_key}
|
|
|
|
def _client(dbfile):
    """Return a ``TestClient`` wrapping a new app created for ``dbfile``."""
    app = create_app(dbfile)
    return TestClient(app)
|
|
|
|
| def _auth(k): |
| return {"Authorization": f"Bearer {k}"} |
|
|
|
|
| |
|
|
@pytest.fixture
def seeded_dbfile(tmp_path):
    """Path to a per-test database seeded with the calibrated "acme" tenant."""
    db_path = str(tmp_path / "h.sqlite3")
    _seed_calibrated(db_path, tenant_id="acme")
    return db_path
|
|
|
|
def test_handler_header_row(seeded_dbfile):
    """The first CSV row is the fixed eight-column header."""
    expected_header = [
        "as_of_day",
        "n_states",
        "n_issues",
        "n_decisions",
        "top_severity",
        "top_entity_id",
        "top_score",
        "top_title",
    ]
    service = OrgStateService(seeded_dbfile)
    try:
        csv_text = handlers.backfill_csv(service, "acme", "warehouse")
    finally:
        service.close()
    parsed = list(csv.reader(io.StringIO(csv_text)))
    assert parsed[0] == expected_header
|
|
|
|
def test_handler_body_sorted_by_date_ascending(seeded_dbfile):
    """Body rows come out oldest-first, so a chart's x-axis reads
    past-to-present from left to right."""
    service = OrgStateService(seeded_dbfile)
    try:
        csv_text = handlers.backfill_csv(service, "acme", "warehouse")
    finally:
        service.close()
    reader = csv.reader(io.StringIO(csv_text))
    parsed = list(reader)
    # Column 0 is the day; the header row is skipped.
    days = [record[0] for record in parsed[1:]]
    assert days == sorted(days), (
        f"days must be sorted ascending; got {days}"
    )
|
|
|
|
def test_handler_top_issue_flattens_to_three_columns(seeded_dbfile):
    """A row whose issue count is positive carries non-empty
    top_entity_id / top_score / top_title cells."""
    service = OrgStateService(seeded_dbfile)
    try:
        csv_text = handlers.backfill_csv(service, "acme", "warehouse")
    finally:
        service.close()
    parsed = list(csv.reader(io.StringIO(csv_text)))

    # Column 2 holds the issue count; keep only body rows that have issues.
    rows_with_issues = [
        record for record in parsed[1:] if int(record[2]) > 0
    ]
    assert rows_with_issues, "expected at least one cutoff with issues"
    first = rows_with_issues[0]
    for column in (5, 6, 7):
        assert first[column]
|
|
|
|
def test_handler_empty_top_renders_as_blank_cells(seeded_dbfile):
    """A row with zero issues leaves columns 4-7 as empty strings rather
    than a 'None' literal, so the spreadsheet cells stay visually empty.

    Skipped when the seed produces no zero-issue row.
    """
    service = OrgStateService(seeded_dbfile)
    try:
        csv_text = handlers.backfill_csv(service, "acme", "warehouse")
    finally:
        service.close()
    parsed = list(csv.reader(io.StringIO(csv_text)))
    rows_without_issues = [
        record for record in parsed[1:] if int(record[2]) == 0
    ]
    if len(rows_without_issues) == 0:
        pytest.skip("no zero-issue cutoffs in this seed")
    first = rows_without_issues[0]
    for column in (4, 5, 6, 7):
        assert first[column] == ""
|
|
|
|
def test_handler_unknown_tenant_404(tmp_path):
    """Calling the handler for a tenant absent from a fresh database raises
    ``ApiError``."""
    # NOTE(review): only the exception type is checked; the 404 in the test
    # name is not asserted here — confirm whether ApiError exposes a status.
    empty_db = str(tmp_path / "empty.sqlite3")
    service = OrgStateService(empty_db)
    try:
        from infra.api.errors import ApiError

        with pytest.raises(ApiError):
            handlers.backfill_csv(service, "ghost", "warehouse")
    finally:
        service.close()
|
|
|
|
| |
|
|
def test_route_returns_text_csv(tmp_path):
    """A readonly key gets a 200 response whose content type is ``text/csv``
    and mentions utf-8."""
    dbfile, keys = _bootstrap(tmp_path)
    response = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=warehouse",
        headers=_auth(keys["ro"]),
    )
    assert response.status_code == 200
    content_type = response.headers["content-type"]
    assert content_type.startswith("text/csv")
    assert "utf-8" in content_type
|
|
|
|
def test_route_attachment_filename_carries_tenant_and_entity_type(tmp_path):
    """The Content-Disposition header marks the body as an attachment and
    contains both the tenant id and the entity type."""
    dbfile, keys = _bootstrap(tmp_path)
    response = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=warehouse",
        headers=_auth(keys["ro"]),
    )
    disposition = response.headers["content-disposition"]
    for fragment in ("attachment", "acme", "warehouse"):
        assert fragment in disposition
|
|
|
|
def test_route_body_parses_as_csv(tmp_path):
    """The response body parses as CSV: a header starting with ``as_of_day``
    followed by at least one more row."""
    dbfile, keys = _bootstrap(tmp_path)
    response = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=warehouse",
        headers=_auth(keys["ro"]),
    )
    parsed = list(csv.reader(io.StringIO(response.text)))
    first_cell = parsed[0][0]
    assert first_cell == "as_of_day"
    assert len(parsed) > 1
|
|
|
|
def test_route_filters_narrow_body(tmp_path):
    """Requesting ``step_days=7`` returns fewer lines than the same request
    without ``step_days``."""
    dbfile, keys = _bootstrap(tmp_path)
    client = _client(dbfile)
    headers = _auth(keys["ro"])
    base_url = "/tenants/acme/backfill.csv?entity_type=warehouse"

    weekly_text = client.get(base_url + "&step_days=7", headers=headers).text
    default_text = client.get(base_url, headers=headers).text

    weekly_lines = weekly_text.splitlines()
    default_lines = default_text.splitlines()
    assert len(weekly_lines) < len(default_lines)
|
|
|
|
def test_route_no_DB_writes(tmp_path):
    """Stage 65/66 contract — CSV variant honors it too.

    Counts the rows in ``runs`` before and after a successful CSV download
    and requires the two counts to match.
    """
    dbfile, keys = _bootstrap(tmp_path)

    def _count_runs():
        # A fresh service per count, closed straight away, so no handle
        # from this test stays open while the request is served.
        svc = OrgStateService(dbfile)
        try:
            return svc.db.query_one(
                "SELECT COUNT(*) AS n FROM runs"
            )["n"]
        finally:
            svc.close()

    before = _count_runs()
    r = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=warehouse",
        headers=_auth(keys["ro"]),
    )
    # Without this check a rejected request (e.g. 401/404/500) would leave
    # the table untouched and the test would pass without ever exercising
    # the CSV handler.
    assert r.status_code == 200
    after = _count_runs()
    assert before == after
|
|
|
|
def test_route_no_key_401(tmp_path):
    """A request sent without an Authorization header is answered with 401."""
    dbfile, _unused_keys = _bootstrap(tmp_path)
    response = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=warehouse",
    )
    assert response.status_code == 401
|
|
|
|
def test_route_unknown_entity_type_404(tmp_path):
    """Asking for entity type "ghost" with a valid readonly key yields 404."""
    dbfile, keys = _bootstrap(tmp_path)
    response = _client(dbfile).get(
        "/tenants/acme/backfill.csv?entity_type=ghost",
        headers=_auth(keys["ro"]),
    )
    assert response.status_code == 404
|
|
|
|
| |
|
|
def test_sdk_download_backfill_csv(tmp_path):
    """The SDK client's ``download_backfill_csv`` returns text that begins
    with the CSV header."""
    dbfile, keys = _bootstrap(tmp_path)
    sdk = Client(
        base_url="http://test",
        tenant_id="acme",
        api_key=keys["ro"],
        transport=_client(dbfile),
    )
    csv_text = sdk.download_backfill_csv("warehouse")
    assert csv_text.startswith("as_of_day,n_states,n_issues,")
|
|
|
|
def test_sdk_download_backfill_csv_passes_filters(tmp_path):
    """Every body row returned for a from/until request has its day inside
    the requested window (bounds inclusive)."""
    dbfile, keys = _bootstrap(tmp_path)
    sdk = Client(
        base_url="http://test",
        tenant_id="acme",
        api_key=keys["ro"],
        transport=_client(dbfile),
    )
    csv_text = sdk.download_backfill_csv(
        "warehouse",
        from_day="2026-02-01",
        until_day="2026-02-15",
    )
    parsed = list(csv.reader(io.StringIO(csv_text)))
    # The days are compared as strings, exactly as they appear in column 0.
    for record in parsed[1:]:
        assert "2026-02-01" <= record[0] <= "2026-02-15"
|
|