Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| from http import HTTPStatus | |
| from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer | |
| import os | |
| from typing import Any | |
| from urllib.parse import parse_qs, unquote, urlparse | |
| from world_simulator.api.runtime import ( | |
| GameRuntime, | |
| create_game_runtime, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| HealthWarmer as _HealthWarmer, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| ModalHealthTarget as _ModalHealthTarget, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| ModalHealthWarmer as _ModalHealthWarmer, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| modal_health_targets as _modal_health_targets, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| modal_health_url as _modal_health_url, | |
| ) | |
| from world_simulator.api.runtime import ( | |
| modal_health_urls as _modal_health_urls, | |
| ) | |
| from world_simulator.config import GameConfig | |
| from world_simulator.domain import WorldState | |
| from world_simulator.simulation.connectors.base import WorldSimulator | |
# Names exported by a star-import of this module. Most underscore-prefixed
# entries are aliases imported from ``world_simulator.api.runtime``; they have
# to be listed explicitly because a star-import otherwise skips names with a
# leading underscore.
# NOTE(review): presumably kept so older callers that imported these private
# names from this module keep working -- confirm before pruning.
__all__ = [
    "_ModalHealthTarget",
    "_ModalHealthWarmer",
    "_build_handler",
    "_modal_health_targets",
    "_modal_health_url",
    "_modal_health_urls",
    "build_handler",
    "run_server",
]
def run_server(*, world: WorldState, config: GameConfig) -> None:
    """Start the God Simulator HTTP API and serve requests until interrupted.

    Builds a game runtime from ``world`` and ``config``, binds a
    ``ThreadingHTTPServer`` to ``(config.server.host, config.server.port)``,
    prints a short startup banner and then blocks in ``serve_forever()``.

    Args:
        world: World state handed to ``create_game_runtime``.
        config: Game configuration; ``config.server.host`` and
            ``config.server.port`` select the listen address.

    The listening socket is closed when this function exits, including when
    ``serve_forever()`` is interrupted by an exception such as
    ``KeyboardInterrupt``. The exception itself still propagates to the
    caller.
    """
    runtime = create_game_runtime(world=world, config=config)
    handler = build_handler(runtime)
    address = (config.server.host, config.server.port)
    # The server object is a context manager: leaving the block calls
    # server_close(), so the port is released even if serve_forever() raises.
    with ThreadingHTTPServer(address, handler) as server:
        print(f"God Simulator API listening on http://{config.server.host}:{config.server.port}")
        print(f"World simulator connector: {runtime.simulator_name}")
        print(
            "Available endpoints: GET /health, GET /state, GET /scene/state, "
            "POST /tick"
        )
        server.serve_forever()
def _build_handler(
    world: WorldState,
    simulator: WorldSimulator,
    god_console: object | None = None,
    modal_health_warmer: _HealthWarmer | None = None,
    overseer: object | None = None,
) -> type[BaseHTTPRequestHandler]:
    """Build a request handler class from individual runtime components.

    Wraps ``world``, ``simulator`` and ``modal_health_warmer`` in a
    ``GameRuntime`` and delegates to ``build_handler``. ``god_console`` and
    ``overseer`` are accepted but have no effect on the result.
    """
    # Touch the unused parameters so it is obvious they are ignored on purpose.
    _ = (god_console, overseer)
    runtime = GameRuntime(
        world=world,
        simulator=simulator,
        modal_health_warmer=modal_health_warmer,
    )
    return build_handler(runtime)
def build_handler(runtime: GameRuntime) -> type[BaseHTTPRequestHandler]:
    """Create the HTTP request handler class bound to ``runtime``.

    Routes served by the returned class:

    * ``OPTIONS *`` -> 204 with CORS headers
    * ``GET /health`` -> ``runtime.health()``
    * ``GET /state`` -> ``runtime.state()``
    * ``GET /scene/state`` -> ``runtime.scene_state(warmup=...)``; ``warmup``
      comes from the query string
    * ``GET /admin/models`` -> ``runtime.admin_models()`` (admin token)
    * ``POST /tick`` -> ``runtime.tick()``
    * ``POST /admin/npcs/<npc_id>/model`` ->
      ``runtime.admin_set_npc_model(npc_id, profile_id)`` (admin token)

    Every other path gets a JSON ``not_found`` response with status 404.

    Args:
        runtime: Object providing the ``health``, ``state``, ``scene_state``,
            ``admin_models``, ``tick`` and ``admin_set_npc_model`` calls used
            by the routes above.

    Returns:
        A ``BaseHTTPRequestHandler`` subclass suitable for ``HTTPServer``.
    """
    # Local import: only the admin token check needs it.
    import hmac

    class GodSimulatorRequestHandler(BaseHTTPRequestHandler):
        """JSON request handler that forwards each route to ``runtime``."""

        server_version = "GodSimulator/0.1"

        # Upper bound for request bodies. The only body this handler reads is
        # a one-field JSON object, so 1 MiB is far more than is ever needed.
        _MAX_BODY_BYTES = 1 << 20

        def do_OPTIONS(self) -> None:
            """Answer CORS preflight requests with an empty 204 response."""
            self.send_response(HTTPStatus.NO_CONTENT)
            self._send_cors_headers()
            self.end_headers()

        def do_GET(self) -> None:
            """Dispatch GET requests by path; unknown paths get a JSON 404."""
            parsed_path = urlparse(self.path)
            path = parsed_path.path
            if path == "/health":
                self._send_json(runtime.health())
                return
            if path == "/state":
                self._send_json(runtime.state())
                return
            if path == "/scene/state":
                query = parse_qs(parsed_path.query)
                self._send_json(
                    runtime.scene_state(warmup=_truthy_query_value(query, "warmup"))
                )
                return
            if path == "/admin/models":
                # _require_admin has already sent the 403 when it returns False.
                if not self._require_admin():
                    return
                self._send_json(runtime.admin_models())
                return
            self._send_json(
                {"error": "not_found", "path": path},
                status=HTTPStatus.NOT_FOUND,
            )

        def do_POST(self) -> None:
            """Dispatch POST requests by path; unknown paths get a JSON 404."""
            parsed_path = urlparse(self.path)
            path = parsed_path.path
            if path == "/tick":
                status, payload = runtime.tick()
                self._send_json(payload, status=status)
                return
            if path.startswith("/admin/npcs/") and path.endswith("/model"):
                # _require_admin has already sent the 403 when it returns False.
                if not self._require_admin():
                    return
                # The NPC id is the percent-decoded path segment between the
                # fixed prefix and suffix.
                npc_id = unquote(path.removeprefix("/admin/npcs/").removesuffix("/model"))
                try:
                    profile_id = self._read_json_field("profile_id")
                except ValueError as exc:
                    self._send_json(
                        {"error": "bad_request", "message": str(exc)},
                        status=HTTPStatus.BAD_REQUEST,
                    )
                    return
                status, payload = runtime.admin_set_npc_model(npc_id, profile_id)
                self._send_json(payload, status=status)
                return
            self._send_json(
                {"error": "not_found", "path": path},
                status=HTTPStatus.NOT_FOUND,
            )

        def log_message(self, format: str, *args: object) -> None:
            """Print the access-log line to stdout."""
            print(f"{self.address_string()} - {format % args}")

        def _send_json(
            self,
            payload: dict[str, Any],
            *,
            status: HTTPStatus = HTTPStatus.OK,
        ) -> None:
            """Serialize ``payload`` as JSON and send it with ``status``.

            Keys are sorted so responses are deterministic. A client that
            disconnects while the response is being written is ignored.
            """
            body = json.dumps(payload, sort_keys=True).encode("utf-8")
            try:
                self.send_response(status)
                self.send_header("Content-Type", "application/json; charset=utf-8")
                self.send_header("Content-Length", str(len(body)))
                self._send_cors_headers()
                self.end_headers()
                self.wfile.write(body)
            except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError):
                # The peer went away; there is nobody left to report to.
                return

        def _send_cors_headers(self) -> None:
            """Emit the CORS headers shared by every response."""
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
            self.send_header("Access-Control-Allow-Headers", "Content-Type, X-Admin-Token")

        def _require_admin(self) -> bool:
            """Check the ``X-Admin-Token`` header against ``ADMIN_TOKEN``.

            Returns:
                True when the header matches. Otherwise a 403 JSON response
                has already been sent and False is returned.
            """
            # NOTE(review): when ADMIN_TOKEN is unset this falls back to a
            # well-known default token, so deployments must set the variable
            # for the admin routes to be protected.
            expected = os.getenv("ADMIN_TOKEN", "dev-admin-token")
            provided = self.headers.get("X-Admin-Token")
            # Constant-time comparison avoids leaking how much of the token
            # matched. compare_digest rejects non-ASCII str, so compare bytes;
            # "surrogatepass" keeps the encoding total and one-to-one, so the
            # result equals plain string equality.
            if provided is not None and hmac.compare_digest(
                provided.encode("utf-8", "surrogatepass"),
                expected.encode("utf-8", "surrogatepass"),
            ):
                return True
            self._send_json(
                {"error": "admin_token_required"},
                status=HTTPStatus.FORBIDDEN,
            )
            return False

        def _read_json_field(self, field: str) -> str:
            """Read the JSON request body and return one string field.

            Args:
                field: Key to extract from the top-level JSON object.

            Returns:
                The field's value with surrounding whitespace stripped.

            Raises:
                ValueError: If Content-Length is missing, malformed, negative
                    or above the size cap; if the body is not UTF-8 JSON or
                    not a JSON object; or if ``field`` is absent, not a
                    string, or blank.
            """
            length_raw = self.headers.get("Content-Length")
            if not length_raw:
                raise ValueError("Missing request body.")
            try:
                length = int(length_raw)
            except ValueError as exc:
                raise ValueError("Invalid Content-Length.") from exc
            # A negative size would make rfile.read() read until EOF and
            # block the handler thread until the client disconnects.
            if length < 0:
                raise ValueError("Invalid Content-Length.")
            if length > self._MAX_BODY_BYTES:
                raise ValueError("Request body too large.")
            try:
                raw_body = self.rfile.read(length).decode("utf-8")
            except UnicodeDecodeError as exc:
                raise ValueError("Request body must be UTF-8 encoded.") from exc
            try:
                payload = json.loads(raw_body)
            except json.JSONDecodeError as exc:
                raise ValueError("Request body must be JSON.") from exc
            if not isinstance(payload, dict):
                raise ValueError("Request body must be a JSON object.")
            value = payload.get(field)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"Expected non-empty '{field}'.")
            return value.strip()

    return GodSimulatorRequestHandler
| def _truthy_query_value(query: dict[str, list[str]], key: str) -> bool: | |
| return any(value.lower() in ("1", "true", "yes") for value in query.get(key, [])) | |