"""
Tests for Stage 57 — GET /admin/schedules.

The endpoint lists schedules across all tenants for admin dashboards.
It is distinct from Stage 39's /tenants/{tid}/schedules, which is
scoped to a single tenant. The filters are the same (enabled_only,
status) plus tenant_id as an optional narrowing filter, and every
variant is admin-only.
"""
import pytest

# Skip this whole module when the optional API dependencies are absent.
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
pytest.importorskip("yaml")

from fastapi.testclient import TestClient

from infra import OrgStateService
from infra.api import create_app


def _client(dbfile):
    """Return a TestClient wrapping an app created from *dbfile*."""
    app = create_app(dbfile)
    return TestClient(app)


def _auth(k):
    """Build the bearer-token Authorization header dict for key *k*."""
    bearer = f"Bearer {k}"
    return {"Authorization": bearer}


def _make_schedule(svc, tenant_id, connector_config=None):
    """Register a logistics/warehouse csv_folder schedule for *tenant_id*.

    When *connector_config* is None a default ``{"vertical": "logistics"}``
    config is used. Returns whatever ``register_schedule`` returns.
    """
    from infra.ingestion.scheduler import IngestionService

    config = (
        {"vertical": "logistics"}
        if connector_config is None
        else connector_config
    )
    ingestion = IngestionService(svc)
    return ingestion.register_schedule(
        tenant_id=tenant_id,
        vertical="logistics",
        entity_type="warehouse",
        connector_type="csv_folder",
        connector_config=config,
    )


def _bootstrap(tmp_path):
    """Seed a fresh DB with two tenants owning one schedule each.

    acme's schedule is marked status='ok' through a direct ``mark_run``
    call; globex's schedule is left untouched, so its last_status stays
    NULL (never ticked).

    Returns ``(dbfile, keys, acme_schedule_id, globex_schedule_id)`` where
    *keys* holds the raw "admin" key and the raw "acme_admin" tenant key.
    """
    dbfile = str(tmp_path / "admin_sched.sqlite3")
    svc = OrgStateService(dbfile)
    try:
        for tenant_id, display_name in (("acme", "ACME"), ("globex", "Globex")):
            svc.register_tenant(tenant_id, display_name)

        acme_schedule = _make_schedule(svc, "acme")
        globex_schedule = _make_schedule(svc, "globex")

        # Pin acme's last_status so the status filters have something to match.
        from infra.storage import ScheduleRepository

        ScheduleRepository(svc.db).mark_run(
            acme_schedule["schedule_id"],
            "2026-05-16T08:00:00+00:00",
            status="ok",
        )

        admin_raw = svc.create_admin_key().raw
        acme_admin_raw = svc.create_api_key("acme", role="admin").raw
        keys = {"admin": admin_raw, "acme_admin": acme_admin_raw}
    finally:
        # Always release the service handle, even if seeding failed.
        svc.close()

    return (
        dbfile,
        keys,
        acme_schedule["schedule_id"],
        globex_schedule["schedule_id"],
    )


# --- happy paths -----------------------------------------------------

def test_returns_every_tenants_schedule(tmp_path):
    """Without filters the admin sees the schedules of both tenants."""
    dbfile, keys, *_ = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules", headers=_auth(keys["admin"]))

    assert resp.status_code == 200
    payload = resp.json()
    assert payload["count"] == 2
    assert {row["tenant_id"] for row in payload["schedules"]} == {"acme", "globex"}


def test_status_filter_ok_returns_only_ticked_ok_rows(tmp_path):
    """?status=ok returns only acme's schedule, the one marked 'ok'."""
    dbfile, keys, acme_sid, _ = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules?status=ok", headers=_auth(keys["admin"]))

    payload = resp.json()
    assert payload["count"] == 1
    first = payload["schedules"][0]
    assert first["schedule_id"] == acme_sid


def test_status_filter_never_picks_unticked(tmp_path):
    """globex was never ticked (NULL last_status), so ?status=never
    surfaces exactly that onboarding-stage schedule."""
    dbfile, keys, _, globex_sid = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules?status=never", headers=_auth(keys["admin"]))

    payload = resp.json()
    assert payload["count"] == 1
    first = payload["schedules"][0]
    assert first["schedule_id"] == globex_sid


def test_tenant_id_filter_narrows_to_one_tenant(tmp_path):
    """An admin can narrow the cross-tenant query to a single tenant when
    debugging — e.g. "what schedules does acme have?"."""
    dbfile, keys, acme_sid, _ = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules?tenant_id=acme", headers=_auth(keys["admin"]))

    payload = resp.json()
    assert payload["count"] == 1
    first = payload["schedules"][0]
    assert first["schedule_id"] == acme_sid
    # The filter is echoed back so the caller can audit what was asked.
    assert payload["tenant_id"] == "acme"


def test_no_filters_omits_echo_keys(tmp_path):
    """Echo keys show up only when a filter was used, which keeps the
    unfiltered payload tidy."""
    dbfile, keys, *_ = _bootstrap(tmp_path)
    api = _client(dbfile)

    payload = api.get("/admin/schedules", headers=_auth(keys["admin"])).json()

    for echo_key in ("status", "tenant_id", "enabled_only"):
        assert echo_key not in payload


# --- auth ------------------------------------------------------------

def test_no_admin_key_returns_401(tmp_path):
    """With admin auth configured (an admin key exists in the DB), an
    anonymous request is answered with 401."""
    dbfile, *_ = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules")

    assert resp.status_code == 401


def test_tenant_key_forbidden(tmp_path):
    """Cross-tenant enumeration must NOT be open to a tenant key, even one
    with the admin role — otherwise a customer could see other tenants'
    schedules."""
    dbfile, keys, *_ = _bootstrap(tmp_path)
    api = _client(dbfile)

    resp = api.get("/admin/schedules", headers=_auth(keys["acme_admin"]))

    assert resp.status_code in {401, 403}


# --- combined filters ------------------------------------------------

def test_combined_filters_intersected(tmp_path):
    """tenant_id=acme + status=ok yields exactly the one acme 'ok' row;
    filters intersect, the same semantics as the Stage 39 list."""
    dbfile, keys, acme_sid, _ = _bootstrap(tmp_path)
    api = _client(dbfile)
    admin_headers = _auth(keys["admin"])

    matching = api.get(
        "/admin/schedules?tenant_id=acme&status=ok",
        headers=admin_headers,
    ).json()
    assert matching["count"] == 1
    assert matching["schedules"][0]["schedule_id"] == acme_sid

    # acme's only schedule has been ticked, so "never" must match nothing.
    empty = api.get(
        "/admin/schedules?tenant_id=acme&status=never",
        headers=admin_headers,
    ).json()
    assert empty["count"] == 0