| """ |
| Tests for Stage 56 β POST /admin/schedules/tick. |
| |
| Stage 39's REST CRUD covers list/get/create/enable/disable per |
| schedule. Stage 36's CLI had tick (run every due schedule once). |
| Stage 56 brings tick to HTTP so an admin dashboard's "run now" |
| button works without ssh-ing in or waiting for cron. |
| |
| Payload parity with the CLI is the central guarantee. |
| """ |
| import pytest |
|
|
| pytest.importorskip("fastapi") |
| pytest.importorskip("httpx") |
| pytest.importorskip("yaml") |
| from fastapi.testclient import TestClient |
|
|
| from infra import OrgStateService |
| from infra.api import create_app |
|
|
|
|
| def _client(dbfile): |
| return TestClient(create_app(dbfile)) |
|
|
|
|
| def _auth(k): |
| return {"Authorization": f"Bearer {k}"} |
|
|
|
|
| def _make_schedule(svc, tenant_id="acme", |
| connector_config=None): |
| if connector_config is None: |
| connector_config = {"vertical": "logistics"} |
| from infra.ingestion.scheduler import IngestionService |
| return IngestionService(svc).register_schedule( |
| tenant_id=tenant_id, vertical="logistics", |
| entity_type="warehouse", connector_type="csv_folder", |
| connector_config=connector_config, |
| ) |
|
|
|
|
| def _bootstrap(tmp_path, *, calibrated=False, bad_config=False): |
| dbfile = str(tmp_path / "admin_tick.sqlite3") |
| svc = OrgStateService(dbfile) |
| try: |
| svc.register_tenant("acme", "ACME") |
| if calibrated: |
| from verticals import get_vertical_config, get_vertical_observation_loader |
| wh_obs = get_vertical_observation_loader( |
| "logistics")()["warehouse"] |
| cfg = get_vertical_config( |
| "logistics").entity_type("warehouse") |
| svc.calibrate_and_store("acme", wh_obs, cfg, |
| vertical="logistics") |
| cfg = ({"vertical": "logistics", "data_dir": "/nope/nowhere"} |
| if bad_config else None) |
| _make_schedule(svc, connector_config=cfg) |
| keys = { |
| "admin": svc.create_admin_key().raw, |
| "acme_admin": svc.create_api_key("acme", |
| role="admin").raw, |
| } |
| finally: |
| svc.close() |
| return dbfile, keys |
|
|
|
|
| |
|
|
| def test_tick_runs_due_schedule_returns_ok(tmp_path): |
| """Calibrated tenant + fresh schedule (never ticked) β POST |
| runs it, status=ok, run_id in the result.""" |
| dbfile, keys = _bootstrap(tmp_path, calibrated=True) |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick", |
| headers=_auth(keys["admin"])) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["n_ran"] == 1 |
| assert body["n_ok"] == 1 |
| assert body["n_skipped"] == 0 |
| assert body["n_errors"] == 0 |
| [res] = body["results"] |
| assert res["status"] == "ok" |
| assert res["run_id"].startswith("run_") |
|
|
|
|
| def test_tick_no_due_schedules_returns_empty(tmp_path): |
| """Empty platform β all counts zero, results []. Same as the CLI |
| when nothing is due.""" |
| dbfile = str(tmp_path / "empty.sqlite3") |
| svc = OrgStateService(dbfile) |
| try: |
| admin = svc.create_admin_key().raw |
| finally: |
| svc.close() |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick", |
| headers=_auth(admin)) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body == {"n_ran": 0, "n_ok": 0, "n_skipped": 0, |
| "n_errors": 0, "results": []} |
|
|
|
|
| def test_tick_skipped_when_no_calibration(tmp_path): |
| """Schedule on uncalibrated tenant β status='skipped' with the |
| "no calibration" reason, NOT 'error'. Same semantics as CLI tick |
| (Stage 36): onboarding state, don't page.""" |
| dbfile, keys = _bootstrap(tmp_path, calibrated=False) |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick", |
| headers=_auth(keys["admin"])) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["n_skipped"] == 1 |
| assert body["n_errors"] == 0 |
| assert "calibration" in body["results"][0]["reason"].lower() |
|
|
|
|
| def test_tick_error_isolated_to_one_schedule(tmp_path): |
| """A schedule with bad data_dir errors at fetch time. The whole |
| tick must NOT crash β status='error' on that one, others |
| continue. The CLI's Stage 36 crash isolation guarantee, now |
| over HTTP.""" |
| dbfile, keys = _bootstrap(tmp_path, calibrated=True, |
| bad_config=True) |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick", |
| headers=_auth(keys["admin"])) |
| |
| |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["n_errors"] == 1 |
| [res] = body["results"] |
| assert res["status"] == "error" |
| assert res["reason"] |
|
|
|
|
| |
|
|
| def test_tick_no_admin_key_401(tmp_path): |
| """Once admin auth is configured, anonymous β 401.""" |
| dbfile, _ = _bootstrap(tmp_path, calibrated=True) |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick") |
| assert r.status_code == 401 |
|
|
|
|
| def test_tick_tenant_key_forbidden(tmp_path): |
| """Tenant key, even role=admin, must NOT be able to tick. tick |
| runs schedules across every tenant β privilege the customer |
| must never have.""" |
| dbfile, keys = _bootstrap(tmp_path, calibrated=True) |
| client = _client(dbfile) |
| r = client.post("/admin/schedules/tick", |
| headers=_auth(keys["acme_admin"])) |
| assert r.status_code in (401, 403) |
|
|
|
|
| |
|
|
| def test_http_payload_keys_match_cli_payload_keys(tmp_path): |
| """The HTTP body must have exactly the same top-level keys the |
| CLI prints β dashboards consuming one can swap to the other |
| without changing their parser.""" |
| dbfile, keys = _bootstrap(tmp_path, calibrated=True) |
| client = _client(dbfile) |
| body = client.post("/admin/schedules/tick", |
| headers=_auth(keys["admin"])).json() |
| assert set(body.keys()) == { |
| "n_ran", "n_ok", "n_skipped", "n_errors", "results", |
| } |
|
|