| from __future__ import annotations |
|
|
| import json |
| import time |
| from http.client import HTTPConnection |
| from http.server import ThreadingHTTPServer |
| from threading import Event, Lock, Thread |
| from typing import Any |
| from urllib.error import HTTPError |
|
|
| from world_simulator.api.server import ( |
| _build_handler, |
| _modal_health_urls, |
| _ModalHealthTarget, |
| _ModalHealthWarmer, |
| build_handler, |
| ) |
| from world_simulator.api.runtime import GameRuntime |
| from world_simulator.config import ( |
| ConnectorConfig, |
| GameConfig, |
| NpcConfig, |
| ServerConfig, |
| SimulationConfig, |
| WorldConfig, |
| ) |
| from world_simulator.domain import WorldState |
| from world_simulator.simulation.connectors.base import TickPlan |
| from world_simulator.simulation.spawning import create_world |
|
|
|
|
def test_snapshot_returns_previous_world_while_tick_is_planning() -> None:
    """Check that ``GET /scene/state`` keeps answering while ``POST /tick`` is planning.

    A ``_SlowSimulator`` holds the tick inside ``propose_tick`` until it is
    released. During that window the snapshot is expected to report tick 0
    with ``tick_in_progress`` true and ``pending_tick`` 1. After the release,
    the tick request is expected to return 200 and the snapshot to report
    tick 1 with nothing pending.
    """
    initial_world = create_world(_config(npc_count=2))
    slow_simulator = _SlowSimulator()
    http_server = ThreadingHTTPServer(
        ("127.0.0.1", 0),
        _build_handler(initial_world, slow_simulator),
    )
    listen_port = int(http_server.server_address[1])
    serve_thread = Thread(target=http_server.serve_forever, daemon=True)
    tick_outcome: dict[str, Any] = {}
    tick_failures: list[Exception] = []

    def run_tick_request() -> None:
        # Runs on a worker thread: record either the response or the failure
        # so the main thread can assert on it after joining.
        try:
            code, body = _request_json(listen_port, "POST", "/tick", timeout=6)
        except Exception as error:
            tick_failures.append(error)
        else:
            tick_outcome.update(status=code, payload=body)

    serve_thread.start()
    tick_worker = Thread(target=run_tick_request, daemon=True)

    try:
        tick_worker.start()
        # The simulator sets `started` as soon as the tick reaches it.
        assert slow_simulator.started.wait(timeout=1)

        # Snapshot taken while the tick is still blocked in the simulator.
        code, during_tick = _request_json(listen_port, "GET", "/scene/state", timeout=1)

        assert code == 200
        assert during_tick["tick"] == 0
        simulation_during = during_tick["simulation"]
        assert simulation_during["tick_in_progress"] is True
        assert simulation_during["pending_tick"] == 1

        slow_simulator.release.set()
        tick_worker.join(timeout=2)

        assert not tick_worker.is_alive()
        assert tick_failures == []
        assert tick_outcome["status"] == 200

        # Snapshot taken after the tick request has completed.
        code, after_tick = _request_json(listen_port, "GET", "/scene/state", timeout=1)

        assert code == 200
        assert after_tick["tick"] == 1
        simulation_after = after_tick["simulation"]
        assert simulation_after["tick_in_progress"] is False
        assert simulation_after["pending_tick"] is None
    finally:
        # Release again so a failed assertion never leaves the tick blocked.
        slow_simulator.release.set()
        http_server.shutdown()
        http_server.server_close()
        serve_thread.join(timeout=1)
|
|
|
|
def test_snapshot_triggers_modal_health_warmup() -> None:
    """Check that ``GET /scene/state?warmup=1`` triggers the health warmer once.

    The snapshot is expected to carry the warmer's model statuses under
    ``simulation.models``. A following ``GET /health`` is expected to leave
    the warmer's trigger count unchanged.
    """
    expected_models = [
        {
            "id": "npc_model",
            "label": "NPC model",
            "model": "test-model",
            "status": "ready",
            "http_status": 200,
        }
    ]
    game_world = create_world(_config(npc_count=2))
    fake_warmer = _FakeHealthWarmer()
    request_handler = _build_handler(
        game_world,
        _SlowSimulator(),
        modal_health_warmer=fake_warmer,
    )
    http_server = ThreadingHTTPServer(("127.0.0.1", 0), request_handler)
    listen_port = int(http_server.server_address[1])
    serve_thread = Thread(target=http_server.serve_forever, daemon=True)

    serve_thread.start()
    try:
        code, scene = _request_json(
            listen_port, "GET", "/scene/state?warmup=1", timeout=1
        )

        assert code == 200
        assert scene["tick"] == 0
        assert scene["simulation"]["models"] == expected_models
        assert fake_warmer.trigger_count == 1

        code, _ = _request_json(listen_port, "GET", "/health", timeout=1)

        assert code == 200
        # The health endpoint must not have triggered the warmer again.
        assert fake_warmer.trigger_count == 1
    finally:
        http_server.shutdown()
        http_server.server_close()
        serve_thread.join(timeout=1)
|
|
|
|
def test_snapshot_without_warmup_query_only_reads_model_status() -> None:
    """Check that a plain ``GET /scene/state`` reports model status without a trigger.

    Without the ``warmup`` query parameter the snapshot is still expected to
    include the warmer's statuses, while the warmer's trigger count stays 0.
    """
    game_world = create_world(_config(npc_count=2))
    fake_warmer = _FakeHealthWarmer()
    request_handler = _build_handler(
        game_world,
        _SlowSimulator(),
        modal_health_warmer=fake_warmer,
    )
    http_server = ThreadingHTTPServer(("127.0.0.1", 0), request_handler)
    listen_port = int(http_server.server_address[1])
    serve_thread = Thread(target=http_server.serve_forever, daemon=True)

    serve_thread.start()
    try:
        code, scene = _request_json(listen_port, "GET", "/scene/state", timeout=1)

        assert code == 200
        first_model = scene["simulation"]["models"][0]
        assert first_model["status"] == "ready"
        assert fake_warmer.trigger_count == 0
    finally:
        http_server.shutdown()
        http_server.server_close()
        serve_thread.join(timeout=1)
|
|
|
|
def test_modal_health_urls_use_only_modal_openai_connectors() -> None:
    """Check which connectors ``_modal_health_urls`` turns into ``/health`` URLs.

    Case 1: primary and secondary connectors both point at ``*.modal.run``
    hosts (one base URL ends in ``/v1``, the other in
    ``/v1/chat/completions``); both are expected as ``<host>/health``.
    Case 2: the secondary connector points at ``api.openai.com``; only the
    Modal-hosted primary is expected in the result.
    """
    # Case 1: both connectors are hosted on modal.run.
    all_modal_config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "qwen": ConnectorConfig(
                type="openai_compatible",
                base_url="https://workspace--qwen-serve.modal.run/v1/chat/completions",
                model="qwen-model",
            )
        },
    )
    expected_all_modal = [
        "https://workspace--npc-serve.modal.run/health",
        "https://workspace--qwen-serve.modal.run/health",
    ]

    assert _modal_health_urls(all_modal_config) == expected_all_modal

    # Case 2: the secondary connector is not hosted on modal.run.
    mixed_config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "external": ConnectorConfig(
                type="openai_compatible",
                base_url="https://api.openai.com/v1",
                model="external-model",
            )
        },
    )
    expected_modal_only = ["https://workspace--npc-serve.modal.run/health"]

    assert _modal_health_urls(mixed_config) == expected_modal_only
|
|
|
|
def test_http_server_admin_can_switch_npc_model(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that the admin endpoints list model profiles and reassign an NPC's profile.

    With ``ADMIN_TOKEN`` set and sent as ``X-Admin-Token``, ``GET
    /admin/models`` is expected to list at least the ``default`` and ``qwen``
    profiles. ``POST /admin/npcs/npc-001/model`` with ``profile_id`` ``qwen``
    is expected to return 200 and to update the first NPC's
    ``model_profile_id`` and ``connector_id`` on the world object.
    """
    # NOTE(review): `pytest` is only referenced by the annotation above and is
    # not imported in the visible import block; this works at runtime because
    # annotations are lazy here (`from __future__ import annotations`).
    admin_token = "test-admin"
    monkeypatch.setenv("ADMIN_TOKEN", admin_token)
    admin_headers = {"X-Admin-Token": admin_token}

    game_config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "qwen": ConnectorConfig(
                type="openai_compatible",
                base_url="https://workspace--qwen-serve.modal.run/v1",
                model="qwen-model",
            )
        },
    )
    game_world = create_world(game_config)
    game_runtime = GameRuntime(
        world=game_world,
        simulator=_SlowSimulator(),
        config=game_config,
    )
    http_server = ThreadingHTTPServer(("127.0.0.1", 0), build_handler(game_runtime))
    listen_port = int(http_server.server_address[1])
    serve_thread = Thread(target=http_server.serve_forever, daemon=True)

    serve_thread.start()
    try:
        # Step 1: list the available model profiles.
        code, listing = _request_json(
            listen_port,
            "GET",
            "/admin/models",
            timeout=1,
            headers=admin_headers,
        )
        assert code == 200
        listed_ids = {entry["id"] for entry in listing["profiles"]}
        assert listed_ids >= {"default", "qwen"}
        qwen_entry = next(
            entry for entry in listing["profiles"] if entry["id"] == "qwen"
        )
        assert qwen_entry["model"] == "qwen-model"

        # Step 2: switch npc-001 over to the qwen profile.
        code, switched = _request_json(
            listen_port,
            "POST",
            "/admin/npcs/npc-001/model",
            timeout=1,
            headers=admin_headers,
            body={"profile_id": "qwen"},
        )

        assert code == 200
        assert switched["profile"]["id"] == "qwen"
        first_npc = game_world.npcs[0]
        assert first_npc.model_profile_id == "qwen"
        assert first_npc.connector_id == "qwen"
    finally:
        http_server.shutdown()
        http_server.server_close()
        serve_thread.join(timeout=1)
|
|
|
|
def test_forced_modal_warmup_does_not_duplicate_inflight_checks() -> None:
    """Check that a second forced trigger does not start another in-flight health check.

    ``_BlockingOpener`` keeps its call blocked until ``release`` is set (or
    its own 5 second wait expires). While that first call is blocked, a
    second ``trigger(force=True)`` is expected to leave the opener's call
    count at 1, and the target is expected to report ``checking`` with no
    HTTP status yet.
    """
    opener = _BlockingOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        opener=opener,
    )

    # Everything that can fail after the first trigger runs under
    # try/finally, so the blocked opener call is always released instead of
    # lingering until its 5 second wait expires.
    try:
        warmer.trigger(force=True)
        assert opener.started.wait(timeout=1)
        warmer.trigger(force=True)

        statuses = warmer.statuses()

        assert opener.call_count == 1
        assert statuses == [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "checking",
                "http_status": None,
            }
        ]
    finally:
        opener.release.set()
|
|
|
|
def test_modal_health_warmer_keeps_503_loading_as_warmup_until_ready() -> None:
    """Check that a 503 health answer is reported as ``warmup`` until a 200 arrives.

    ``_LoadingThenReadyOpener`` raises an HTTP 503 on its first call and
    returns a 200 response on later calls once ``allow_ready`` is set. The
    warmer is expected to report ``warmup``/503 first and ``ready``/200 after
    the retry (``warmup_retry_seconds=0.05``).
    """
    health_opener = _LoadingThenReadyOpener()
    npc_target = _ModalHealthTarget(
        id="npc_model",
        label="NPC model",
        model="test-model",
        health_url="https://workspace--npc-serve.modal.run/health",
    )
    health_warmer = _ModalHealthWarmer(
        [npc_target],
        warmup_retry_seconds=0.05,
        opener=health_opener,
    )

    # Phase 1: the first health call answers 503.
    health_warmer.trigger(force=True)
    assert health_opener.loading_returned.wait(timeout=1)
    _wait_for_status(health_warmer, status="warmup", http_status=503)

    # Phase 2: let the next health call succeed.
    health_opener.allow_ready.set()
    assert health_opener.ready_returned.wait(timeout=1)
    _wait_for_status(health_warmer, status="ready", http_status=200)
|
|
|
|
class _SlowSimulator:
    """Simulator stub whose ``propose_tick`` blocks until the test releases it.

    ``started`` is set when ``propose_tick`` is entered; ``release`` is the
    event the test sets to let it return.
    """

    name = "slow"

    def __init__(self) -> None:
        self.started, self.release = Event(), Event()

    def propose_tick(self, _world: WorldState, _next_tick: int) -> TickPlan:
        """Signal ``started``, wait up to 5 seconds for ``release``, return an empty plan."""
        self.started.set()
        # Bounded wait so a test that forgets to release cannot hang forever.
        self.release.wait(timeout=5)
        empty_plan = TickPlan(source=self.name, directives=[])
        return empty_plan
|
|
|
|
class _FakeHealthWarmer:
    """Health-warmer stand-in that counts triggers and always reports one ready model."""

    def __init__(self) -> None:
        self.trigger_count = 0

    def trigger(self, *, force: bool = False) -> None:
        """Record one trigger call; ``force`` is accepted but not used."""
        self.trigger_count = self.trigger_count + 1

    def statuses(self) -> list[dict[str, Any]]:
        """Return a fresh single-entry list describing a ready NPC model."""
        npc_model_status: dict[str, Any] = {
            "id": "npc_model",
            "label": "NPC model",
            "model": "test-model",
            "status": "ready",
            "http_status": 200,
        }
        return [npc_model_status]
|
|
|
|
class _BlockingOpener:
    """Opener stub that counts calls and blocks each one until ``release`` is set.

    ``started`` is set once a call has been counted; ``call_count`` is updated
    under a lock.
    """

    def __init__(self) -> None:
        self.started = Event()
        self.release = Event()
        self._lock = Lock()
        self.call_count = 0

    def __call__(self, url: str) -> Any:
        """Count the call, signal ``started``, wait up to 5 seconds, return a fake response."""
        _ = url  # the requested URL is irrelevant to this stub
        with self._lock:
            self.call_count = self.call_count + 1
        self.started.set()
        # Bounded wait so a test that never releases cannot hang forever.
        self.release.wait(timeout=5)
        fake_response = _FakeResponse()
        return fake_response
|
|
|
|
class _LoadingThenReadyOpener:
    """Opener stub: the first call fails with HTTP 503, later calls succeed.

    ``loading_returned`` is set just before the 503 is raised. Later calls
    wait for ``allow_ready`` (up to 5 seconds), set ``ready_returned`` and
    return a fake response.
    """

    def __init__(self) -> None:
        self.loading_returned = Event()
        self.allow_ready = Event()
        self.ready_returned = Event()
        self._lock = Lock()
        self.call_count = 0

    def __call__(self, url: str) -> Any:
        """Answer 503 on the first call and a fake response on every later call."""
        with self._lock:
            self.call_count += 1
            attempt = self.call_count

        if attempt != 1:
            # Later calls: hold back until the test allows readiness.
            self.allow_ready.wait(timeout=5)
            self.ready_returned.set()
            return _FakeResponse()

        # First call: report that the service is still loading.
        self.loading_returned.set()
        raise HTTPError(url, 503, "Service Unavailable", hdrs=None, fp=None)
|
|
|
|
class _FakeResponse:
    """Minimal response stub: a 200 ``status`` and a no-op ``close``."""

    status: int = 200

    def close(self) -> None:
        """Do nothing; there is no underlying resource to release."""
|
|
|
|
| def _request_json( |
| port: int, |
| method: str, |
| path: str, |
| *, |
| timeout: float, |
| headers: dict[str, str] | None = None, |
| body: dict[str, Any] | None = None, |
| ) -> tuple[int, dict[str, Any]]: |
| connection = HTTPConnection("127.0.0.1", port, timeout=timeout) |
| try: |
| request_body = json.dumps(body).encode("utf-8") if body is not None else None |
| request_headers = dict(headers or {}) |
| if request_body is not None: |
| request_headers.setdefault("Content-Type", "application/json") |
| connection.request(method, path, body=request_body, headers=request_headers) |
| response = connection.getresponse() |
| body = response.read().decode("utf-8") |
| finally: |
| connection.close() |
|
|
| payload = json.loads(body) |
| assert isinstance(payload, dict) |
| return response.status, payload |
|
|
|
|
def _wait_for_status(
    warmer: _ModalHealthWarmer,
    *,
    status: str,
    http_status: int,
    timeout: float = 1.0,
    poll_interval: float = 0.01,
) -> None:
    """Poll ``warmer.statuses()`` until its first entry matches, or fail.

    Args:
        warmer: Object whose ``statuses()`` returns a list of status dicts.
        status: Expected value of the first entry's ``"status"`` key.
        http_status: Expected value of the first entry's ``"http_status"`` key.
        timeout: Maximum time to keep polling, in seconds.
        poll_interval: Sleep between polls, in seconds.

    Raises:
        AssertionError: If the first entry never matches before the timeout;
            the message includes the last statuses observed.
    """
    deadline = time.monotonic() + timeout
    while True:
        statuses = warmer.statuses()
        # An empty list counts as "not matching yet", not as an IndexError.
        if statuses:
            first = statuses[0]
            if first["status"] == status and first["http_status"] == http_status:
                return
        # The deadline is checked after the poll, so at least one check always
        # happens and the state is inspected once more after the final sleep.
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)
    raise AssertionError(f"Expected {status=} and {http_status=}, got {statuses!r}")
|
|
|
|
def _config(
    *,
    npc_count: int,
    connector: ConnectorConfig | None = None,
    god_console: ConnectorConfig | None = None,
    secondary_connectors: dict[str, ConnectorConfig] | None = None,
) -> GameConfig:
    """Build the ``GameConfig`` used by these tests.

    World, simulation and server settings are fixed. ``connector`` falls back
    to a ``deterministic`` connector and ``secondary_connectors`` to an empty
    dict when not supplied (or falsy); ``god_console`` is passed through as is.
    """
    world_settings = WorldConfig(width=80, depth=80, terrain="plain_green", seed=42)
    npc_settings = NpcConfig(count=npc_count)
    simulation_settings = SimulationConfig(tick_ms=500)
    server_settings = ServerConfig(host="127.0.0.1", port=8000)
    primary_connector = connector or ConnectorConfig(type="deterministic")
    extra_connectors = secondary_connectors or {}

    return GameConfig(
        world=world_settings,
        npcs=npc_settings,
        simulation=simulation_settings,
        server=server_settings,
        connector=primary_connector,
        god_console=god_console,
        secondary_connectors=extra_connectors,
    )
|
|