File size: 6,263 Bytes
d2d1903 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | """
Tests for Stage 56 — POST /admin/schedules/tick.
Stage 39's REST CRUD covers list/get/create/enable/disable per
schedule. Stage 36's CLI had tick (run every due schedule once).
Stage 56 brings tick to HTTP so an admin dashboard's "run now"
button works without ssh-ing in or waiting for cron.
Payload parity with the CLI is the central guarantee.
"""
import pytest
pytest.importorskip("fastapi")
pytest.importorskip("httpx")
pytest.importorskip("yaml")
from fastapi.testclient import TestClient
from infra import OrgStateService
from infra.api import create_app
def _client(dbfile):
    """Return a TestClient wrapping an app created from *dbfile*."""
    app = create_app(dbfile)
    return TestClient(app)
def _auth(k):
return {"Authorization": f"Bearer {k}"}
def _make_schedule(svc, tenant_id="acme",
                   connector_config=None):
    """Register a logistics/warehouse ``csv_folder`` schedule for a tenant.

    The schedule is registered through ``IngestionService(svc)``. When
    *connector_config* is None, ``{"vertical": "logistics"}`` is used.
    Returns whatever ``register_schedule`` returns.
    """
    from infra.ingestion.scheduler import IngestionService

    # Only None selects the default; any other value is passed through.
    config = ({"vertical": "logistics"}
              if connector_config is None else connector_config)
    ingestion = IngestionService(svc)
    return ingestion.register_schedule(
        tenant_id=tenant_id,
        vertical="logistics",
        entity_type="warehouse",
        connector_type="csv_folder",
        connector_config=config,
    )
def _bootstrap(tmp_path, *, calibrated=False, bad_config=False):
    """Create a database with tenant "acme", one schedule and two keys.

    Args:
        tmp_path: directory in which ``admin_tick.sqlite3`` is created.
        calibrated: when true, calibrate "acme" for the logistics
            warehouse entity type before registering the schedule.
        bad_config: when true, register the schedule with a connector
            config whose ``data_dir`` is ``/nope/nowhere``.

    Returns:
        ``(dbfile, keys)`` where *keys* maps "admin" to the raw admin key
        and "acme_admin" to the raw role=admin API key of tenant "acme".
    """
    dbfile = str(tmp_path / "admin_tick.sqlite3")
    svc = OrgStateService(dbfile)
    try:
        svc.register_tenant("acme", "ACME")

        if calibrated:
            from verticals import (
                get_vertical_config,
                get_vertical_observation_loader,
            )

            load_observations = get_vertical_observation_loader("logistics")
            warehouse_obs = load_observations()["warehouse"]
            vertical_cfg = get_vertical_config("logistics")
            warehouse_cfg = vertical_cfg.entity_type("warehouse")
            svc.calibrate_and_store("acme", warehouse_obs, warehouse_cfg,
                                    vertical="logistics")

        # None makes _make_schedule fall back to its default config.
        if bad_config:
            connector_cfg = {"vertical": "logistics",
                             "data_dir": "/nope/nowhere"}
        else:
            connector_cfg = None
        _make_schedule(svc, connector_config=connector_cfg)

        admin_key = svc.create_admin_key().raw
        tenant_admin_key = svc.create_api_key("acme", role="admin").raw
        keys = {"admin": admin_key, "acme_admin": tenant_admin_key}
    finally:
        # Always close the service, even if setup fails.
        svc.close()
    return dbfile, keys
# --- happy paths -----------------------------------------------------
def test_tick_runs_due_schedule_returns_ok(tmp_path):
    """Calibrated tenant with a never-ticked schedule: the POST runs it,
    the single result has status "ok" and carries a run_id."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))
    assert response.status_code == 200

    payload = response.json()
    counters = {name: payload[name]
                for name in ("n_ran", "n_ok", "n_skipped", "n_errors")}
    assert counters == {"n_ran": 1, "n_ok": 1,
                        "n_skipped": 0, "n_errors": 0}

    # Unpacking fails unless there is exactly one result.
    (result,) = payload["results"]
    assert result["status"] == "ok"
    assert result["run_id"].startswith("run_")
def test_tick_no_due_schedules_returns_empty(tmp_path):
    """With no tenants or schedules, every counter is zero and results
    is an empty list -- the same as the CLI when nothing is due."""
    dbfile = str(tmp_path / "empty.sqlite3")
    svc = OrgStateService(dbfile)
    try:
        admin_key = svc.create_admin_key().raw
    finally:
        svc.close()

    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(admin_key))
    assert response.status_code == 200

    expected = {"n_ran": 0, "n_ok": 0, "n_skipped": 0,
                "n_errors": 0, "results": []}
    assert response.json() == expected
def test_tick_skipped_when_no_calibration(tmp_path):
    """Schedule on an uncalibrated tenant is reported as status
    "skipped" with a "no calibration" reason, NOT as "error". Same
    semantics as CLI tick (Stage 36): onboarding state, don't page."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=False)
    client = _client(dbfile)
    r = client.post("/admin/schedules/tick",
                    headers=_auth(keys["admin"]))
    assert r.status_code == 200
    body = r.json()
    assert body["n_skipped"] == 1
    assert body["n_errors"] == 0
    # Pin the per-result status as the ok and error tests do, so a
    # regression that mislabels the result cannot slip past the counters.
    [res] = body["results"]
    assert res["status"] == "skipped"
    assert "calibration" in res["reason"].lower()
def test_tick_error_isolated_to_one_schedule(tmp_path):
    """A schedule with a bad data_dir fails at fetch time. The tick as a
    whole must not crash: that schedule is reported with status "error".
    This is the CLI's Stage 36 crash-isolation guarantee, over HTTP."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True,
                              bad_config=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))

    # The endpoint still answers 200 when a schedule errors: tick
    # reports per-schedule outcomes instead of propagating the failure.
    assert response.status_code == 200

    payload = response.json()
    assert payload["n_errors"] == 1
    (result,) = payload["results"]
    assert result["status"] == "error"
    assert result["reason"]
# --- auth ------------------------------------------------------------
def test_tick_no_admin_key_401(tmp_path):
    """Once admin auth is configured, an anonymous request gets 401."""
    dbfile = _bootstrap(tmp_path, calibrated=True)[0]
    # No Authorization header is sent.
    response = _client(dbfile).post("/admin/schedules/tick")
    assert response.status_code == 401
def test_tick_tenant_key_forbidden(tmp_path):
    """A tenant key, even with role=admin, must NOT be able to tick.
    Tick runs schedules across every tenant -- a privilege the customer
    must never have."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    tenant_headers = _auth(keys["acme_admin"])
    response = _client(dbfile).post("/admin/schedules/tick",
                                    headers=tenant_headers)
    # Either rejection code is accepted.
    assert response.status_code in {401, 403}
# --- payload parity with CLI ----------------------------------------
def test_http_payload_keys_match_cli_payload_keys(tmp_path):
    """The HTTP body must expose exactly the top-level keys the CLI
    prints, so a dashboard can switch between the two without changing
    its parser."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))
    payload = response.json()

    expected_keys = {"n_ran", "n_ok", "n_skipped", "n_errors", "results"}
    assert set(payload.keys()) == expected_keys
|