| from __future__ import annotations |
|
|
| import json |
| from http import HTTPStatus |
| from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer |
| import os |
| from typing import Any |
| from urllib.parse import parse_qs, unquote, urlparse |
|
|
| from world_simulator.api.runtime import ( |
| GameRuntime, |
| create_game_runtime, |
| ) |
| from world_simulator.api.runtime import ( |
| HealthWarmer as _HealthWarmer, |
| ) |
| from world_simulator.api.runtime import ( |
| ModalHealthTarget as _ModalHealthTarget, |
| ) |
| from world_simulator.api.runtime import ( |
| ModalHealthWarmer as _ModalHealthWarmer, |
| ) |
| from world_simulator.api.runtime import ( |
| modal_health_targets as _modal_health_targets, |
| ) |
| from world_simulator.api.runtime import ( |
| modal_health_url as _modal_health_url, |
| ) |
| from world_simulator.api.runtime import ( |
| modal_health_urls as _modal_health_urls, |
| ) |
| from world_simulator.config import GameConfig |
| from world_simulator.domain import WorldState |
| from world_simulator.simulation.connectors.base import WorldSimulator |
|
|
| __all__ = [ |
| "_ModalHealthTarget", |
| "_ModalHealthWarmer", |
| "_build_handler", |
| "_modal_health_targets", |
| "_modal_health_url", |
| "_modal_health_urls", |
| "build_handler", |
| "run_server", |
| ] |
|
|
|
|
| def run_server(*, world: WorldState, config: GameConfig) -> None: |
| runtime = create_game_runtime(world=world, config=config) |
| handler = build_handler(runtime) |
| address = (config.server.host, config.server.port) |
| server = ThreadingHTTPServer(address, handler) |
| print(f"God Simulator API listening on http://{config.server.host}:{config.server.port}") |
| print(f"World simulator connector: {runtime.simulator_name}") |
| print( |
| "Available endpoints: GET /health, GET /state, GET /scene/state, " |
| "POST /tick" |
| ) |
| server.serve_forever() |
|
|
|
|
| def _build_handler( |
| world: WorldState, |
| simulator: WorldSimulator, |
| god_console: object | None = None, |
| modal_health_warmer: _HealthWarmer | None = None, |
| overseer: object | None = None, |
| ) -> type[BaseHTTPRequestHandler]: |
| _ = god_console, overseer |
| return build_handler( |
| GameRuntime( |
| world=world, |
| simulator=simulator, |
| modal_health_warmer=modal_health_warmer, |
| ) |
| ) |
|
|
|
|
| def build_handler(runtime: GameRuntime) -> type[BaseHTTPRequestHandler]: |
| class GodSimulatorRequestHandler(BaseHTTPRequestHandler): |
| server_version = "GodSimulator/0.1" |
|
|
| def do_OPTIONS(self) -> None: |
| self.send_response(HTTPStatus.NO_CONTENT) |
| self._send_cors_headers() |
| self.end_headers() |
|
|
| def do_GET(self) -> None: |
| parsed_path = urlparse(self.path) |
| path = parsed_path.path |
|
|
| if path == "/health": |
| self._send_json(runtime.health()) |
| return |
| if path == "/state": |
| self._send_json(runtime.state()) |
| return |
| if path == "/scene/state": |
| query = parse_qs(parsed_path.query) |
| self._send_json( |
| runtime.scene_state(warmup=_truthy_query_value(query, "warmup")) |
| ) |
| return |
| if path == "/admin/models": |
| if not self._require_admin(): |
| return |
| self._send_json(runtime.admin_models()) |
| return |
|
|
| self._send_json( |
| {"error": "not_found", "path": path}, |
| status=HTTPStatus.NOT_FOUND, |
| ) |
|
|
| def do_POST(self) -> None: |
| parsed_path = urlparse(self.path) |
| path = parsed_path.path |
|
|
| if path == "/tick": |
| status, payload = runtime.tick() |
| self._send_json(payload, status=status) |
| return |
| if path.startswith("/admin/npcs/") and path.endswith("/model"): |
| if not self._require_admin(): |
| return |
| npc_id = unquote(path.removeprefix("/admin/npcs/").removesuffix("/model")) |
| try: |
| profile_id = self._read_json_field("profile_id") |
| except ValueError as exc: |
| self._send_json( |
| {"error": "bad_request", "message": str(exc)}, |
| status=HTTPStatus.BAD_REQUEST, |
| ) |
| return |
| status, payload = runtime.admin_set_npc_model(npc_id, profile_id) |
| self._send_json(payload, status=status) |
| return |
|
|
| self._send_json( |
| {"error": "not_found", "path": path}, |
| status=HTTPStatus.NOT_FOUND, |
| ) |
|
|
| def log_message(self, format: str, *args: object) -> None: |
| print(f"{self.address_string()} - {format % args}") |
|
|
| def _send_json( |
| self, |
| payload: dict[str, Any], |
| *, |
| status: HTTPStatus = HTTPStatus.OK, |
| ) -> None: |
| body = json.dumps(payload, sort_keys=True).encode("utf-8") |
| try: |
| self.send_response(status) |
| self.send_header("Content-Type", "application/json; charset=utf-8") |
| self.send_header("Content-Length", str(len(body))) |
| self._send_cors_headers() |
| self.end_headers() |
| self.wfile.write(body) |
| except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError): |
| return |
|
|
| def _send_cors_headers(self) -> None: |
| self.send_header("Access-Control-Allow-Origin", "*") |
| self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") |
| self.send_header("Access-Control-Allow-Headers", "Content-Type, X-Admin-Token") |
|
|
| def _require_admin(self) -> bool: |
| expected = os.getenv("ADMIN_TOKEN", "dev-admin-token") |
| if self.headers.get("X-Admin-Token") == expected: |
| return True |
| self._send_json( |
| {"error": "admin_token_required"}, |
| status=HTTPStatus.FORBIDDEN, |
| ) |
| return False |
|
|
| def _read_json_field(self, field: str) -> str: |
| length_raw = self.headers.get("Content-Length") |
| if not length_raw: |
| raise ValueError("Missing request body.") |
|
|
| try: |
| length = int(length_raw) |
| except ValueError as exc: |
| raise ValueError("Invalid Content-Length.") from exc |
|
|
| raw_body = self.rfile.read(length).decode("utf-8") |
| try: |
| payload = json.loads(raw_body) |
| except json.JSONDecodeError as exc: |
| raise ValueError("Request body must be JSON.") from exc |
|
|
| if not isinstance(payload, dict): |
| raise ValueError("Request body must be a JSON object.") |
| value = payload.get(field) |
| if not isinstance(value, str) or not value.strip(): |
| raise ValueError(f"Expected non-empty '{field}'.") |
| return value.strip() |
|
|
| return GodSimulatorRequestHandler |
|
|
|
|
| def _truthy_query_value(query: dict[str, list[str]], key: str) -> bool: |
| return any(value.lower() in ("1", "true", "yes") for value in query.get(key, [])) |
|
|