# HTTP-level tests for the world simulator API server: scene snapshots taken
# during a tick, Modal health warm-up triggering, and the admin model switch.

from __future__ import annotations

import json
import time
from collections.abc import Iterator
from contextlib import contextmanager
from http.client import HTTPConnection
from http.server import ThreadingHTTPServer
from threading import Event, Lock, Thread
from typing import TYPE_CHECKING, Any
from urllib.error import HTTPError

from world_simulator.api.server import (
    _build_handler,
    _modal_health_urls,
    _ModalHealthTarget,
    _ModalHealthWarmer,
    build_handler,
)
from world_simulator.api.runtime import GameRuntime
from world_simulator.config import (
    ConnectorConfig,
    GameConfig,
    NpcConfig,
    ServerConfig,
    SimulationConfig,
    WorldConfig,
)
from world_simulator.domain import WorldState
from world_simulator.simulation.connectors.base import TickPlan
from world_simulator.simulation.spawning import create_world

if TYPE_CHECKING:
    # Only needed to resolve the ``pytest.MonkeyPatch`` annotation below;
    # annotations are lazy, so nothing is imported at runtime.
    import pytest


def test_snapshot_returns_previous_world_while_tick_is_planning() -> None:
    """A snapshot requested mid-tick reports the old tick and the pending one."""
    world = create_world(_config(npc_count=2))
    simulator = _SlowSimulator()
    tick_result: dict[str, Any] = {}
    tick_errors: list[Exception] = []

    with _serving(_build_handler(world, simulator)) as port:

        def post_tick() -> None:
            # Runs on a worker thread: record the outcome (or the failure) so
            # the main thread can assert on it after joining.
            try:
                status, payload = _request_json(port, "POST", "/tick", timeout=6)
                tick_result["status"] = status
                tick_result["payload"] = payload
            except Exception as exc:
                tick_errors.append(exc)

        tick_thread = Thread(target=post_tick, daemon=True)
        try:
            tick_thread.start()
            assert simulator.started.wait(timeout=1)

            # The simulator is now blocked inside propose_tick().
            status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
            assert status == 200
            assert snapshot["tick"] == 0
            assert snapshot["simulation"]["tick_in_progress"] is True
            assert snapshot["simulation"]["pending_tick"] == 1

            simulator.release.set()
            tick_thread.join(timeout=2)
            assert not tick_thread.is_alive()
            assert tick_errors == []
            assert tick_result["status"] == 200

            status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
            assert status == 200
            assert snapshot["tick"] == 1
            assert snapshot["simulation"]["tick_in_progress"] is False
            assert snapshot["simulation"]["pending_tick"] is None
        finally:
            # Always unblock the simulator before the server is shut down,
            # even when an assertion above failed.
            simulator.release.set()


def test_snapshot_triggers_modal_health_warmup() -> None:
    """``/scene/state?warmup=1`` triggers the warmer once; ``/health`` does not."""
    world = create_world(_config(npc_count=2))
    warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=warmer)

    with _serving(handler) as port:
        status, snapshot = _request_json(
            port, "GET", "/scene/state?warmup=1", timeout=1
        )
        assert status == 200
        assert snapshot["tick"] == 0
        assert snapshot["simulation"]["models"] == [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "ready",
                "http_status": 200,
            }
        ]
        assert warmer.trigger_count == 1

        status, _ = _request_json(port, "GET", "/health", timeout=1)
        assert status == 200
        assert warmer.trigger_count == 1


def test_snapshot_without_warmup_query_only_reads_model_status() -> None:
    """A plain ``/scene/state`` reports model status without triggering warm-up."""
    world = create_world(_config(npc_count=2))
    warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=warmer)

    with _serving(handler) as port:
        status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
        assert status == 200
        assert snapshot["simulation"]["models"][0]["status"] == "ready"
        assert warmer.trigger_count == 0


def test_modal_health_urls_use_only_modal_openai_connectors() -> None:
    """Health URLs are derived for ``modal.run`` connectors and skip other hosts."""
    config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "qwen": ConnectorConfig(
                type="openai_compatible",
                base_url="https://workspace--qwen-serve.modal.run/v1/chat/completions",
                model="qwen-model",
            )
        },
    )

    assert _modal_health_urls(config) == [
        "https://workspace--npc-serve.modal.run/health",
        "https://workspace--qwen-serve.modal.run/health",
    ]

    config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "external": ConnectorConfig(
                type="openai_compatible",
                base_url="https://api.openai.com/v1",
                model="external-model",
            )
        },
    )

    assert _modal_health_urls(config) == [
        "https://workspace--npc-serve.modal.run/health"
    ]


def test_http_server_admin_can_switch_npc_model(monkeypatch: pytest.MonkeyPatch) -> None:
    """An authenticated admin can list model profiles and reassign an NPC's model."""
    monkeypatch.setenv("ADMIN_TOKEN", "test-admin")
    config = _config(
        npc_count=2,
        connector=ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        ),
        secondary_connectors={
            "qwen": ConnectorConfig(
                type="openai_compatible",
                base_url="https://workspace--qwen-serve.modal.run/v1",
                model="qwen-model",
            )
        },
    )
    world = create_world(config)
    runtime = GameRuntime(world=world, simulator=_SlowSimulator(), config=config)

    with _serving(build_handler(runtime)) as port:
        status, payload = _request_json(
            port,
            "GET",
            "/admin/models",
            timeout=1,
            headers={"X-Admin-Token": "test-admin"},
        )
        assert status == 200
        assert {profile["id"] for profile in payload["profiles"]} >= {"default", "qwen"}
        qwen_profile = next(
            profile for profile in payload["profiles"] if profile["id"] == "qwen"
        )
        assert qwen_profile["model"] == "qwen-model"

        status, payload = _request_json(
            port,
            "POST",
            "/admin/npcs/npc-001/model",
            timeout=1,
            headers={"X-Admin-Token": "test-admin"},
            body={"profile_id": "qwen"},
        )
        assert status == 200
        assert payload["profile"]["id"] == "qwen"
        assert world.npcs[0].model_profile_id == "qwen"
        assert world.npcs[0].connector_id == "qwen"


def test_forced_modal_warmup_does_not_duplicate_inflight_checks() -> None:
    """A second forced trigger while a check is in flight must not call the opener again."""
    opener = _BlockingOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        opener=opener,
    )

    try:
        warmer.trigger(force=True)
        assert opener.started.wait(timeout=1)
        warmer.trigger(force=True)
        statuses = warmer.statuses()

        assert opener.call_count == 1
        assert statuses == [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "checking",
                "http_status": None,
            }
        ]
    finally:
        # Let the blocked opener call return whatever happened above.
        opener.release.set()


def test_modal_health_warmer_keeps_503_loading_as_warmup_until_ready() -> None:
    """A 503 from the health URL is reported as ``warmup`` until a later check succeeds."""
    opener = _LoadingThenReadyOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        warmup_retry_seconds=0.05,
        opener=opener,
    )

    warmer.trigger(force=True)

    assert opener.loading_returned.wait(timeout=1)
    _wait_for_status(warmer, status="warmup", http_status=503)
    opener.allow_ready.set()
    assert opener.ready_returned.wait(timeout=1)
    _wait_for_status(warmer, status="ready", http_status=200)


class _SlowSimulator:
    """Simulator stub whose ``propose_tick`` blocks until the test releases it."""

    name = "slow"

    def __init__(self) -> None:
        # Set as soon as propose_tick() is entered.
        self.started = Event()
        # Set by the test to let propose_tick() return.
        self.release = Event()

    def propose_tick(self, _world: WorldState, _next_tick: int) -> TickPlan:
        """Signal ``started``, wait up to 5 s for ``release``, return an empty plan."""
        self.started.set()
        self.release.wait(timeout=5)
        return TickPlan(source=self.name, directives=[])


class _FakeHealthWarmer:
    """Warmer stub that counts ``trigger`` calls and reports one ready model."""

    def __init__(self) -> None:
        self.trigger_count = 0

    def trigger(self, *, force: bool = False) -> None:
        """Record the call; ``force`` is accepted but ignored."""
        self.trigger_count += 1

    def statuses(self) -> list[dict[str, Any]]:
        """Return a fixed single-entry status list marking the model as ready."""
        return [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "ready",
                "http_status": 200,
            }
        ]


class _BlockingOpener:
    """Opener stub that counts calls and blocks each one until ``release`` is set."""

    def __init__(self) -> None:
        self.started = Event()
        self.release = Event()
        self._lock = Lock()
        self.call_count = 0

    def __call__(self, url: str) -> Any:
        """Count the call, signal ``started``, wait up to 5 s, return a fake 200."""
        _ = url
        with self._lock:
            self.call_count += 1
        self.started.set()
        self.release.wait(timeout=5)
        return _FakeResponse()


class _LoadingThenReadyOpener:
    """Opener stub: the first call raises HTTP 503, later calls succeed once allowed."""

    def __init__(self) -> None:
        self.loading_returned = Event()
        self.allow_ready = Event()
        self.ready_returned = Event()
        self._lock = Lock()
        self.call_count = 0

    def __call__(self, url: str) -> Any:
        """Raise ``HTTPError`` 503 on the first call; afterwards wait for ``allow_ready``."""
        with self._lock:
            self.call_count += 1
            call_count = self.call_count
        if call_count == 1:
            self.loading_returned.set()
            raise HTTPError(url, 503, "Service Unavailable", hdrs=None, fp=None)
        self.allow_ready.wait(timeout=5)
        self.ready_returned.set()
        return _FakeResponse()


class _FakeResponse:
    """Minimal response stand-in exposing ``status`` and a no-op ``close``."""

    status: int = 200

    def close(self) -> None:
        return


@contextmanager
def _serving(handler: Any) -> Iterator[int]:
    """Serve *handler* on an ephemeral localhost port for the ``with`` body.

    Args:
        handler: Request handler class passed to ``ThreadingHTTPServer``.

    Yields:
        The port number the server is listening on.
    """
    server = ThreadingHTTPServer(("127.0.0.1", 0), handler)
    server_thread = Thread(target=server.serve_forever, daemon=True)
    server_thread.start()
    try:
        yield int(server.server_address[1])
    finally:
        server.shutdown()
        server.server_close()
        server_thread.join(timeout=1)


def _request_json(
    port: int,
    method: str,
    path: str,
    *,
    timeout: float,
    headers: dict[str, str] | None = None,
    body: dict[str, Any] | None = None,
) -> tuple[int, dict[str, Any]]:
    """Send one HTTP request to ``127.0.0.1:port`` and decode the JSON reply.

    Args:
        port: Port of the local server under test.
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Request target, including any query string.
        timeout: Socket timeout in seconds for the connection.
        headers: Extra request headers; copied, never mutated.
        body: Optional JSON-serialisable payload. When given it is sent as
            UTF-8 JSON and ``Content-Type: application/json`` is added unless
            the caller already set that header.

    Returns:
        ``(status, payload)`` where ``payload`` is the decoded JSON object.

    Raises:
        AssertionError: If the response body is not a JSON object.
    """
    request_body = json.dumps(body).encode("utf-8") if body is not None else None
    request_headers = dict(headers or {})
    if request_body is not None:
        request_headers.setdefault("Content-Type", "application/json")

    connection = HTTPConnection("127.0.0.1", port, timeout=timeout)
    try:
        connection.request(method, path, body=request_body, headers=request_headers)
        response = connection.getresponse()
        # Capture everything needed while the connection is still open.
        status = response.status
        response_text = response.read().decode("utf-8")
    finally:
        connection.close()

    payload = json.loads(response_text)
    assert isinstance(payload, dict)
    return status, payload


def _wait_for_status(
    warmer: _ModalHealthWarmer,
    *,
    status: str,
    http_status: int,
) -> None:
    """Poll the warmer's first status entry until it matches, for at most one second.

    Raises:
        AssertionError: If the expected ``status``/``http_status`` pair is not
            observed before the deadline.
    """
    deadline = time.monotonic() + 1
    while time.monotonic() < deadline:
        statuses = warmer.statuses()
        if (
            statuses[0]["status"] == status
            and statuses[0]["http_status"] == http_status
        ):
            return
        time.sleep(0.01)
    raise AssertionError(f"Expected {status=} and {http_status=}, got {warmer.statuses()!r}")


def _config(
    *,
    npc_count: int,
    connector: ConnectorConfig | None = None,
    god_console: ConnectorConfig | None = None,
    secondary_connectors: dict[str, ConnectorConfig] | None = None,
) -> GameConfig:
    """Assemble the ``GameConfig`` shared by the tests in this module.

    The world, simulation and server sections are fixed; only the NPC count
    and the connector-related sections vary between tests.

    Args:
        npc_count: Value passed to ``NpcConfig(count=...)``.
        connector: Primary connector; a falsy value selects
            ``ConnectorConfig(type="deterministic")``.
        god_console: Forwarded unchanged as the ``god_console`` section.
        secondary_connectors: Additional connectors keyed by id; a falsy
            value selects an empty dict.

    Returns:
        The assembled ``GameConfig``.
    """
    world_section = WorldConfig(width=80, depth=80, terrain="plain_green", seed=42)
    npc_section = NpcConfig(count=npc_count)
    simulation_section = SimulationConfig(tick_ms=500)
    server_section = ServerConfig(host="127.0.0.1", port=8000)

    # Truthiness (not an ``is None`` check) decides whether the defaults apply.
    primary = connector if connector else ConnectorConfig(type="deterministic")
    extras = secondary_connectors if secondary_connectors else {}

    return GameConfig(
        world=world_section,
        npcs=npc_section,
        simulation=simulation_section,
        server=server_section,
        connector=primary,
        god_console=god_console,
        secondary_connectors=extras,
    )