Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| import time | |
| from http.client import HTTPConnection | |
| from http.server import ThreadingHTTPServer | |
| from threading import Event, Lock, Thread | |
| from typing import Any | |
| from urllib.error import HTTPError | |
| from world_simulator.api.server import ( | |
| _build_handler, | |
| _modal_health_urls, | |
| _ModalHealthTarget, | |
| _ModalHealthWarmer, | |
| build_handler, | |
| ) | |
| from world_simulator.api.runtime import GameRuntime | |
| from world_simulator.config import ( | |
| ConnectorConfig, | |
| GameConfig, | |
| NpcConfig, | |
| ServerConfig, | |
| SimulationConfig, | |
| WorldConfig, | |
| ) | |
| from world_simulator.domain import WorldState | |
| from world_simulator.simulation.connectors.base import TickPlan | |
| from world_simulator.simulation.spawning import create_world | |
def test_snapshot_returns_previous_world_while_tick_is_planning() -> None:
    """Check that `/scene/state` keeps answering while a `POST /tick` is still planning.

    `_SlowSimulator.propose_tick` blocks until its `release` event is set, so
    the tick request is issued from a background thread. While it is parked the
    snapshot must still report tick 0 with `tick_in_progress` true and
    `pending_tick` 1; once released, the snapshot must report tick 1 with no
    tick pending.
    """
    world = create_world(_config(npc_count=2))
    simulator = _SlowSimulator()
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _build_handler(world, simulator))
    port = int(httpd.server_address[1])
    serve_thread = Thread(target=httpd.serve_forever, daemon=True)
    outcome: dict[str, Any] = {}
    failures: list[Exception] = []

    def post_tick() -> None:
        # Runs off the main thread, so the response and any exception are
        # handed back through the enclosing containers instead of raised.
        try:
            code, body = _request_json(port, "POST", "/tick", timeout=6)
        except Exception as exc:
            failures.append(exc)
        else:
            outcome["status"] = code
            outcome["payload"] = body

    serve_thread.start()
    ticker = Thread(target=post_tick, daemon=True)
    try:
        ticker.start()
        # Wait until the simulator has actually entered propose_tick.
        assert simulator.started.wait(timeout=1)

        status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
        assert status == 200
        assert snapshot["tick"] == 0
        during = snapshot["simulation"]
        assert during["tick_in_progress"] is True
        assert during["pending_tick"] == 1

        # Let the blocked tick finish and make sure the POST completed cleanly.
        simulator.release.set()
        ticker.join(timeout=2)
        assert not ticker.is_alive()
        assert failures == []
        assert outcome["status"] == 200

        status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
        assert status == 200
        assert snapshot["tick"] == 1
        after = snapshot["simulation"]
        assert after["tick_in_progress"] is False
        assert after["pending_tick"] is None
    finally:
        # Release first so a still-blocked tick cannot hold up the shutdown.
        simulator.release.set()
        httpd.shutdown()
        httpd.server_close()
        serve_thread.join(timeout=1)
def test_snapshot_triggers_modal_health_warmup() -> None:
    """Check that `GET /scene/state?warmup=1` triggers the health warmer once.

    The snapshot must expose the warmer's statuses under
    `simulation["models"]`, and a following `GET /health` must not trigger the
    warmer again.
    """
    world = create_world(_config(npc_count=2))
    health_warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=health_warmer)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), handler)
    port = int(httpd.server_address[1])
    serve_thread = Thread(target=httpd.serve_forever, daemon=True)
    serve_thread.start()

    # Mirrors what _FakeHealthWarmer.statuses() reports.
    expected_models = [
        {
            "id": "npc_model",
            "label": "NPC model",
            "model": "test-model",
            "status": "ready",
            "http_status": 200,
        }
    ]
    try:
        status, snapshot = _request_json(port, "GET", "/scene/state?warmup=1", timeout=1)
        assert status == 200
        assert snapshot["tick"] == 0
        assert snapshot["simulation"]["models"] == expected_models
        assert health_warmer.trigger_count == 1

        # The health endpoint must leave the trigger count untouched.
        status, _ = _request_json(port, "GET", "/health", timeout=1)
        assert status == 200
        assert health_warmer.trigger_count == 1
    finally:
        httpd.shutdown()
        httpd.server_close()
        serve_thread.join(timeout=1)
def test_snapshot_without_warmup_query_only_reads_model_status() -> None:
    """Check that a plain `GET /scene/state` reports model status without triggering the warmer."""
    world = create_world(_config(npc_count=2))
    health_warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=health_warmer)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), handler)
    port = int(httpd.server_address[1])
    serve_thread = Thread(target=httpd.serve_forever, daemon=True)
    serve_thread.start()
    try:
        status, snapshot = _request_json(port, "GET", "/scene/state", timeout=1)
        assert status == 200
        first_model = snapshot["simulation"]["models"][0]
        assert first_model["status"] == "ready"
        # No `warmup` query parameter was sent, so nothing may have been triggered.
        assert health_warmer.trigger_count == 0
    finally:
        httpd.shutdown()
        httpd.server_close()
        serve_thread.join(timeout=1)
def test_modal_health_urls_use_only_modal_openai_connectors() -> None:
    """Check which connectors `_modal_health_urls` turns into health URLs.

    Two `*.modal.run` connectors (base URLs ending in `/v1` and
    `/v1/chat/completions`) must each yield `<origin>/health`; a connector
    pointing at `api.openai.com` must be left out.
    """

    def openai_compatible(base_url: str, model: str) -> ConnectorConfig:
        # Every connector in this test differs only in URL and model name.
        return ConnectorConfig(type="openai_compatible", base_url=base_url, model=model)

    all_modal = _config(
        npc_count=2,
        connector=openai_compatible(
            "https://workspace--npc-serve.modal.run/v1",
            "npc-model",
        ),
        secondary_connectors={
            "qwen": openai_compatible(
                "https://workspace--qwen-serve.modal.run/v1/chat/completions",
                "qwen-model",
            )
        },
    )
    assert _modal_health_urls(all_modal) == [
        "https://workspace--npc-serve.modal.run/health",
        "https://workspace--qwen-serve.modal.run/health",
    ]

    with_external = _config(
        npc_count=2,
        connector=openai_compatible(
            "https://workspace--npc-serve.modal.run/v1",
            "npc-model",
        ),
        secondary_connectors={
            "external": openai_compatible(
                "https://api.openai.com/v1",
                "external-model",
            )
        },
    )
    assert _modal_health_urls(with_external) == [
        "https://workspace--npc-serve.modal.run/health"
    ]
def test_http_server_admin_can_switch_npc_model(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that an admin can list model profiles and assign one to an NPC over HTTP.

    With `ADMIN_TOKEN` set and sent as `X-Admin-Token`, `GET /admin/models`
    must list at least the `default` and `qwen` profiles, and
    `POST /admin/npcs/npc-001/model` must switch the first NPC in the world to
    the `qwen` profile and connector.
    """
    # NOTE(review): `pytest` appears only in the annotation above, which is not
    # evaluated at runtime because of `from __future__ import annotations`.
    monkeypatch.setenv("ADMIN_TOKEN", "test-admin")
    primary = ConnectorConfig(
        type="openai_compatible",
        base_url="https://workspace--npc-serve.modal.run/v1",
        model="npc-model",
    )
    qwen = ConnectorConfig(
        type="openai_compatible",
        base_url="https://workspace--qwen-serve.modal.run/v1",
        model="qwen-model",
    )
    config = _config(npc_count=2, connector=primary, secondary_connectors={"qwen": qwen})
    world = create_world(config)
    runtime = GameRuntime(world=world, simulator=_SlowSimulator(), config=config)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), build_handler(runtime))
    port = int(httpd.server_address[1])
    serve_thread = Thread(target=httpd.serve_forever, daemon=True)
    serve_thread.start()

    admin_headers = {"X-Admin-Token": "test-admin"}
    try:
        status, payload = _request_json(
            port,
            "GET",
            "/admin/models",
            timeout=1,
            headers=admin_headers,
        )
        assert status == 200
        profiles = payload["profiles"]
        profile_ids = {profile["id"] for profile in profiles}
        assert {"default", "qwen"} <= profile_ids
        qwen_profile = next(filter(lambda profile: profile["id"] == "qwen", profiles))
        assert qwen_profile["model"] == "qwen-model"

        status, payload = _request_json(
            port,
            "POST",
            "/admin/npcs/npc-001/model",
            timeout=1,
            headers=admin_headers,
            body={"profile_id": "qwen"},
        )
        assert status == 200
        assert payload["profile"]["id"] == "qwen"
        # The switch must be visible on the world object the runtime was built from.
        first_npc = world.npcs[0]
        assert first_npc.model_profile_id == "qwen"
        assert first_npc.connector_id == "qwen"
    finally:
        httpd.shutdown()
        httpd.server_close()
        serve_thread.join(timeout=1)
def test_forced_modal_warmup_does_not_duplicate_inflight_checks() -> None:
    """Check that a forced trigger does not start a second check while one is in flight.

    `_BlockingOpener` parks inside the health request until `release` is set.
    While the first forced check is parked, a second forced trigger must not
    call the opener again, and the target must still report status
    `"checking"` with no HTTP status.
    """
    opener = _BlockingOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        opener=opener,
    )
    # Every step after construction runs under the `finally`, so the parked
    # opener call is released even when a trigger, the status read or an
    # assertion raises (otherwise it would sit in `release.wait` until its
    # 5 second timeout).
    try:
        warmer.trigger(force=True)
        assert opener.started.wait(timeout=1)
        warmer.trigger(force=True)
        statuses = warmer.statuses()
        assert opener.call_count == 1
        assert statuses == [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "checking",
                "http_status": None,
            }
        ]
    finally:
        opener.release.set()
def test_modal_health_warmer_keeps_503_loading_as_warmup_until_ready() -> None:
    """Check that a 503 health response is reported as `warmup` until a later check succeeds.

    `_LoadingThenReadyOpener` raises an HTTP 503 on its first call and returns
    a 200 response on later calls once `allow_ready` is set. The warmer must
    report `warmup`/503 after the first call and `ready`/200 after the
    successful one.
    """
    opener = _LoadingThenReadyOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        warmup_retry_seconds=0.05,
        opener=opener,
    )
    try:
        warmer.trigger(force=True)
        assert opener.loading_returned.wait(timeout=1)
        _wait_for_status(warmer, status="warmup", http_status=503)
        opener.allow_ready.set()
        assert opener.ready_returned.wait(timeout=1)
        _wait_for_status(warmer, status="ready", http_status=200)
    finally:
        # If an assertion fails before `allow_ready` is set, any later opener
        # call would otherwise stay parked in `allow_ready.wait` until its
        # 5 second timeout. Setting an already-set event is harmless.
        opener.allow_ready.set()
class _SlowSimulator:
    """Simulator stub whose tick planning blocks until the test releases it.

    `started` is set as soon as `propose_tick` is entered; the call then waits
    on `release` (for at most 5 seconds) before returning an empty plan.
    """

    name = "slow"

    def __init__(self) -> None:
        self.started, self.release = Event(), Event()

    def propose_tick(self, _world: WorldState, _next_tick: int) -> TickPlan:
        """Signal entry, wait for `release`, then return a plan with no directives."""
        self.started.set()
        # Bounded wait so a test that forgets to release cannot hang forever.
        self.release.wait(timeout=5)
        empty_plan = TickPlan(source=self.name, directives=[])
        return empty_plan
class _FakeHealthWarmer:
    """Health-warmer stand-in that counts triggers and always reports one ready model."""

    def __init__(self) -> None:
        # Number of times trigger() has been called.
        self.trigger_count = 0

    def trigger(self, *, force: bool = False) -> None:
        """Record one trigger call; `force` is accepted but not used."""
        self.trigger_count = self.trigger_count + 1

    def statuses(self) -> list[dict[str, Any]]:
        """Return a fresh single-entry status list describing a ready model."""
        ready_entry: dict[str, Any] = {
            "id": "npc_model",
            "label": "NPC model",
            "model": "test-model",
            "status": "ready",
            "http_status": 200,
        }
        return [ready_entry]
class _BlockingOpener:
    """Opener stub that counts calls and blocks each one until `release` is set.

    `started` is set once a call has been counted, so a test can wait for a
    request to be in flight before inspecting the warmer.
    """

    def __init__(self) -> None:
        self._lock = Lock()
        self.call_count = 0
        self.started = Event()
        self.release = Event()

    def __call__(self, url: str) -> Any:
        """Count the call, signal `started`, wait for `release`, return a fake 200 response."""
        del url  # The requested URL is irrelevant to this stub.
        with self._lock:
            self.call_count = self.call_count + 1
        self.started.set()
        # Bounded wait so a forgotten release cannot hang the caller forever.
        self.release.wait(timeout=5)
        return _FakeResponse()
class _LoadingThenReadyOpener:
    """Opener stub that answers 503 on its first call and a fake 200 afterwards.

    Events let a test follow progress: `loading_returned` is set just before
    the 503 is raised, later calls wait on `allow_ready`, and `ready_returned`
    is set just before the successful response is returned.
    """

    def __init__(self) -> None:
        self._lock = Lock()
        self.call_count = 0
        self.loading_returned = Event()
        self.allow_ready = Event()
        self.ready_returned = Event()

    def __call__(self, url: str) -> Any:
        """Raise HTTP 503 on the first call; otherwise wait for `allow_ready` and succeed."""
        # Take the call number under the lock so each call sees its own index.
        with self._lock:
            self.call_count += 1
            attempt = self.call_count

        if attempt != 1:
            # Bounded wait so a test that never allows readiness cannot hang forever.
            self.allow_ready.wait(timeout=5)
            self.ready_returned.set()
            return _FakeResponse()

        self.loading_returned.set()
        raise HTTPError(url, 503, "Service Unavailable", hdrs=None, fp=None)
class _FakeResponse:
    """Minimal successful response object returned by the opener stubs."""

    # HTTP status code reported by every instance.
    status: int = 200

    def close(self) -> None:
        """Do nothing; present so callers can close the response."""
        return None
def _request_json(
    port: int,
    method: str,
    path: str,
    *,
    timeout: float,
    headers: dict[str, str] | None = None,
    body: dict[str, Any] | None = None,
) -> tuple[int, dict[str, Any]]:
    """Send one HTTP request to `127.0.0.1:<port>` and decode the JSON object it returns.

    Args:
        port: TCP port of the local server under test.
        method: HTTP method, e.g. `"GET"` or `"POST"`.
        path: Request target, including any query string.
        timeout: Socket timeout in seconds for the connection.
        headers: Extra request headers; the mapping is copied, never mutated.
        body: Optional JSON-serialisable payload. When given it is sent as
            UTF-8 JSON and `Content-Type: application/json` is added unless
            the caller already supplied a `Content-Type` header.

    Returns:
        The response status code and the decoded JSON object.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        AssertionError: If the decoded JSON is not an object.
    """
    request_body = None if body is None else json.dumps(body).encode("utf-8")
    request_headers = dict(headers or {})
    if request_body is not None:
        request_headers.setdefault("Content-Type", "application/json")

    connection = HTTPConnection("127.0.0.1", port, timeout=timeout)
    try:
        connection.request(method, path, body=request_body, headers=request_headers)
        response = connection.getresponse()
        # Capture everything needed from the response before the connection
        # is closed, and keep the response text separate from the `body`
        # parameter, which holds the request payload.
        status = response.status
        response_text = response.read().decode("utf-8")
    finally:
        connection.close()

    payload = json.loads(response_text)
    assert isinstance(payload, dict)
    return status, payload
def _wait_for_status(
    warmer: _ModalHealthWarmer,
    *,
    status: str,
    http_status: int,
) -> None:
    """Poll the warmer until its first target reports the expected state.

    The first entry of `warmer.statuses()` is checked every 10 ms for up to
    one second.

    Raises:
        AssertionError: If the expected `status` / `http_status` pair is not
            observed before the deadline.
    """
    give_up_at = time.monotonic() + 1
    while time.monotonic() < give_up_at:
        first_target = warmer.statuses()[0]
        if first_target["status"] != status or first_target["http_status"] != http_status:
            time.sleep(0.01)
            continue
        return
    raise AssertionError(f"Expected {status=} and {http_status=}, got {warmer.statuses()!r}")
def _config(
    *,
    npc_count: int,
    connector: ConnectorConfig | None = None,
    god_console: ConnectorConfig | None = None,
    secondary_connectors: dict[str, ConnectorConfig] | None = None,
) -> GameConfig:
    """Build the `GameConfig` shared by the tests in this module.

    Args:
        npc_count: Number of NPCs to configure.
        connector: Primary connector; a `"deterministic"` connector is used
            when none is given.
        god_console: Passed through unchanged.
        secondary_connectors: Extra connectors by id; an empty dict is used
            when none are given.

    Returns:
        A config for an 80x80 `plain_green` world with seed 42, a 500 ms tick
        and a server bound to `127.0.0.1:8000`.
    """
    world = WorldConfig(width=80, depth=80, terrain="plain_green", seed=42)
    npcs = NpcConfig(count=npc_count)
    simulation = SimulationConfig(tick_ms=500)
    server = ServerConfig(host="127.0.0.1", port=8000)
    primary = connector if connector else ConnectorConfig(type="deterministic")
    extras = secondary_connectors if secondary_connectors else {}
    return GameConfig(
        world=world,
        npcs=npcs,
        simulation=simulation,
        server=server,
        connector=primary,
        god_console=god_console,
        secondary_connectors=extras,
    )