# Source file: world-simulator/tests/test_api_server.py (13.2 kB)
# Last commit: 412ec7f by kikikita
#   "Merge branch 'main' of https://github.com/DeltaZN/god_simulator"
from __future__ import annotations
import json
import time
from http.client import HTTPConnection
from http.server import ThreadingHTTPServer
from threading import Event, Lock, Thread
from typing import Any
from urllib.error import HTTPError
from world_simulator.api.server import (
_build_handler,
_modal_health_urls,
_ModalHealthTarget,
_ModalHealthWarmer,
build_handler,
)
from world_simulator.api.runtime import GameRuntime
from world_simulator.config import (
ConnectorConfig,
GameConfig,
NpcConfig,
ServerConfig,
SimulationConfig,
WorldConfig,
)
from world_simulator.domain import WorldState
from world_simulator.simulation.connectors.base import TickPlan
from world_simulator.simulation.spawning import create_world
def test_snapshot_returns_previous_world_while_tick_is_planning() -> None:
    """Check that ``/scene/state`` stays readable while ``POST /tick`` is blocked.

    The simulator parks inside ``propose_tick`` until released. A snapshot
    fetched in that window must still report tick 0 with tick 1 pending; once
    the simulator is released the snapshot must report tick 1 and nothing
    pending.
    """
    world = create_world(_config(npc_count=2))
    slow_sim = _SlowSimulator()
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), _build_handler(world, slow_sim))
    bound_port = int(httpd.server_address[1])
    serving = Thread(target=httpd.serve_forever, daemon=True)
    outcome: dict[str, Any] = {}
    failures: list[Exception] = []

    def post_tick() -> None:
        # Runs on a separate thread; results travel back through the closures
        # so the main thread can assert on them after joining.
        try:
            outcome["status"], outcome["payload"] = _request_json(
                bound_port, "POST", "/tick", timeout=6
            )
        except Exception as exc:
            failures.append(exc)

    serving.start()
    ticker = Thread(target=post_tick, daemon=True)
    try:
        ticker.start()
        assert slow_sim.started.wait(timeout=1)

        # The tick is now parked inside the simulator: read the scene mid-tick.
        code, during = _request_json(bound_port, "GET", "/scene/state", timeout=1)
        assert code == 200
        assert during["tick"] == 0
        assert during["simulation"]["tick_in_progress"] is True
        assert during["simulation"]["pending_tick"] == 1

        # Let the tick finish and make sure the POST itself succeeded.
        slow_sim.release.set()
        ticker.join(timeout=2)
        assert not ticker.is_alive()
        assert failures == []
        assert outcome["status"] == 200

        code, after = _request_json(bound_port, "GET", "/scene/state", timeout=1)
        assert code == 200
        assert after["tick"] == 1
        assert after["simulation"]["tick_in_progress"] is False
        assert after["simulation"]["pending_tick"] is None
    finally:
        # Unblock the simulator even on failure so the server can shut down.
        slow_sim.release.set()
        httpd.shutdown()
        httpd.server_close()
        serving.join(timeout=1)
def test_snapshot_triggers_modal_health_warmup() -> None:
    """Check that ``/scene/state?warmup=1`` triggers the health warmer once.

    The snapshot must embed the warmer's reported model statuses, and a
    follow-up ``/health`` request must not trigger the warmer again.
    """
    expected_models = [
        {
            "id": "npc_model",
            "label": "NPC model",
            "model": "test-model",
            "status": "ready",
            "http_status": 200,
        }
    ]
    world = create_world(_config(npc_count=2))
    fake_warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=fake_warmer)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), handler)
    bound_port = int(httpd.server_address[1])
    serving = Thread(target=httpd.serve_forever, daemon=True)
    serving.start()
    try:
        code, scene = _request_json(
            bound_port, "GET", "/scene/state?warmup=1", timeout=1
        )
        assert code == 200
        assert scene["tick"] == 0
        assert scene["simulation"]["models"] == expected_models
        assert fake_warmer.trigger_count == 1

        # A plain health probe leaves the trigger count untouched.
        code, _ = _request_json(bound_port, "GET", "/health", timeout=1)
        assert code == 200
        assert fake_warmer.trigger_count == 1
    finally:
        httpd.shutdown()
        httpd.server_close()
        serving.join(timeout=1)
def test_snapshot_without_warmup_query_only_reads_model_status() -> None:
    """Check that a plain ``/scene/state`` reads statuses without a warmup.

    Without the ``warmup`` query parameter the snapshot still carries the
    model status list, but the warmer's ``trigger`` must never be called.
    """
    world = create_world(_config(npc_count=2))
    fake_warmer = _FakeHealthWarmer()
    handler = _build_handler(world, _SlowSimulator(), modal_health_warmer=fake_warmer)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), handler)
    bound_port = int(httpd.server_address[1])
    serving = Thread(target=httpd.serve_forever, daemon=True)
    serving.start()
    try:
        code, scene = _request_json(bound_port, "GET", "/scene/state", timeout=1)
        assert code == 200
        first_model = scene["simulation"]["models"][0]
        assert first_model["status"] == "ready"
        assert fake_warmer.trigger_count == 0
    finally:
        httpd.shutdown()
        httpd.server_close()
        serving.join(timeout=1)
def test_modal_health_urls_use_only_modal_openai_connectors() -> None:
    """Check which connectors contribute a ``/health`` URL.

    Two expectations are pinned: connectors hosted on ``*.modal.run`` yield
    ``<origin>/health`` whether their base URL ends in ``/v1`` or
    ``/v1/chat/completions``, and a connector pointing at
    ``api.openai.com`` yields no health URL at all.
    """

    def npc_connector() -> ConnectorConfig:
        # Built fresh for each config so the two scenarios share no instance.
        return ConnectorConfig(
            type="openai_compatible",
            base_url="https://workspace--npc-serve.modal.run/v1",
            model="npc-model",
        )

    # Scenario 1: primary and secondary connectors both live on Modal.
    qwen_connector = ConnectorConfig(
        type="openai_compatible",
        base_url="https://workspace--qwen-serve.modal.run/v1/chat/completions",
        model="qwen-model",
    )
    all_modal = _config(
        npc_count=2,
        connector=npc_connector(),
        secondary_connectors={"qwen": qwen_connector},
    )
    assert _modal_health_urls(all_modal) == [
        "https://workspace--npc-serve.modal.run/health",
        "https://workspace--qwen-serve.modal.run/health",
    ]

    # Scenario 2: the secondary connector is hosted elsewhere and is skipped.
    external_connector = ConnectorConfig(
        type="openai_compatible",
        base_url="https://api.openai.com/v1",
        model="external-model",
    )
    mixed_hosts = _config(
        npc_count=2,
        connector=npc_connector(),
        secondary_connectors={"external": external_connector},
    )
    assert _modal_health_urls(mixed_hosts) == [
        "https://workspace--npc-serve.modal.run/health"
    ]
def test_http_server_admin_can_switch_npc_model(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check the admin endpoints for listing profiles and re-pointing an NPC.

    With ``ADMIN_TOKEN`` set in the environment and the same value sent in the
    ``X-Admin-Token`` header, ``GET /admin/models`` must list at least the
    ``default`` and ``qwen`` profiles, and ``POST /admin/npcs/npc-001/model``
    must switch the first NPC's profile and connector to ``qwen``.
    """
    monkeypatch.setenv("ADMIN_TOKEN", "test-admin")
    admin_headers = {"X-Admin-Token": "test-admin"}

    primary = ConnectorConfig(
        type="openai_compatible",
        base_url="https://workspace--npc-serve.modal.run/v1",
        model="npc-model",
    )
    qwen = ConnectorConfig(
        type="openai_compatible",
        base_url="https://workspace--qwen-serve.modal.run/v1",
        model="qwen-model",
    )
    config = _config(
        npc_count=2,
        connector=primary,
        secondary_connectors={"qwen": qwen},
    )
    world = create_world(config)
    runtime = GameRuntime(world=world, simulator=_SlowSimulator(), config=config)
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), build_handler(runtime))
    bound_port = int(httpd.server_address[1])
    serving = Thread(target=httpd.serve_forever, daemon=True)
    serving.start()
    try:
        # Listing: both profiles are advertised with the configured model name.
        code, listing = _request_json(
            bound_port,
            "GET",
            "/admin/models",
            timeout=1,
            headers=admin_headers,
        )
        assert code == 200
        advertised_ids = {profile["id"] for profile in listing["profiles"]}
        assert advertised_ids >= {"default", "qwen"}
        qwen_profile = next(
            profile for profile in listing["profiles"] if profile["id"] == "qwen"
        )
        assert qwen_profile["model"] == "qwen-model"

        # Switching: the first NPC ends up bound to the qwen profile/connector.
        code, switched = _request_json(
            bound_port,
            "POST",
            "/admin/npcs/npc-001/model",
            timeout=1,
            headers=admin_headers,
            body={"profile_id": "qwen"},
        )
        assert code == 200
        assert switched["profile"]["id"] == "qwen"
        first_npc = world.npcs[0]
        assert first_npc.model_profile_id == "qwen"
        assert first_npc.connector_id == "qwen"
    finally:
        httpd.shutdown()
        httpd.server_close()
        serving.join(timeout=1)
def test_forced_modal_warmup_does_not_duplicate_inflight_checks() -> None:
    """Check that a second forced trigger does not start a duplicate check.

    The opener blocks inside its first call until ``release`` is set, so the
    second ``trigger(force=True)`` arrives while the first check is still in
    flight. The opener must have been called exactly once and the target must
    be reported as ``checking`` with no HTTP status yet.
    """
    opener = _BlockingOpener()
    warmer = _ModalHealthWarmer(
        [
            _ModalHealthTarget(
                id="npc_model",
                label="NPC model",
                model="test-model",
                health_url="https://workspace--npc-serve.modal.run/health",
            )
        ],
        opener=opener,
    )
    # Everything after construction sits inside the try block so the blocked
    # opener call is always released, even when an early assertion fails.
    # Otherwise the in-flight call would stay parked for its full 5-second wait.
    try:
        warmer.trigger(force=True)
        assert opener.started.wait(timeout=1)
        warmer.trigger(force=True)
        statuses = warmer.statuses()
        assert opener.call_count == 1
        assert statuses == [
            {
                "id": "npc_model",
                "label": "NPC model",
                "model": "test-model",
                "status": "checking",
                "http_status": None,
            }
        ]
    finally:
        opener.release.set()
def test_modal_health_warmer_keeps_503_loading_as_warmup_until_ready() -> None:
    """Check the ``warmup`` -> ``ready`` transition around an HTTP 503.

    The scripted opener raises a 503 on its first call and succeeds on later
    calls once allowed. The warmer must report ``warmup``/503 after the first
    attempt and ``ready``/200 after a later attempt succeeds.
    """
    scripted_opener = _LoadingThenReadyOpener()
    target = _ModalHealthTarget(
        id="npc_model",
        label="NPC model",
        model="test-model",
        health_url="https://workspace--npc-serve.modal.run/health",
    )
    warmer = _ModalHealthWarmer(
        [target],
        warmup_retry_seconds=0.05,
        opener=scripted_opener,
    )

    warmer.trigger(force=True)

    # Phase 1: the 503 has been raised and is surfaced as a warmup state.
    assert scripted_opener.loading_returned.wait(timeout=1)
    _wait_for_status(warmer, status="warmup", http_status=503)

    # Phase 2: let the next attempt succeed and wait for the ready state.
    scripted_opener.allow_ready.set()
    assert scripted_opener.ready_returned.wait(timeout=1)
    _wait_for_status(warmer, status="ready", http_status=200)
class _SlowSimulator:
name = "slow"
def __init__(self) -> None:
self.started = Event()
self.release = Event()
def propose_tick(self, _world: WorldState, _next_tick: int) -> TickPlan:
self.started.set()
self.release.wait(timeout=5)
return TickPlan(source=self.name, directives=[])
class _FakeHealthWarmer:
def __init__(self) -> None:
self.trigger_count = 0
def trigger(self, *, force: bool = False) -> None:
self.trigger_count += 1
def statuses(self) -> list[dict[str, Any]]:
return [
{
"id": "npc_model",
"label": "NPC model",
"model": "test-model",
"status": "ready",
"http_status": 200,
}
]
class _BlockingOpener:
def __init__(self) -> None:
self.started = Event()
self.release = Event()
self._lock = Lock()
self.call_count = 0
def __call__(self, url: str) -> Any:
_ = url
with self._lock:
self.call_count += 1
self.started.set()
self.release.wait(timeout=5)
return _FakeResponse()
class _LoadingThenReadyOpener:
def __init__(self) -> None:
self.loading_returned = Event()
self.allow_ready = Event()
self.ready_returned = Event()
self._lock = Lock()
self.call_count = 0
def __call__(self, url: str) -> Any:
with self._lock:
self.call_count += 1
call_count = self.call_count
if call_count == 1:
self.loading_returned.set()
raise HTTPError(url, 503, "Service Unavailable", hdrs=None, fp=None)
self.allow_ready.wait(timeout=5)
self.ready_returned.set()
return _FakeResponse()
class _FakeResponse:
status: int = 200
def close(self) -> None:
return
def _request_json(
port: int,
method: str,
path: str,
*,
timeout: float,
headers: dict[str, str] | None = None,
body: dict[str, Any] | None = None,
) -> tuple[int, dict[str, Any]]:
connection = HTTPConnection("127.0.0.1", port, timeout=timeout)
try:
request_body = json.dumps(body).encode("utf-8") if body is not None else None
request_headers = dict(headers or {})
if request_body is not None:
request_headers.setdefault("Content-Type", "application/json")
connection.request(method, path, body=request_body, headers=request_headers)
response = connection.getresponse()
body = response.read().decode("utf-8")
finally:
connection.close()
payload = json.loads(body)
assert isinstance(payload, dict)
return response.status, payload
def _wait_for_status(
warmer: _ModalHealthWarmer,
*,
status: str,
http_status: int,
) -> None:
deadline = time.monotonic() + 1
while time.monotonic() < deadline:
statuses = warmer.statuses()
if (
statuses[0]["status"] == status
and statuses[0]["http_status"] == http_status
):
return
time.sleep(0.01)
raise AssertionError(f"Expected {status=} and {http_status=}, got {warmer.statuses()!r}")
def _config(
    *,
    npc_count: int,
    connector: ConnectorConfig | None = None,
    god_console: ConnectorConfig | None = None,
    secondary_connectors: dict[str, ConnectorConfig] | None = None,
) -> GameConfig:
    """Build the ``GameConfig`` shared by the tests in this module.

    World, simulation and server settings are fixed. Only the NPC count and
    the connector wiring vary: a falsy ``connector`` falls back to a
    ``deterministic`` connector and a falsy ``secondary_connectors`` falls
    back to an empty mapping.
    """
    primary_connector = (
        connector if connector else ConnectorConfig(type="deterministic")
    )
    extra_connectors = secondary_connectors if secondary_connectors else {}
    fixed_world = WorldConfig(width=80, depth=80, terrain="plain_green", seed=42)
    fixed_server = ServerConfig(host="127.0.0.1", port=8000)
    return GameConfig(
        world=fixed_world,
        npcs=NpcConfig(count=npc_count),
        simulation=SimulationConfig(tick_ms=500),
        server=fixed_server,
        connector=primary_connector,
        god_console=god_console,
        secondary_connectors=extra_connectors,
    )