from __future__ import annotations import json from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import os from typing import Any from urllib.parse import parse_qs, unquote, urlparse from world_simulator.api.runtime import ( GameRuntime, create_game_runtime, ) from world_simulator.api.runtime import ( HealthWarmer as _HealthWarmer, ) from world_simulator.api.runtime import ( ModalHealthTarget as _ModalHealthTarget, ) from world_simulator.api.runtime import ( ModalHealthWarmer as _ModalHealthWarmer, ) from world_simulator.api.runtime import ( modal_health_targets as _modal_health_targets, ) from world_simulator.api.runtime import ( modal_health_url as _modal_health_url, ) from world_simulator.api.runtime import ( modal_health_urls as _modal_health_urls, ) from world_simulator.config import GameConfig from world_simulator.domain import WorldState from world_simulator.simulation.connectors.base import WorldSimulator __all__ = [ "_ModalHealthTarget", "_ModalHealthWarmer", "_build_handler", "_modal_health_targets", "_modal_health_url", "_modal_health_urls", "build_handler", "run_server", ] def run_server(*, world: WorldState, config: GameConfig) -> None: runtime = create_game_runtime(world=world, config=config) handler = build_handler(runtime) address = (config.server.host, config.server.port) server = ThreadingHTTPServer(address, handler) print(f"God Simulator API listening on http://{config.server.host}:{config.server.port}") print(f"World simulator connector: {runtime.simulator_name}") print( "Available endpoints: GET /health, GET /state, GET /scene/state, " "POST /tick" ) server.serve_forever() def _build_handler( world: WorldState, simulator: WorldSimulator, god_console: object | None = None, modal_health_warmer: _HealthWarmer | None = None, overseer: object | None = None, ) -> type[BaseHTTPRequestHandler]: _ = god_console, overseer return build_handler( GameRuntime( world=world, simulator=simulator, modal_health_warmer=modal_health_warmer, ) ) def build_handler(runtime: GameRuntime) -> type[BaseHTTPRequestHandler]: class GodSimulatorRequestHandler(BaseHTTPRequestHandler): server_version = "GodSimulator/0.1" def do_OPTIONS(self) -> None: self.send_response(HTTPStatus.NO_CONTENT) self._send_cors_headers() self.end_headers() def do_GET(self) -> None: parsed_path = urlparse(self.path) path = parsed_path.path if path == "/health": self._send_json(runtime.health()) return if path == "/state": self._send_json(runtime.state()) return if path == "/scene/state": query = parse_qs(parsed_path.query) self._send_json( runtime.scene_state(warmup=_truthy_query_value(query, "warmup")) ) return if path == "/admin/models": if not self._require_admin(): return self._send_json(runtime.admin_models()) return self._send_json( {"error": "not_found", "path": path}, status=HTTPStatus.NOT_FOUND, ) def do_POST(self) -> None: parsed_path = urlparse(self.path) path = parsed_path.path if path == "/tick": status, payload = runtime.tick() self._send_json(payload, status=status) return if path.startswith("/admin/npcs/") and path.endswith("/model"): if not self._require_admin(): return npc_id = unquote(path.removeprefix("/admin/npcs/").removesuffix("/model")) try: profile_id = self._read_json_field("profile_id") except ValueError as exc: self._send_json( {"error": "bad_request", "message": str(exc)}, status=HTTPStatus.BAD_REQUEST, ) return status, payload = runtime.admin_set_npc_model(npc_id, profile_id) self._send_json(payload, status=status) return self._send_json( {"error": "not_found", "path": path}, status=HTTPStatus.NOT_FOUND, ) def log_message(self, format: str, *args: object) -> None: print(f"{self.address_string()} - {format % args}") def _send_json( self, payload: dict[str, Any], *, status: HTTPStatus = HTTPStatus.OK, ) -> None: body = json.dumps(payload, sort_keys=True).encode("utf-8") try: self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self._send_cors_headers() self.end_headers() self.wfile.write(body) except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError): return def _send_cors_headers(self) -> None: self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type, X-Admin-Token") def _require_admin(self) -> bool: expected = os.getenv("ADMIN_TOKEN", "dev-admin-token") if self.headers.get("X-Admin-Token") == expected: return True self._send_json( {"error": "admin_token_required"}, status=HTTPStatus.FORBIDDEN, ) return False def _read_json_field(self, field: str) -> str: length_raw = self.headers.get("Content-Length") if not length_raw: raise ValueError("Missing request body.") try: length = int(length_raw) except ValueError as exc: raise ValueError("Invalid Content-Length.") from exc raw_body = self.rfile.read(length).decode("utf-8") try: payload = json.loads(raw_body) except json.JSONDecodeError as exc: raise ValueError("Request body must be JSON.") from exc if not isinstance(payload, dict): raise ValueError("Request body must be a JSON object.") value = payload.get(field) if not isinstance(value, str) or not value.strip(): raise ValueError(f"Expected non-empty '{field}'.") return value.strip() return GodSimulatorRequestHandler def _truthy_query_value(query: dict[str, list[str]], key: str) -> bool: return any(value.lower() in ("1", "true", "yes") for value in query.get(key, []))