# Spaces: Runtime error (page-status banner captured together with the source; not part of the module)
| from __future__ import annotations | |
| import json | |
| import os | |
| import time | |
| from copy import deepcopy | |
| from dataclasses import dataclass | |
| from http import HTTPStatus | |
| from pathlib import Path | |
| from threading import Lock, Thread | |
| from typing import Any, Protocol | |
| from urllib.error import HTTPError, URLError | |
| from urllib.request import Request, urlopen | |
| from world_simulator.api.modal_auth import modal_proxy_auth_headers | |
| from world_simulator.api.persistence import ( | |
| BackgroundSnapshotWriter, | |
| SnapshotStore, | |
| build_snapshot, | |
| ) | |
| from world_simulator.api.players import ( | |
| PlayerCharacter, | |
| PlayerError, | |
| PlayerManager, | |
| ) | |
| from world_simulator.config import ConnectorConfig, GameConfig | |
| from world_simulator.domain import WorldLogEvent, WorldState | |
| from world_simulator.observability import RunLedger, append_record, set_active_tick_buffer | |
| from world_simulator.rendering.scene_contract import to_scene_snapshot | |
| from world_simulator.simulation.connectors.base import TickPlan, WorldSimulator | |
| from world_simulator.simulation.connectors.factory import create_world_simulator | |
| from world_simulator.simulation.directives import expire_directives | |
| from world_simulator.simulation.spawning import create_world | |
| from world_simulator.simulation.tick import apply_tick_plan, plan_world_tick | |
class GameRuntime:
    """Lock-guarded facade over the live world, its players and the tick loop.

    ``_world``, ``_players``, ``_tick_status`` and ``_pending_commands`` are
    read and written under ``_lock``. Tick planning runs on a deep copy of the
    world without the lock held; only the begin and apply phases take it.
    """

    def __init__(
        self,
        *,
        world: WorldState,
        simulator: WorldSimulator,
        modal_health_warmer: HealthWarmer | None = None,
        players: PlayerManager | None = None,
        ledger: RunLedger | None = None,
        config: GameConfig | None = None,
        snapshot_writer: BackgroundSnapshotWriter | None = None,
        snapshot_store: SnapshotStore | None = None,
    ) -> None:
        """Store collaborators; ``players`` and ``ledger`` default to fresh instances."""
        self._world = world
        self._simulator = simulator
        self._modal_health_warmer = modal_health_warmer
        self._players = players or PlayerManager()
        self._ledger = ledger or RunLedger()
        self._config = config
        self._snapshot_writer = snapshot_writer
        self._snapshot_store = snapshot_store
        self._lock = Lock()
        self._tick_status = TickStatus()
        self._pending_commands: list[PendingCommand] = []

    def simulator_name(self) -> str:
        """Return the ``name`` of the configured simulator."""
        return self._simulator.name

    def health(self) -> dict[str, Any]:
        """Return a small liveness payload including the tick-progress flags."""
        with self._lock:
            return {
                "status": "ok",
                "tick": self._world.tick,
                "simulator": self._simulator.name,
                "last_tick_source": self._world.last_tick_source,
                **tick_status_payload(self._tick_status),
            }

    def state(self) -> dict[str, Any]:
        """Return the serialized world with a ``simulation`` status section."""
        with self._lock:
            payload = self._world.to_dict()
            payload["simulation"] = tick_status_payload(self._tick_status)
            return payload

    def scene_state(self, *, warmup: bool = False) -> dict[str, Any]:
        """Return the scene snapshot, optionally forcing a model health check.

        When a health warmer is configured its per-target statuses are attached
        under ``payload["simulation"]["models"]``.
        """
        if warmup and self._modal_health_warmer is not None:
            self._modal_health_warmer.trigger(force=True)
        with self._lock:
            payload = to_scene_snapshot(
                self._world,
                tick_in_progress=self._tick_status.in_progress,
                pending_tick=self._tick_status.pending_tick,
            )
        if self._modal_health_warmer is not None:
            # The warmer guards its own state with its own lock.
            payload["simulation"]["models"] = self._modal_health_warmer.statuses()
        return payload

    def tick(self) -> tuple[HTTPStatus, dict[str, Any]]:
        """Plan and apply one world tick.

        Returns ``409`` with the conflict payload when a tick is already in
        progress, ``500`` with ``{"error": "tick_failed", ...}`` when planning
        or applying raises, and ``200`` with the serialized world otherwise.
        """
        planning_world, next_tick, pending_commands, conflict = self._begin_tick()
        if conflict is not None:
            return HTTPStatus.CONFLICT, conflict
        assert planning_world is not None
        assert next_tick is not None
        assert pending_commands is not None
        try:
            # Ledger setup lives inside the try: once _begin_tick() has set the
            # in-progress flag, ANY failure must clear it again or every later
            # tick is rejected with a conflict.
            buffer = self._ledger.start_tick(
                next_tick,
                npc_order=[npc.id for npc in planning_world.living_npcs()],
            )
            set_active_tick_buffer(buffer)
            expire_directives(planning_world, tick=next_tick)
            planned_tick, plan = plan_world_tick(
                planning_world,
                self._simulator,
                next_tick,
            )
        except Exception as exc:
            failure = self._fail_tick(exc, pending_commands)
            set_active_tick_buffer(None)
            return failure
        try:
            with self._lock:
                before_events = len(self._world.event_log)
                plan = self._override_player_directives(planning_world, plan)
                apply_tick_plan(self._world, planned_tick, plan)
                append_record(
                    {
                        "tick": planned_tick,
                        "phase": "engine_events",
                        # Only the events this tick appended to the log.
                        "events": self._world.to_dict()["event_log"][before_events:],
                    }
                )
                self._tick_status.in_progress = False
                self._tick_status.pending_tick = None
                payload = self._world.to_dict()
                payload["simulation"] = tick_status_payload(self._tick_status)
                self._persist_snapshot()
            self._ledger.flush_tick(buffer)
            return HTTPStatus.OK, payload
        except Exception as exc:
            # The apply phase failed AFTER planning finished. Clear the
            # in-progress flag so the next tick is not blocked forever by a
            # stale "tick in progress".
            return self._fail_tick(exc, pending_commands)
        finally:
            set_active_tick_buffer(None)

    def _fail_tick(
        self,
        exc: Exception,
        pending_commands: list[PendingCommand],
    ) -> tuple[HTTPStatus, dict[str, Any]]:
        """Clear the in-progress flags, re-queue drained commands, build the 500 reply."""
        with self._lock:
            self._tick_status.in_progress = False
            self._tick_status.pending_tick = None
            # Drained commands go back to the FRONT so their order is kept
            # relative to anything queued while the tick was running.
            self._pending_commands = [*pending_commands, *self._pending_commands]
        return HTTPStatus.INTERNAL_SERVER_ERROR, {
            "error": "tick_failed",
            "message": str(exc),
        }

    def _persist_snapshot(self) -> None:
        """Submit a full world+players snapshot to the background writer.

        Called while holding ``self._lock`` so the captured state is consistent;
        the actual (slow, bucket) write happens off-lock on the writer thread.
        ``build_snapshot`` makes its own ``to_dict`` copy, so the writer never
        races with the ``payload`` dict returned to the caller.
        """
        if self._snapshot_writer is None:
            return
        self._snapshot_writer.submit(build_snapshot(self._world, self._players.snapshot()))

    def reset_world(self) -> tuple[HTTPStatus, dict[str, Any]]:
        """Wipe the live world: fresh world, no players, deleted snapshot files.

        Used by the Admin panel. Drops any in-flight snapshot first so the
        background writer cannot resurrect the deleted state, then clears the
        store on disk. Returns ``500`` when the runtime has no config.
        """
        if self._config is None:
            return HTTPStatus.INTERNAL_SERVER_ERROR, {
                "error": "reset_unavailable",
                "message": "Runtime has no config to rebuild the world from.",
            }
        with self._lock:
            # NOTE(review): a tick that is mid-planning when this runs will
            # still apply its plan to the fresh world - confirm acceptable.
            if self._snapshot_writer is not None:
                self._snapshot_writer.drop_pending()
            self._world = create_world(self._config)
            self._players = PlayerManager()
            self._tick_status = TickStatus()
            self._pending_commands = []
            if self._snapshot_store is not None:
                self._snapshot_store.clear()
            return HTTPStatus.OK, {"ok": True, "tick": self._world.tick}

    # -- player characters (external agents over MCP) ---------------------- #

    def create_player(
        self,
        *,
        name: str,
        icon: str,
        country_id: str = "nemotron",
    ) -> tuple[HTTPStatus, dict[str, Any]]:
        """Create a player character; ``400`` with the error payload on ``PlayerError``."""
        with self._lock:
            try:
                character = self._players.create(
                    name=name,
                    icon=icon,
                    country_id=country_id,
                    created_tick=self._world.tick,
                )
            except PlayerError as exc:
                return HTTPStatus.BAD_REQUEST, _player_error_payload(exc)
            return HTTPStatus.OK, _character_summary(character)

    def player_status(self, token: str) -> tuple[HTTPStatus, dict[str, Any]]:
        """Return the character status for ``token``; any ``PlayerError`` maps to ``404``."""
        with self._lock:
            try:
                character = self._players.get(token)
                return HTTPStatus.OK, self._players.character_status(character, self._world)
            except PlayerError as exc:
                return HTTPStatus.NOT_FOUND, _player_error_payload(exc)

    def player_observe(self, token: str) -> tuple[HTTPStatus, dict[str, Any]]:
        """Return the observation for ``token``; errors map via ``_player_error_status``."""
        with self._lock:
            try:
                character = self._players.get(token)
                return HTTPStatus.OK, self._players.observation(character, self._world)
            except PlayerError as exc:
                return _player_error_status(exc), _player_error_payload(exc)

    def player_act(
        self,
        token: str,
        payload: dict[str, Any],
    ) -> tuple[HTTPStatus, dict[str, Any]]:
        """Queue an action for the character and return it with a fresh observation."""
        with self._lock:
            try:
                character = self._players.get(token)
                self._players.queue_action(character, payload, self._world)
                observation = self._players.observation(character, self._world)
            except PlayerError as exc:
                return _player_error_status(exc), _player_error_payload(exc)
            return HTTPStatus.OK, {
                "queued_action": character.pending_action,
                "message": (
                    f"Action '{character.pending_action['action']}' queued for the next tick."
                    if character.pending_action
                    else "Action queued."
                ),
                "observation": observation,
            }

    def player_vote_available(self, token: str) -> bool:
        """Return True when an uncompleted election in the character's country spans the current tick.

        An unknown token yields False rather than an error.
        """
        with self._lock:
            try:
                character = self._players.get(token)
            except PlayerError:
                return False
            for election in self._world.elections:
                if election.country_id != character.country_id or election.completed:
                    continue
                if election.start_tick <= self._world.tick <= election.end_tick:
                    return True
            return False

    # -- admin ------------------------------------------------------------- #

    def admin_logs_runs(self) -> dict[str, Any]:
        """List ``run_*`` directories next to the current run, newest name first."""
        root = self._ledger.run_dir.parent
        runs: list[dict[str, Any]] = []
        for run_dir in sorted(root.glob("run_*"), key=lambda item: item.name, reverse=True):
            if not run_dir.is_dir():
                continue
            ledger_path = run_dir / "ledger.jsonl"
            runs.append(
                {
                    "run_id": run_dir.name,
                    "current": run_dir == self._ledger.run_dir,
                    "ledger_path": str(ledger_path),
                    "exists": ledger_path.is_file(),
                    "size_bytes": ledger_path.stat().st_size if ledger_path.is_file() else 0,
                }
            )
        return {"runs": runs}

    def admin_log_records(
        self,
        *,
        run_id: str | None = None,
        since_line: int = 0,
        limit: int = 200,
        phase: str | None = None,
        npc_id: str | None = None,
        tick_from: int | None = None,
        tick_to: int | None = None,
    ) -> tuple[HTTPStatus, dict[str, Any]]:
        """Return up to ``limit`` filtered ledger records starting at ``since_line``.

        ``run_id`` of ``None`` or ``"current"`` selects the active run. Line
        numbers are zero-based; ``next_line`` in the reply is the value to pass
        as ``since_line`` to continue. ``limit`` is clamped to ``1..1000``.
        Lines that are not valid JSON, or whose JSON value is not an object,
        are skipped. Returns ``404`` for an unknown run or a missing ledger.
        """
        run_dir = self._ledger.run_dir if run_id in (None, "current") else self._run_dir(run_id)
        if run_dir is None:
            return HTTPStatus.NOT_FOUND, {"error": "run_not_found", "message": "Unknown ledger run."}
        ledger_path = run_dir / "ledger.jsonl"
        if not ledger_path.is_file():
            return HTTPStatus.NOT_FOUND, {"error": "ledger_not_found", "message": "Run has no ledger.jsonl."}
        records: list[dict[str, Any]] = []
        next_line = max(0, since_line)
        limit = max(1, min(limit, 1000))
        with ledger_path.open("r", encoding="utf-8") as ledger_file:
            for line_no, line in enumerate(ledger_file):
                next_line = line_no + 1
                if line_no < since_line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(record, dict):
                    # A bare JSON scalar/array is valid JSON but not a ledger
                    # record; the helpers below call ``record.get``.
                    continue
                if not _record_matches(
                    record,
                    phase=phase,
                    npc_id=npc_id,
                    tick_from=tick_from,
                    tick_to=tick_to,
                ):
                    continue
                records.append(_normalize_log_record(line_no, record))
                if len(records) >= limit:
                    break
        return HTTPStatus.OK, {
            "run_id": run_dir.name,
            "ledger_path": str(ledger_path),
            "next_line": next_line,
            "records": records,
        }

    def admin_models(self) -> dict[str, Any]:
        """Return the known model profiles and each NPC's current assignment."""
        # Profile discovery may read a file, so do it before taking the lock.
        profiles = self._model_profiles()
        with self._lock:
            npcs = [
                {
                    "id": npc.id,
                    "name": npc.name,
                    "country_id": npc.country_id,
                    "model_profile_id": npc.model_profile_id,
                    "connector_id": npc.connector_id,
                }
                for npc in self._world.npcs
            ]
        return {"profiles": profiles, "npcs": npcs}

    def admin_set_npc_model(self, npc_id: str, profile_id: str) -> tuple[HTTPStatus, dict[str, Any]]:
        """Assign ``profile_id`` to an NPC, logging a world event and a ledger record.

        Returns ``400`` for an unknown profile and ``404`` for an unknown NPC.
        """
        profiles = {profile["id"]: profile for profile in self._model_profiles()}
        if profile_id not in profiles:
            return HTTPStatus.BAD_REQUEST, {"error": "unknown_profile", "message": "Unknown model profile."}
        profile = profiles[profile_id]
        connector_id = profile.get("connector_id")
        with self._lock:
            npc = next((candidate for candidate in self._world.npcs if candidate.id == npc_id), None)
            if npc is None:
                return HTTPStatus.NOT_FOUND, {"error": "npc_not_found", "message": "Unknown NPC."}
            npc.model_profile_id = profile_id
            # Anything other than a non-empty string clears the connector.
            npc.connector_id = connector_id if isinstance(connector_id, str) and connector_id else None
            current_tick = self._world.tick
            self._world.event_log.append(
                WorldLogEvent(
                    tick=current_tick,
                    type="model_profile_changed",
                    summary=f"{npc.name} model profile set to {profile_id}",
                    severity="neutral",
                    actor_id=npc.id,
                )
            )
        self._ledger.write_records(
            [
                {
                    "tick": current_tick,
                    "phase": "admin_model_switch",
                    "npc_id": npc_id,
                    "model_profile_id": profile_id,
                    "connector_id": connector_id,
                }
            ]
        )
        return HTTPStatus.OK, {"npc_id": npc_id, "profile": profile}

    def _run_dir(self, run_id: str | None) -> Path | None:
        """Resolve ``run_id`` to a sibling run directory, or None if invalid or missing.

        Ids containing a path separator or not starting with ``run_`` are
        rejected before touching the filesystem.
        """
        if not run_id or "/" in run_id or "\\" in run_id or not run_id.startswith("run_"):
            return None
        run_dir = self._ledger.run_dir.parent / run_id
        if not run_dir.is_dir():
            return None
        return run_dir

    def _model_profiles(self) -> list[dict[str, Any]]:
        """Collect model profiles from the config and the optional profiles file.

        Order of precedence: the built-in ``default`` profile, one profile per
        secondary connector, then entries from the JSON file named by
        ``MODEL_PROFILES_PATH`` (default ``config/model_profiles.json``), which
        are merged over any existing profile with the same id. An unreadable or
        malformed file contributes nothing.
        """
        profiles: dict[str, dict[str, Any]] = {
            "default": {
                "id": "default",
                "label": "Default NPC model",
                "connector_id": None,
                "model": self._config.connector.model if self._config else None,
            }
        }
        if self._config is not None:
            for connector_id, connector in self._config.secondary_connectors.items():
                profiles[connector_id] = {
                    "id": connector_id,
                    "label": connector_id,
                    "connector_id": connector_id,
                    "model": connector.model,
                    "type": connector.type,
                }
        profiles_path = Path(os.getenv("MODEL_PROFILES_PATH", "config/model_profiles.json"))
        if profiles_path.is_file():
            try:
                raw = json.loads(profiles_path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both JSONDecodeError and UnicodeDecodeError.
                raw = {}
            items = raw.get("profiles") if isinstance(raw, dict) else None
            if isinstance(items, list):
                for item in items:
                    if isinstance(item, dict) and isinstance(item.get("id"), str):
                        profiles[item["id"]] = {**profiles.get(item["id"], {}), **item}
        return list(profiles.values())

    def _override_player_directives(self, world: WorldState, plan: TickPlan) -> TickPlan:
        """Replace planner directives for player-owned NPCs with their queued actions."""
        player_ids = self._players.player_npc_ids
        if not player_ids:
            return plan
        directives = [
            directive for directive in plan.directives if directive.npc_id not in player_ids
        ]
        for character in self._players.all_characters():
            directive = self._players.take_directive(character, world)
            if directive is not None:
                directives.append(directive)
        return TickPlan(
            source=plan.source,
            directives=directives,
            overseer=plan.overseer,
            ledger_entries=plan.ledger_entries,
        )

    def _begin_tick(
        self,
    ) -> tuple[WorldState | None, int | None, list[PendingCommand] | None, dict[str, Any] | None]:
        """Claim the tick slot and snapshot what planning needs.

        Returns ``(planning_world, next_tick, pending_commands, None)`` on
        success, or ``(None, None, None, conflict_payload)`` when a tick is
        already in progress. On success the pending-command queue is drained
        and the in-progress flag is set.
        """
        with self._lock:
            if self._tick_status.in_progress:
                return None, None, None, {
                    "error": "tick_in_progress",
                    "tick": self._world.tick,
                    **tick_status_payload(self._tick_status),
                }
            next_tick = self._world.tick + 1
            # Planning works on a private copy so it can run without the lock.
            planning_world = deepcopy(self._world)
            pending_commands = list(self._pending_commands)
            self._pending_commands.clear()
            self._tick_status.in_progress = True
            self._tick_status.pending_tick = next_tick
            return planning_world, next_tick, pending_commands, None
def create_game_runtime(
    *,
    world: WorldState,
    config: GameConfig,
    players: PlayerManager | None = None,
    snapshot_writer: BackgroundSnapshotWriter | None = None,
    snapshot_store: SnapshotStore | None = None,
) -> GameRuntime:
    """Build a ``GameRuntime`` whose simulator and health warmer come from ``config``.

    The simulator is created by ``create_world_simulator`` and the warmer
    watches the targets returned by ``modal_health_targets``; the remaining
    arguments are passed through unchanged.
    """
    world_simulator = create_world_simulator(config)
    health_warmer = ModalHealthWarmer(modal_health_targets(config))
    return GameRuntime(
        world=world,
        simulator=world_simulator,
        modal_health_warmer=health_warmer,
        players=players,
        config=config,
        snapshot_writer=snapshot_writer,
        snapshot_store=snapshot_store,
    )
def _record_matches(
    record: dict[str, Any],
    *,
    phase: str | None,
    npc_id: str | None,
    tick_from: int | None,
    tick_to: int | None,
) -> bool:
    """Return True when ``record`` passes every filter that was supplied.

    Empty/None ``phase`` and ``npc_id`` disable those filters. The tick bounds
    are inclusive and only apply when the record's ``tick`` is an int; a
    record with a missing or non-int tick is never rejected by them.
    """
    phase_ok = not phase or record.get("phase") == phase
    npc_ok = not npc_id or record.get("npc_id") == npc_id
    if not (phase_ok and npc_ok):
        return False
    tick = record.get("tick")
    if not isinstance(tick, int):
        return True
    too_early = tick_from is not None and tick < tick_from
    too_late = tick_to is not None and tick > tick_to
    return not (too_early or too_late)
def _normalize_log_record(line: int, record: dict[str, Any]) -> dict[str, Any]:
    """Wrap a raw ledger record with its line number and derived display fields.

    ``model`` and ``summary`` are derived via ``_record_model`` and
    ``_record_summary``; the untouched record is kept under ``"record"``.
    """
    normalized: dict[str, Any] = {"line": line}
    for key in ("tick", "phase", "npc_id"):
        normalized[key] = record.get(key)
    normalized["model"] = _record_model(record)
    normalized["summary"] = _record_summary(record)
    normalized["record"] = record
    return normalized
def _record_model(record: dict[str, Any]) -> str | None:
    """Return the model name from ``record["model"]`` or ``record["request"]["model"]``.

    Only string values count; anything else yields None.
    """
    direct = record.get("model")
    if isinstance(direct, str):
        return direct
    request = record.get("request")
    if not isinstance(request, dict):
        return None
    nested = request.get("model")
    return nested if isinstance(nested, str) else None
def _record_summary(record: dict[str, Any]) -> str:
    """Return a one-line summary for a ledger record.

    A non-empty string ``summary`` is used verbatim. Otherwise the summary is
    the phase (``"record"`` when absent), then ``action=...`` from
    ``parsed_action``/``action``, then the first truthy of
    ``validator``/``verdict``/``reason``, joined with ``"; "``.
    """
    explicit = record.get("summary")
    if isinstance(explicit, str) and explicit:
        return explicit
    pieces = [str(record.get("phase") or "record")]
    action = record.get("parsed_action") or record.get("action")
    if action:
        pieces.append(f"action={action}")
    note = record.get("validator") or record.get("verdict") or record.get("reason")
    if note:
        pieces.append(str(note))
    return "; ".join(pieces)
def _character_summary(character: PlayerCharacter) -> dict[str, Any]:
    """Return the character's token, name, icon, country id and NPC id as a dict."""
    fields = ("token", "name", "icon", "country_id", "npc_id")
    return {field: getattr(character, field) for field in fields}
def _player_error_payload(exc: PlayerError) -> dict[str, Any]:
    """Return the ``{"error", "message"}`` body built from the exception's code and message."""
    return dict(error=exc.code, message=exc.message)
def _player_error_status(exc: PlayerError) -> HTTPStatus:
    """Map a player error code to an HTTP status.

    ``invalid_token`` is 404, ``character_dead`` is 410, anything else is 400.
    """
    status_by_code = {
        "invalid_token": HTTPStatus.NOT_FOUND,
        "character_dead": HTTPStatus.GONE,
    }
    return status_by_code.get(exc.code, HTTPStatus.BAD_REQUEST)
@dataclass
class TickStatus:
    """Mutable progress flags for the tick currently being planned/applied."""

    # True from the moment a tick is claimed until it is applied or fails.
    in_progress: bool = False
    # Number of the tick being worked on, or None when idle.
    pending_tick: int | None = None
@dataclass
class PendingCommand:
    """A queued command waiting to be picked up by the next tick."""

    # Command category. NOTE(review): the set of valid kinds is not visible in this file.
    kind: str
    # The command text exactly as it was received.
    raw_input: str
def tick_status_payload(tick_status: TickStatus) -> dict[str, Any]:
    """Return the tick-progress flags under their public payload keys."""
    payload: dict[str, Any] = {}
    payload["tick_in_progress"] = tick_status.in_progress
    payload["pending_tick"] = tick_status.pending_tick
    return payload
class ModalHealthWarmer:
    """Polls each target's health URL on daemon threads and tracks its status.

    Per-target state is guarded by ``_lock``. Every launch bumps the target's
    ``generation``; a worker whose generation is no longer current cannot
    overwrite the state.
    """

    def __init__(
        self,
        targets: list[ModalHealthTarget],
        *,
        cooldown_seconds: float = 300,
        warmup_after_seconds: float = 2,
        warmup_retry_seconds: float = 5,
        opener: UrlOpener = urlopen,
    ) -> None:
        """Create one state entry per target, keyed by target id."""
        self._lock = Lock()
        self._opener = opener
        self._cooldown_seconds = cooldown_seconds
        self._warmup_after_seconds = warmup_after_seconds
        self._warmup_retry_seconds = warmup_retry_seconds
        self._states = {}
        for target in targets:
            self._states[target.id] = ModalHealthState(target=target)

    def trigger(self, *, force: bool = False) -> None:
        """Start a health check for every idle target.

        Targets already being checked are always skipped. Unless ``force`` is
        set, targets whose last check started within the cooldown are skipped
        as well. Two daemon threads are started per launched target: one that
        flips the status to ``"warmup"`` after a delay, one that polls.
        """
        if not self._states:
            return
        started_at = time.monotonic()
        to_launch: list[tuple[str, str, int]] = []
        with self._lock:
            for target_id, state in self._states.items():
                cooling_down = started_at - state.last_started_at < self._cooldown_seconds
                if state.in_progress or (cooling_down and not force):
                    continue
                state.generation += 1
                state.last_started_at = started_at
                state.http_status = None
                state.status = "checking"
                state.in_progress = True
                to_launch.append((target_id, state.target.health_url, state.generation))
        # Threads are started only after the lock has been released.
        for target_id, health_url, generation in to_launch:
            status_thread = Thread(
                target=self._mark_warmup_if_pending,
                args=(target_id, generation),
                name="modal-health-warmup-status",
                daemon=True,
            )
            status_thread.start()
            poll_thread = Thread(
                target=self._run_one,
                args=(target_id, health_url, generation),
                name="modal-health-warmer",
                daemon=True,
            )
            poll_thread.start()

    def statuses(self) -> list[dict[str, Any]]:
        """Return one status dict per target, in insertion order."""
        report: list[dict[str, Any]] = []
        with self._lock:
            for state in self._states.values():
                target = state.target
                report.append(
                    {
                        "id": target.id,
                        "label": target.label,
                        "model": target.model,
                        "status": state.status,
                        "http_status": state.http_status,
                    }
                )
        return report

    def _mark_warmup_if_pending(self, target_id: str, generation: int) -> None:
        """After the warmup delay, relabel a still-``"checking"`` probe as ``"warmup"``."""
        time.sleep(self._warmup_after_seconds)
        with self._lock:
            state = self._states.get(target_id)
            if state is None or state.generation != generation:
                return
            if state.in_progress and state.status == "checking":
                state.status = "warmup"

    def _run_one(self, target_id: str, health_url: str, generation: int) -> None:
        """Poll ``health_url`` until it answers with something other than 503.

        200 marks the target ``"ready"``; 503 marks it ``"warmup"`` and retries
        after ``warmup_retry_seconds``; any other status or a connection-level
        error marks it ``"error"``. The loop also stops as soon as a newer
        generation has taken over the target.
        """
        auth_headers = modal_proxy_auth_headers()
        while True:
            try:
                if auth_headers:
                    request = Request(health_url, headers=auth_headers)
                else:
                    request = health_url
                response = self._opener(request)
                try:
                    http_status = int(getattr(response, "status", 0))
                finally:
                    response.close()
            except HTTPError as exc:
                # Must precede the URLError/OSError clause: HTTPError is a
                # subclass of both. A 503 falls through to the retry below.
                http_status = int(exc.code)
                if http_status != HTTPStatus.SERVICE_UNAVAILABLE:
                    self._set_status(target_id, generation, "error", http_status)
                    print(f"Modal health check failed for {health_url}: {exc}", flush=True)
                    return
            except (TimeoutError, URLError, OSError) as exc:
                self._set_status(target_id, generation, "error", None)
                print(f"Modal health check failed for {health_url}: {exc}", flush=True)
                return
            if http_status == HTTPStatus.SERVICE_UNAVAILABLE:
                still_current = self._set_status(
                    target_id,
                    generation,
                    "warmup",
                    http_status,
                    in_progress=True,
                )
                if not still_current:
                    return
                print(f"Modal health check warming {health_url}: {http_status}", flush=True)
                time.sleep(self._warmup_retry_seconds)
                continue
            if http_status == HTTPStatus.OK:
                self._set_status(target_id, generation, "ready", http_status)
                print(f"Modal health check warmed {health_url}: {http_status}", flush=True)
            else:
                self._set_status(target_id, generation, "error", http_status)
                print(f"Modal health check failed for {health_url}: {http_status}", flush=True)
            return

    def _set_status(
        self,
        target_id: str,
        generation: int,
        status: str,
        http_status: int | None,
        *,
        in_progress: bool = False,
    ) -> bool:
        """Write a probe result if ``generation`` is still current.

        Returns False, leaving the state untouched, when the target is unknown
        or a newer generation owns it; True after updating.
        """
        with self._lock:
            state = self._states.get(target_id)
            is_current = state is not None and state.generation == generation
            if not is_current:
                return False
            state.in_progress = in_progress
            state.http_status = http_status
            state.status = status
            return True
@dataclass
class ModalHealthTarget:
    """One endpoint whose health URL the warmer polls."""

    # Key used for the warmer's per-target state and reported in status payloads.
    id: str
    # Human-readable name reported alongside the status.
    label: str
    # Model name taken from the connector config; may be None.
    model: str | None
    # Full URL that is requested to probe the endpoint.
    health_url: str
@dataclass
class ModalHealthState:
    """Mutable probe state the warmer keeps for a single target."""

    # The endpoint this state belongs to.
    target: ModalHealthTarget
    # "unknown" until first probed; the warmer then sets "checking", "warmup",
    # "ready" or "error".
    status: str = "unknown"
    # Last HTTP status seen, or None when no response was received.
    http_status: int | None = None
    # time.monotonic() value of the most recent launch; 0.0 before the first.
    # NOTE(review): the cooldown check compares this against time.monotonic(),
    # so a non-forced trigger is skipped while the monotonic clock itself is
    # still below the cooldown - confirm that is acceptable.
    last_started_at: float = 0.0
    # Incremented on every launch so stale workers can be detected.
    generation: int = 0
    # True while a probe for this target is running.
    in_progress: bool = False
class UrlResponse(Protocol):
    """Structural type for what an opener returns: a status code and ``close()``."""

    status: int

    def close(self) -> None:
        """Release the response."""
class UrlOpener(Protocol):
    """Structural type for a callable that opens a URL or ``Request``."""

    def __call__(self, url: str | Request) -> UrlResponse:
        """Open ``url`` and return the response."""
class HealthWarmer(Protocol):
    """Structural type for a health warmer: it can be triggered and queried."""

    def trigger(self, *, force: bool = False) -> None:
        """Start health checks; ``force`` is a keyword-only flag."""

    def statuses(self) -> list[dict[str, Any]]:
        """Return one status dict per watched target."""
def modal_health_urls(config: GameConfig) -> list[str]:
    """Return just the health URLs of the targets derived from ``config``."""
    urls: list[str] = []
    for target in modal_health_targets(config):
        urls.append(target.health_url)
    return urls
def modal_health_targets(config: GameConfig) -> list[ModalHealthTarget]:
    """Build health targets for the primary and every secondary connector.

    The primary connector comes first with id ``npc_model``; secondary ones
    follow as ``npc_<connector_id>``. Connectors that are None or for which
    ``modal_health_url`` returns None are left out.
    """
    candidates: list[tuple[str, str, ConnectorConfig | None]] = [
        ("npc_model", "Nemotron NPC", config.connector),
        *(
            (f"npc_{connector_id}", f"{connector_id.capitalize()} NPC", connector)
            for connector_id, connector in config.secondary_connectors.items()
        ),
    ]
    targets: list[ModalHealthTarget] = []
    for target_id, label, connector in candidates:
        if connector is None:
            continue
        health_url = modal_health_url(connector)
        if health_url is None:
            continue
        targets.append(
            ModalHealthTarget(
                id=target_id,
                label=label,
                model=connector.model,
                health_url=health_url,
            )
        )
    return targets
def modal_health_url(connector: ConnectorConfig) -> str | None:
    """Derive the ``/health`` URL for an ``openai_compatible`` connector on ``.modal.run``.

    Uses ``base_url`` and falls back to ``api_url``. Surrounding whitespace,
    trailing slashes and a trailing ``/v1/chat/completions``,
    ``/chat/completions`` or ``/v1`` are removed before ``/health`` is
    appended. Returns None for other connector types, when no URL is set, or
    when the URL does not contain ``.modal.run``.
    """
    if connector.type != "openai_compatible":
        return None
    endpoint = connector.base_url or connector.api_url
    if not endpoint:
        return None
    endpoint = endpoint.strip().rstrip("/")
    # Suffixes are tried in this order, each against the result of the previous.
    for api_suffix in ("/v1/chat/completions", "/chat/completions", "/v1"):
        endpoint = endpoint.removesuffix(api_suffix)
    if ".modal.run" in endpoint:
        return f"{endpoint}/health"
    return None