File size: 1,901 Bytes
b641d3d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | from fastapi.testclient import TestClient
from server import api
from server.constants import TaskName
from server.models import Observation, SystemMetrics
class _FakeEnv:
def __init__(self) -> None:
self.reset_calls: list[TaskName] = []
def start(self) -> None:
return None
def close(self) -> None:
return None
def reset(self, task_name: TaskName) -> Observation:
self.reset_calls.append(task_name)
return Observation(
command_output="ready",
metrics=SystemMetrics(
gateway_success_rate=0.0,
gateway_p99_latency_ms=0.0,
queue_depth=0,
worker_restart_count=0,
consumer_stall_count=0,
),
process_status={"gateway": "running"},
)
def test_reset_defaults_to_cascading_timeout_when_task_missing(monkeypatch) -> None:
holder: dict[str, _FakeEnv] = {}
def fake_env_factory() -> _FakeEnv:
env = _FakeEnv()
holder["env"] = env
return env
monkeypatch.setattr(api, "DistributedDebugEnv", fake_env_factory)
with TestClient(api.app) as client:
response = client.post("/reset", json={})
assert response.status_code == 200
assert holder["env"].reset_calls == [TaskName.CASCADING_TIMEOUT]
def test_reset_rejects_unknown_explicit_task(monkeypatch) -> None:
holder: dict[str, _FakeEnv] = {}
def fake_env_factory() -> _FakeEnv:
env = _FakeEnv()
holder["env"] = env
return env
monkeypatch.setattr(api, "DistributedDebugEnv", fake_env_factory)
with TestClient(api.app) as client:
response = client.post("/reset", params={"task_name": "not-a-task"}, json={})
assert response.status_code == 400
assert response.json()["detail"] == "Unknown task: not-a-task"
assert holder["env"].reset_calls == []
|