File size: 6,263 Bytes
d2d1903
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
Tests for Stage 56 — POST /admin/schedules/tick.

Stage 39's REST CRUD covers list/get/create/enable/disable per
schedule. Stage 36's CLI had tick (run every due schedule once).
Stage 56 brings tick to HTTP so an admin dashboard's "run now"
button works without ssh-ing in or waiting for cron.

Payload parity with the CLI is the central guarantee.
"""
import pytest

pytest.importorskip("fastapi")
pytest.importorskip("httpx")
pytest.importorskip("yaml")
from fastapi.testclient import TestClient

from infra import OrgStateService
from infra.api import create_app


def _client(dbfile):
    """Return a ``TestClient`` wrapping the app that ``create_app`` builds
    for *dbfile*."""
    app = create_app(dbfile)
    return TestClient(app)


def _auth(k):
    return {"Authorization": f"Bearer {k}"}


def _make_schedule(svc, tenant_id="acme",
                    connector_config=None):
    """Register a logistics/warehouse ``csv_folder`` schedule for a tenant.

    Args:
        svc: service object handed to ``IngestionService``.
        tenant_id: tenant the schedule is registered under.
        connector_config: connector settings; when ``None`` a minimal
            ``{"vertical": "logistics"}`` config is used.

    Returns:
        Whatever ``IngestionService.register_schedule`` returns.
    """
    # Imported at call time rather than at module import time.
    from infra.ingestion.scheduler import IngestionService

    effective_config = (
        {"vertical": "logistics"}
        if connector_config is None
        else connector_config
    )
    ingestion = IngestionService(svc)
    return ingestion.register_schedule(
        tenant_id=tenant_id,
        vertical="logistics",
        entity_type="warehouse",
        connector_type="csv_folder",
        connector_config=effective_config,
    )


def _bootstrap(tmp_path, *, calibrated=False, bad_config=False):
    """Prepare a database file with tenant "acme", one schedule, and two keys.

    Args:
        tmp_path: directory in which ``admin_tick.sqlite3`` is placed.
        calibrated: when true, calibrate "acme" from the logistics
            vertical's warehouse observations before the schedule is
            registered.
        bad_config: when true, the schedule's connector config carries a
            ``data_dir`` of ``/nope/nowhere``; otherwise the helper's
            default config is used.

    Returns:
        ``(dbfile, keys)`` where ``keys["admin"]`` is the ``.raw`` value of
        a new admin key and ``keys["acme_admin"]`` is the ``.raw`` value of
        an "acme" API key created with ``role="admin"``.
    """
    dbfile = str(tmp_path / "admin_tick.sqlite3")
    svc = OrgStateService(dbfile)
    try:
        svc.register_tenant("acme", "ACME")

        if calibrated:
            from verticals import get_vertical_config, get_vertical_observation_loader
            load_observations = get_vertical_observation_loader("logistics")
            warehouse_obs = load_observations()["warehouse"]
            vertical_cfg = get_vertical_config("logistics")
            warehouse_cfg = vertical_cfg.entity_type("warehouse")
            svc.calibrate_and_store("acme", warehouse_obs, warehouse_cfg,
                                    vertical="logistics")

        if bad_config:
            connector_cfg = {"vertical": "logistics",
                             "data_dir": "/nope/nowhere"}
        else:
            connector_cfg = None
        _make_schedule(svc, connector_config=connector_cfg)

        platform_key = svc.create_admin_key().raw
        tenant_key = svc.create_api_key("acme", role="admin").raw
        keys = {"admin": platform_key, "acme_admin": tenant_key}
    finally:
        # Always release the service handle, even if setup failed.
        svc.close()
    return dbfile, keys


# --- happy paths -----------------------------------------------------

def test_tick_runs_due_schedule_returns_ok(tmp_path):
    """Calibrated tenant + fresh schedule (never ticked) → POST runs it:
    one run, one ok, and the single result carries a run_id."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))
    assert response.status_code == 200

    payload = response.json()
    expected_counts = (
        ("n_ran", 1),
        ("n_ok", 1),
        ("n_skipped", 0),
        ("n_errors", 0),
    )
    for counter, expected in expected_counts:
        assert payload[counter] == expected, counter

    # Exactly one result is expected; unpacking fails otherwise.
    (outcome,) = payload["results"]
    assert outcome["status"] == "ok"
    assert outcome["run_id"].startswith("run_")


def test_tick_no_due_schedules_returns_empty(tmp_path):
    """Empty platform → every count is zero and results is []. Same as
    the CLI when nothing is due."""
    dbfile = str(tmp_path / "empty.sqlite3")
    svc = OrgStateService(dbfile)
    try:
        admin_key = svc.create_admin_key().raw
    finally:
        svc.close()

    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(admin_key))
    assert response.status_code == 200

    expected_payload = dict(
        n_ran=0, n_ok=0, n_skipped=0, n_errors=0, results=[],
    )
    assert response.json() == expected_payload


def test_tick_skipped_when_no_calibration(tmp_path):
    """Schedule on an uncalibrated tenant → counted as skipped with a
    "no calibration" reason, NOT as an error. Same semantics as CLI tick
    (Stage 36): onboarding state, don't page."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=False)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))
    assert response.status_code == 200

    payload = response.json()
    assert payload["n_skipped"] == 1
    assert payload["n_errors"] == 0

    first_reason = payload["results"][0]["reason"]
    assert "calibration" in first_reason.lower()


def test_tick_error_isolated_to_one_schedule(tmp_path):
    """A schedule with a bad data_dir errors at fetch time. The whole
    tick must NOT crash — that one schedule gets status='error' and the
    others continue. The CLI's Stage 36 crash-isolation guarantee, now
    over HTTP."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True,
                                bad_config=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))

    # The endpoint itself returns 200 even when schedules error —
    # tick reports per-schedule outcomes and does not propagate them.
    assert response.status_code == 200

    payload = response.json()
    assert payload["n_errors"] == 1
    (outcome,) = payload["results"]
    assert outcome["status"] == "error"
    assert outcome["reason"]


# --- auth ------------------------------------------------------------

def test_tick_no_admin_key_401(tmp_path):
    """Once admin auth is configured, an anonymous request → 401."""
    dbfile = _bootstrap(tmp_path, calibrated=True)[0]
    # No Authorization header is sent on purpose.
    response = _client(dbfile).post("/admin/schedules/tick")
    assert response.status_code == 401


def test_tick_tenant_key_forbidden(tmp_path):
    """A tenant key, even with role=admin, must NOT be able to tick.
    tick runs schedules across every tenant — a privilege the customer
    must never have."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    tenant_headers = _auth(keys["acme_admin"])
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=tenant_headers)
    # Either rejection code is accepted.
    rejected_codes = (401, 403)
    assert response.status_code in rejected_codes


# --- payload parity with CLI ----------------------------------------

def test_http_payload_keys_match_cli_payload_keys(tmp_path):
    """The HTTP body must have exactly the same top-level keys the CLI
    prints — dashboards consuming one can swap to the other without
    changing their parser."""
    dbfile, keys = _bootstrap(tmp_path, calibrated=True)
    response = _client(dbfile).post(
        "/admin/schedules/tick", headers=_auth(keys["admin"]))
    payload = response.json()

    expected_keys = {
        "n_ran", "n_ok", "n_skipped", "n_errors", "results",
    }
    # A dict keys view compares equal to a set with the same members.
    assert payload.keys() == expected_keys