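# NOTE: the six lines below are Hugging Face file-viewer page residue that was
# captured together with the source; they are not part of the module.
# kikikita's picture
# feat: implement NPC model switching and admin API for model management
# e2d35c0
# Raw
# History Blame Contribute Delete
# 7.25 kB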
from __future__ import annotations
import json
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import os
from typing import Any
from urllib.parse import parse_qs, unquote, urlparse
from world_simulator.api.runtime import (
GameRuntime,
create_game_runtime,
)
from world_simulator.api.runtime import (
HealthWarmer as _HealthWarmer,
)
from world_simulator.api.runtime import (
ModalHealthTarget as _ModalHealthTarget,
)
from world_simulator.api.runtime import (
ModalHealthWarmer as _ModalHealthWarmer,
)
from world_simulator.api.runtime import (
modal_health_targets as _modal_health_targets,
)
from world_simulator.api.runtime import (
modal_health_url as _modal_health_url,
)
from world_simulator.api.runtime import (
modal_health_urls as _modal_health_urls,
)
from world_simulator.config import GameConfig
from world_simulator.domain import WorldState
from world_simulator.simulation.connectors.base import WorldSimulator
# Names exported by a star-import of this module. Besides the two public entry
# points (`build_handler`, `run_server`) the list deliberately includes
# underscore-prefixed names: the `_modal_health_*` / `_ModalHealth*` aliases
# re-exported from `world_simulator.api.runtime` and the `_build_handler`
# wrapper defined below.
# NOTE(review): presumably kept so older callers/tests that imported these
# private names from this module keep working -- confirm before pruning.
__all__ = [
"_ModalHealthTarget",
"_ModalHealthWarmer",
"_build_handler",
"_modal_health_targets",
"_modal_health_url",
"_modal_health_urls",
"build_handler",
"run_server",
]
def run_server(*, world: WorldState, config: GameConfig) -> None:
    """Create the game runtime and serve the HTTP API until the process stops.

    Args:
        world: World state handed to ``create_game_runtime``.
        config: Game configuration; ``config.server.host`` and
            ``config.server.port`` select the listening address.

    The call blocks in ``serve_forever()``. Any exception that ends the loop
    (including ``KeyboardInterrupt``) still propagates to the caller, but the
    listening socket is now always released first.
    """
    runtime = create_game_runtime(world=world, config=config)
    handler = build_handler(runtime)
    address = (config.server.host, config.server.port)
    # Using the server as a context manager guarantees server_close() runs when
    # serve_forever() exits for any reason, so the port is not left bound.
    with ThreadingHTTPServer(address, handler) as server:
        print(f"God Simulator API listening on http://{config.server.host}:{config.server.port}")
        print(f"World simulator connector: {runtime.simulator_name}")
        print(
            "Available endpoints: GET /health, GET /state, GET /scene/state, "
            "POST /tick"
        )
        server.serve_forever()
def _build_handler(
    world: WorldState,
    simulator: WorldSimulator,
    god_console: object | None = None,
    modal_health_warmer: _HealthWarmer | None = None,
    overseer: object | None = None,
) -> type[BaseHTTPRequestHandler]:
    """Build a request-handler class from loose components.

    Wraps ``world``, ``simulator`` and ``modal_health_warmer`` in a
    ``GameRuntime`` and delegates to ``build_handler``.

    Args:
        world: World state stored on the runtime.
        simulator: Simulator connector stored on the runtime.
        god_console: Accepted but ignored.
        modal_health_warmer: Optional health warmer stored on the runtime.
        overseer: Accepted but ignored.

    Returns:
        The handler class produced by ``build_handler``.
    """
    # These two parameters are part of the signature only; nothing reads them.
    del god_console, overseer

    bundled_runtime = GameRuntime(
        world=world,
        simulator=simulator,
        modal_health_warmer=modal_health_warmer,
    )
    return build_handler(bundled_runtime)
def build_handler(runtime: GameRuntime) -> type[BaseHTTPRequestHandler]:
    """Create a ``BaseHTTPRequestHandler`` subclass bound to *runtime*.

    The returned class closes over ``runtime`` and routes:

    * ``GET /health``, ``GET /state``, ``GET /scene/state[?warmup=...]``
    * ``GET /admin/models`` (requires the admin token)
    * ``POST /tick``
    * ``POST /admin/npcs/<npc_id>/model`` (requires the admin token; JSON body
      with a non-empty string ``profile_id``)

    Anything else is answered with a JSON ``not_found`` error and HTTP 404.

    Args:
        runtime: Object providing ``health()``, ``state()``, ``scene_state()``,
            ``admin_models()``, ``tick()`` and ``admin_set_npc_model()``.

    Returns:
        The handler class, suitable for passing to an ``http.server`` server.
    """
    # Local import so this block is self-contained; only the admin-token
    # comparison needs it.
    import hmac

    # Upper bound for JSON request bodies. The only body this handler parses is
    # a tiny ``{"profile_id": ...}`` object, so anything larger is rejected
    # instead of being buffered in memory.
    max_json_body_bytes = 1024 * 1024

    class GodSimulatorRequestHandler(BaseHTTPRequestHandler):
        """HTTP handler that serialises ``runtime`` results as JSON."""

        server_version = "GodSimulator/0.1"

        def do_OPTIONS(self) -> None:
            """Answer CORS preflight requests with 204 and the CORS headers."""
            self.send_response(HTTPStatus.NO_CONTENT)
            self._send_cors_headers()
            self.end_headers()

        def do_GET(self) -> None:
            """Dispatch GET requests by path (query string is parsed separately)."""
            parsed_path = urlparse(self.path)
            path = parsed_path.path
            if path == "/health":
                self._send_json(runtime.health())
                return
            if path == "/state":
                self._send_json(runtime.state())
                return
            if path == "/scene/state":
                query = parse_qs(parsed_path.query)
                self._send_json(
                    runtime.scene_state(warmup=_truthy_query_value(query, "warmup"))
                )
                return
            if path == "/admin/models":
                # _require_admin() has already sent the 403 when it returns False.
                if not self._require_admin():
                    return
                self._send_json(runtime.admin_models())
                return
            self._send_json(
                {"error": "not_found", "path": path},
                status=HTTPStatus.NOT_FOUND,
            )

        def do_POST(self) -> None:
            """Dispatch POST requests by path."""
            parsed_path = urlparse(self.path)
            path = parsed_path.path
            if path == "/tick":
                status, payload = runtime.tick()
                self._send_json(payload, status=status)
                return
            npc_id = self._npc_id_from_model_path(path)
            if npc_id is not None:
                if not self._require_admin():
                    return
                try:
                    profile_id = self._read_json_field("profile_id")
                except ValueError as exc:
                    self._send_json(
                        {"error": "bad_request", "message": str(exc)},
                        status=HTTPStatus.BAD_REQUEST,
                    )
                    return
                status, payload = runtime.admin_set_npc_model(npc_id, profile_id)
                self._send_json(payload, status=status)
                return
            self._send_json(
                {"error": "not_found", "path": path},
                status=HTTPStatus.NOT_FOUND,
            )

        def log_message(self, format: str, *args: object) -> None:
            """Write the access-log line to stdout instead of stderr."""
            print(f"{self.address_string()} - {format % args}")

        @staticmethod
        def _npc_id_from_model_path(path: str) -> str | None:
            """Extract the NPC id from ``/admin/npcs/<npc_id>/model``.

            Returns the percent-decoded id, or ``None`` when *path* is not of
            that shape. The length check rejects paths such as
            ``/admin/npcs/model`` (prefix and suffix overlap) and
            ``/admin/npcs//model`` (empty id), which would otherwise be
            treated as a request for a bogus NPC.
            """
            prefix = "/admin/npcs/"
            suffix = "/model"
            if len(path) <= len(prefix) + len(suffix):
                return None
            if not (path.startswith(prefix) and path.endswith(suffix)):
                return None
            return unquote(path[len(prefix):-len(suffix)])

        def _send_json(
            self,
            payload: dict[str, Any],
            *,
            status: HTTPStatus = HTTPStatus.OK,
        ) -> None:
            """Serialise *payload* as UTF-8 JSON and send it with *status*.

            Client disconnects while writing are ignored: there is nobody left
            to receive an error response.
            """
            body = json.dumps(payload, sort_keys=True).encode("utf-8")
            try:
                self.send_response(status)
                self.send_header("Content-Type", "application/json; charset=utf-8")
                self.send_header("Content-Length", str(len(body)))
                self._send_cors_headers()
                self.end_headers()
                self.wfile.write(body)
            except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError):
                return

        def _send_cors_headers(self) -> None:
            """Emit the permissive CORS headers attached to every response."""
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
            self.send_header("Access-Control-Allow-Headers", "Content-Type, X-Admin-Token")

        def _require_admin(self) -> bool:
            """Check the ``X-Admin-Token`` header against ``ADMIN_TOKEN``.

            Returns ``True`` when the token matches. Otherwise sends a 403
            JSON error and returns ``False``; the caller must just return.
            """
            # NOTE(review): the fallback is a well-known default credential.
            # Kept for backward compatibility, but deployments should always
            # set ADMIN_TOKEN explicitly.
            expected = os.getenv("ADMIN_TOKEN", "dev-admin-token")
            provided = self.headers.get("X-Admin-Token")
            # Constant-time comparison avoids leaking how many leading
            # characters matched. Bytes are compared because compare_digest
            # rejects non-ASCII str arguments. A missing header never matches.
            if provided is not None and hmac.compare_digest(
                provided.encode("utf-8"), expected.encode("utf-8")
            ):
                return True
            self._send_json(
                {"error": "admin_token_required"},
                status=HTTPStatus.FORBIDDEN,
            )
            return False

        def _read_json_field(self, field: str) -> str:
            """Read the JSON request body and return the stripped string *field*.

            Raises:
                ValueError: if ``Content-Length`` is missing, malformed,
                    negative or above the size cap; if the body is not valid
                    UTF-8 JSON or not a JSON object; or if *field* is absent,
                    not a string, or blank.
            """
            length_raw = self.headers.get("Content-Length")
            if not length_raw:
                raise ValueError("Missing request body.")
            try:
                length = int(length_raw)
            except ValueError as exc:
                raise ValueError("Invalid Content-Length.") from exc
            if length < 0:
                # read(-1) would block until the client closes the socket.
                self.close_connection = True
                raise ValueError("Invalid Content-Length.")
            if length > max_json_body_bytes:
                # The body is left unread, so the connection must not be reused.
                self.close_connection = True
                raise ValueError("Request body too large.")
            raw_body = self.rfile.read(length)
            try:
                payload = json.loads(raw_body.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                raise ValueError("Request body must be JSON.") from exc
            if not isinstance(payload, dict):
                raise ValueError("Request body must be a JSON object.")
            value = payload.get(field)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"Expected non-empty '{field}'.")
            return value.strip()

    return GodSimulatorRequestHandler
def _truthy_query_value(query: dict[str, list[str]], key: str) -> bool:
return any(value.lower() in ("1", "true", "yes") for value in query.get(key, []))