| from __future__ import annotations |
|
|
| import json |
| import os |
| import time |
| from copy import deepcopy |
| from dataclasses import dataclass |
| from http import HTTPStatus |
| from pathlib import Path |
| from threading import Lock, Thread |
| from typing import Any, Protocol |
| from urllib.error import HTTPError, URLError |
| from urllib.request import Request, urlopen |
|
|
| from world_simulator.api.modal_auth import modal_proxy_auth_headers |
| from world_simulator.api.persistence import ( |
| BackgroundSnapshotWriter, |
| SnapshotStore, |
| build_snapshot, |
| ) |
| from world_simulator.api.players import ( |
| PlayerCharacter, |
| PlayerError, |
| PlayerManager, |
| ) |
| from world_simulator.config import ConnectorConfig, GameConfig |
| from world_simulator.domain import WorldLogEvent, WorldState |
| from world_simulator.observability import RunLedger, append_record, set_active_tick_buffer |
| from world_simulator.rendering.scene_contract import to_scene_snapshot |
| from world_simulator.simulation.connectors.base import TickPlan, WorldSimulator |
| from world_simulator.simulation.connectors.factory import create_world_simulator |
| from world_simulator.simulation.directives import expire_directives |
| from world_simulator.simulation.spawning import create_world |
| from world_simulator.simulation.tick import apply_tick_plan, plan_world_tick |
|
|
|
|
| class GameRuntime: |
| def __init__( |
| self, |
| *, |
| world: WorldState, |
| simulator: WorldSimulator, |
| modal_health_warmer: HealthWarmer | None = None, |
| players: PlayerManager | None = None, |
| ledger: RunLedger | None = None, |
| config: GameConfig | None = None, |
| snapshot_writer: BackgroundSnapshotWriter | None = None, |
| snapshot_store: SnapshotStore | None = None, |
| ) -> None: |
| self._world = world |
| self._simulator = simulator |
| self._modal_health_warmer = modal_health_warmer |
| self._players = players or PlayerManager() |
| self._ledger = ledger or RunLedger() |
| self._config = config |
| self._snapshot_writer = snapshot_writer |
| self._snapshot_store = snapshot_store |
| self._lock = Lock() |
| self._tick_status = TickStatus() |
| self._pending_commands: list[PendingCommand] = [] |
|
|
| @property |
| def simulator_name(self) -> str: |
| return self._simulator.name |
|
|
| def health(self) -> dict[str, Any]: |
| with self._lock: |
| return { |
| "status": "ok", |
| "tick": self._world.tick, |
| "simulator": self._simulator.name, |
| "last_tick_source": self._world.last_tick_source, |
| **tick_status_payload(self._tick_status), |
| } |
|
|
| def state(self) -> dict[str, Any]: |
| with self._lock: |
| payload = self._world.to_dict() |
| payload["simulation"] = tick_status_payload(self._tick_status) |
| return payload |
|
|
| def scene_state(self, *, warmup: bool = False) -> dict[str, Any]: |
| if warmup and self._modal_health_warmer is not None: |
| self._modal_health_warmer.trigger(force=True) |
|
|
| with self._lock: |
| payload = to_scene_snapshot( |
| self._world, |
| tick_in_progress=self._tick_status.in_progress, |
| pending_tick=self._tick_status.pending_tick, |
| ) |
| if self._modal_health_warmer is not None: |
| payload["simulation"]["models"] = self._modal_health_warmer.statuses() |
| return payload |
|
|
| def tick(self) -> tuple[HTTPStatus, dict[str, Any]]: |
| planning_world, next_tick, pending_commands, conflict = self._begin_tick() |
| if conflict is not None: |
| return HTTPStatus.CONFLICT, conflict |
| assert planning_world is not None |
| assert next_tick is not None |
| assert pending_commands is not None |
|
|
| buffer = self._ledger.start_tick( |
| next_tick, |
| npc_order=[npc.id for npc in planning_world.living_npcs()], |
| ) |
| set_active_tick_buffer(buffer) |
|
|
| try: |
| expire_directives(planning_world, tick=next_tick) |
| planned_tick, plan = plan_world_tick( |
| planning_world, |
| self._simulator, |
| next_tick, |
| ) |
| except Exception as exc: |
| with self._lock: |
| self._tick_status.in_progress = False |
| self._tick_status.pending_tick = None |
| self._pending_commands = [*pending_commands, *self._pending_commands] |
| set_active_tick_buffer(None) |
| return HTTPStatus.INTERNAL_SERVER_ERROR, { |
| "error": "tick_failed", |
| "message": str(exc), |
| } |
|
|
| try: |
| with self._lock: |
| before_events = len(self._world.event_log) |
| plan = self._override_player_directives(planning_world, plan) |
| apply_tick_plan(self._world, planned_tick, plan) |
| append_record( |
| { |
| "tick": planned_tick, |
| "phase": "engine_events", |
| "events": [ |
| event |
| for event in self._world.to_dict()["event_log"][before_events:] |
| ], |
| } |
| ) |
| self._tick_status.in_progress = False |
| self._tick_status.pending_tick = None |
| payload = self._world.to_dict() |
| payload["simulation"] = tick_status_payload(self._tick_status) |
| self._persist_snapshot() |
| self._ledger.flush_tick(buffer) |
| return HTTPStatus.OK, payload |
| except Exception as exc: |
| |
| |
| |
| |
| with self._lock: |
| self._tick_status.in_progress = False |
| self._tick_status.pending_tick = None |
| return HTTPStatus.INTERNAL_SERVER_ERROR, { |
| "error": "tick_failed", |
| "message": str(exc), |
| } |
| finally: |
| set_active_tick_buffer(None) |
|
|
| def _persist_snapshot(self) -> None: |
| """Submit a full world+players snapshot to the background writer. |
| |
| Called while holding ``self._lock`` so the captured state is consistent; |
| the actual (slow, bucket) write happens off-lock on the writer thread. |
| ``build_snapshot`` makes its own ``to_dict`` copy, so the writer never |
| races with the ``payload`` dict returned to the caller. |
| """ |
| if self._snapshot_writer is None: |
| return |
| self._snapshot_writer.submit(build_snapshot(self._world, self._players.snapshot())) |
|
|
| def reset_world(self) -> tuple[HTTPStatus, dict[str, Any]]: |
| """Wipe the live world: fresh world, no players, deleted snapshot files. |
| |
| Used by the Admin panel. Drops any in-flight snapshot first so the |
| background writer cannot resurrect the deleted state, then clears the |
| store on disk. |
| """ |
| if self._config is None: |
| return HTTPStatus.INTERNAL_SERVER_ERROR, { |
| "error": "reset_unavailable", |
| "message": "Runtime has no config to rebuild the world from.", |
| } |
| with self._lock: |
| if self._snapshot_writer is not None: |
| self._snapshot_writer.drop_pending() |
| self._world = create_world(self._config) |
| self._players = PlayerManager() |
| self._tick_status = TickStatus() |
| self._pending_commands = [] |
| if self._snapshot_store is not None: |
| self._snapshot_store.clear() |
| return HTTPStatus.OK, {"ok": True, "tick": self._world.tick} |
|
|
| |
| def create_player( |
| self, |
| *, |
| name: str, |
| icon: str, |
| country_id: str = "nemotron", |
| ) -> tuple[HTTPStatus, dict[str, Any]]: |
| with self._lock: |
| try: |
| character = self._players.create( |
| name=name, |
| icon=icon, |
| country_id=country_id, |
| created_tick=self._world.tick, |
| ) |
| except PlayerError as exc: |
| return HTTPStatus.BAD_REQUEST, _player_error_payload(exc) |
| return HTTPStatus.OK, _character_summary(character) |
|
|
| def player_status(self, token: str) -> tuple[HTTPStatus, dict[str, Any]]: |
| with self._lock: |
| try: |
| character = self._players.get(token) |
| return HTTPStatus.OK, self._players.character_status(character, self._world) |
| except PlayerError as exc: |
| return HTTPStatus.NOT_FOUND, _player_error_payload(exc) |
|
|
| def player_observe(self, token: str) -> tuple[HTTPStatus, dict[str, Any]]: |
| with self._lock: |
| try: |
| character = self._players.get(token) |
| return HTTPStatus.OK, self._players.observation(character, self._world) |
| except PlayerError as exc: |
| return _player_error_status(exc), _player_error_payload(exc) |
|
|
| def player_act( |
| self, |
| token: str, |
| payload: dict[str, Any], |
| ) -> tuple[HTTPStatus, dict[str, Any]]: |
| with self._lock: |
| try: |
| character = self._players.get(token) |
| self._players.queue_action(character, payload, self._world) |
| observation = self._players.observation(character, self._world) |
| except PlayerError as exc: |
| return _player_error_status(exc), _player_error_payload(exc) |
| return HTTPStatus.OK, { |
| "queued_action": character.pending_action, |
| "message": ( |
| f"Action '{character.pending_action['action']}' queued for the next tick." |
| if character.pending_action |
| else "Action queued." |
| ), |
| "observation": observation, |
| } |
|
|
| def player_vote_available(self, token: str) -> bool: |
| with self._lock: |
| try: |
| character = self._players.get(token) |
| except PlayerError: |
| return False |
| for election in self._world.elections: |
| if election.country_id != character.country_id or election.completed: |
| continue |
| if election.start_tick <= self._world.tick <= election.end_tick: |
| return True |
| return False |
|
|
| |
| def admin_logs_runs(self) -> dict[str, Any]: |
| root = self._ledger.run_dir.parent |
| runs: list[dict[str, Any]] = [] |
| for run_dir in sorted(root.glob("run_*"), key=lambda item: item.name, reverse=True): |
| if not run_dir.is_dir(): |
| continue |
| ledger_path = run_dir / "ledger.jsonl" |
| runs.append( |
| { |
| "run_id": run_dir.name, |
| "current": run_dir == self._ledger.run_dir, |
| "ledger_path": str(ledger_path), |
| "exists": ledger_path.is_file(), |
| "size_bytes": ledger_path.stat().st_size if ledger_path.is_file() else 0, |
| } |
| ) |
| return {"runs": runs} |
|
|
| def admin_log_records( |
| self, |
| *, |
| run_id: str | None = None, |
| since_line: int = 0, |
| limit: int = 200, |
| phase: str | None = None, |
| npc_id: str | None = None, |
| tick_from: int | None = None, |
| tick_to: int | None = None, |
| ) -> tuple[HTTPStatus, dict[str, Any]]: |
| run_dir = self._ledger.run_dir if run_id in (None, "current") else self._run_dir(run_id) |
| if run_dir is None: |
| return HTTPStatus.NOT_FOUND, {"error": "run_not_found", "message": "Unknown ledger run."} |
| ledger_path = run_dir / "ledger.jsonl" |
| if not ledger_path.is_file(): |
| return HTTPStatus.NOT_FOUND, {"error": "ledger_not_found", "message": "Run has no ledger.jsonl."} |
|
|
| records: list[dict[str, Any]] = [] |
| next_line = max(0, since_line) |
| limit = max(1, min(limit, 1000)) |
| with ledger_path.open("r", encoding="utf-8") as ledger_file: |
| for line_no, line in enumerate(ledger_file): |
| next_line = line_no + 1 |
| if line_no < since_line: |
| continue |
| try: |
| record = json.loads(line) |
| except json.JSONDecodeError: |
| continue |
| if not _record_matches( |
| record, |
| phase=phase, |
| npc_id=npc_id, |
| tick_from=tick_from, |
| tick_to=tick_to, |
| ): |
| continue |
| records.append(_normalize_log_record(line_no, record)) |
| if len(records) >= limit: |
| break |
| return HTTPStatus.OK, { |
| "run_id": run_dir.name, |
| "ledger_path": str(ledger_path), |
| "next_line": next_line, |
| "records": records, |
| } |
|
|
| def admin_models(self) -> dict[str, Any]: |
| return { |
| "profiles": self._model_profiles(), |
| "npcs": [ |
| { |
| "id": npc.id, |
| "name": npc.name, |
| "country_id": npc.country_id, |
| "model_profile_id": npc.model_profile_id, |
| "connector_id": npc.connector_id, |
| } |
| for npc in self._world.npcs |
| ], |
| } |
|
|
| def admin_set_npc_model(self, npc_id: str, profile_id: str) -> tuple[HTTPStatus, dict[str, Any]]: |
| profiles = {profile["id"]: profile for profile in self._model_profiles()} |
| if profile_id not in profiles: |
| return HTTPStatus.BAD_REQUEST, {"error": "unknown_profile", "message": "Unknown model profile."} |
| profile = profiles[profile_id] |
| connector_id = profile.get("connector_id") |
| with self._lock: |
| npc = next((candidate for candidate in self._world.npcs if candidate.id == npc_id), None) |
| if npc is None: |
| return HTTPStatus.NOT_FOUND, {"error": "npc_not_found", "message": "Unknown NPC."} |
| npc.model_profile_id = profile_id |
| npc.connector_id = connector_id if isinstance(connector_id, str) and connector_id else None |
| self._world.event_log.append( |
| WorldLogEvent( |
| tick=self._world.tick, |
| type="model_profile_changed", |
| summary=f"{npc.name} model profile set to {profile_id}", |
| severity="neutral", |
| actor_id=npc.id, |
| ) |
| ) |
| self._ledger.write_records( |
| [ |
| { |
| "tick": self._world.tick, |
| "phase": "admin_model_switch", |
| "npc_id": npc_id, |
| "model_profile_id": profile_id, |
| "connector_id": connector_id, |
| } |
| ] |
| ) |
| return HTTPStatus.OK, {"npc_id": npc_id, "profile": profile} |
|
|
| def _run_dir(self, run_id: str | None) -> Path | None: |
| if not run_id or "/" in run_id or "\\" in run_id or not run_id.startswith("run_"): |
| return None |
| run_dir = self._ledger.run_dir.parent / run_id |
| if not run_dir.is_dir(): |
| return None |
| return run_dir |
|
|
| def _model_profiles(self) -> list[dict[str, Any]]: |
| profiles: dict[str, dict[str, Any]] = { |
| "default": { |
| "id": "default", |
| "label": "Default NPC model", |
| "connector_id": None, |
| "model": self._config.connector.model if self._config else None, |
| } |
| } |
| if self._config is not None: |
| for connector_id, connector in self._config.secondary_connectors.items(): |
| profiles[connector_id] = { |
| "id": connector_id, |
| "label": connector_id, |
| "connector_id": connector_id, |
| "model": connector.model, |
| "type": connector.type, |
| } |
| profiles_path = Path(os.getenv("MODEL_PROFILES_PATH", "config/model_profiles.json")) |
| if profiles_path.is_file(): |
| try: |
| raw = json.loads(profiles_path.read_text(encoding="utf-8")) |
| except json.JSONDecodeError: |
| raw = {} |
| for item in raw.get("profiles", []) if isinstance(raw, dict) else []: |
| if isinstance(item, dict) and isinstance(item.get("id"), str): |
| profiles[item["id"]] = {**profiles.get(item["id"], {}), **item} |
| return list(profiles.values()) |
|
|
| def _override_player_directives(self, world: WorldState, plan: TickPlan) -> TickPlan: |
| """Replace planner directives for player-owned NPCs with their queued actions.""" |
| player_ids = self._players.player_npc_ids |
| if not player_ids: |
| return plan |
| directives = [ |
| directive for directive in plan.directives if directive.npc_id not in player_ids |
| ] |
| for character in self._players.all_characters(): |
| directive = self._players.take_directive(character, world) |
| if directive is not None: |
| directives.append(directive) |
| return TickPlan( |
| source=plan.source, |
| directives=directives, |
| overseer=plan.overseer, |
| ledger_entries=plan.ledger_entries, |
| ) |
|
|
| def _begin_tick( |
| self, |
| ) -> tuple[WorldState | None, int | None, list[PendingCommand] | None, dict[str, Any] | None]: |
| with self._lock: |
| if self._tick_status.in_progress: |
| return None, None, None, { |
| "error": "tick_in_progress", |
| "tick": self._world.tick, |
| **tick_status_payload(self._tick_status), |
| } |
| next_tick = self._world.tick + 1 |
| planning_world = deepcopy(self._world) |
| pending_commands = list(self._pending_commands) |
| self._pending_commands.clear() |
| self._tick_status.in_progress = True |
| self._tick_status.pending_tick = next_tick |
| return planning_world, next_tick, pending_commands, None |
|
|
|
|
| def create_game_runtime( |
| *, |
| world: WorldState, |
| config: GameConfig, |
| players: PlayerManager | None = None, |
| snapshot_writer: BackgroundSnapshotWriter | None = None, |
| snapshot_store: SnapshotStore | None = None, |
| ) -> GameRuntime: |
| simulator = create_world_simulator(config) |
| return GameRuntime( |
| world=world, |
| simulator=simulator, |
| modal_health_warmer=ModalHealthWarmer(modal_health_targets(config)), |
| players=players, |
| config=config, |
| snapshot_writer=snapshot_writer, |
| snapshot_store=snapshot_store, |
| ) |
|
|
|
|
| def _record_matches( |
| record: dict[str, Any], |
| *, |
| phase: str | None, |
| npc_id: str | None, |
| tick_from: int | None, |
| tick_to: int | None, |
| ) -> bool: |
| if phase and record.get("phase") != phase: |
| return False |
| if npc_id and record.get("npc_id") != npc_id: |
| return False |
| tick = record.get("tick") |
| if tick_from is not None and isinstance(tick, int) and tick < tick_from: |
| return False |
| if tick_to is not None and isinstance(tick, int) and tick > tick_to: |
| return False |
| return True |
|
|
|
|
| def _normalize_log_record(line: int, record: dict[str, Any]) -> dict[str, Any]: |
| return { |
| "line": line, |
| "tick": record.get("tick"), |
| "phase": record.get("phase"), |
| "npc_id": record.get("npc_id"), |
| "model": _record_model(record), |
| "summary": _record_summary(record), |
| "record": record, |
| } |
|
|
|
|
| def _record_model(record: dict[str, Any]) -> str | None: |
| model = record.get("model") |
| if isinstance(model, str): |
| return model |
| request = record.get("request") |
| if isinstance(request, dict) and isinstance(request.get("model"), str): |
| return request["model"] |
| return None |
|
|
|
|
| def _record_summary(record: dict[str, Any]) -> str: |
| summary = record.get("summary") |
| if isinstance(summary, str) and summary: |
| return summary |
| phase = str(record.get("phase") or "record") |
| parsed = record.get("parsed_action") or record.get("action") |
| verdict = record.get("validator") or record.get("verdict") or record.get("reason") |
| parts = [phase] |
| if parsed: |
| parts.append(f"action={parsed}") |
| if verdict: |
| parts.append(str(verdict)) |
| return "; ".join(parts) |
|
|
|
|
| def _character_summary(character: PlayerCharacter) -> dict[str, Any]: |
| return { |
| "token": character.token, |
| "name": character.name, |
| "icon": character.icon, |
| "country_id": character.country_id, |
| "npc_id": character.npc_id, |
| } |
|
|
|
|
| def _player_error_payload(exc: PlayerError) -> dict[str, Any]: |
| return {"error": exc.code, "message": exc.message} |
|
|
|
|
| def _player_error_status(exc: PlayerError) -> HTTPStatus: |
| if exc.code == "invalid_token": |
| return HTTPStatus.NOT_FOUND |
| if exc.code == "character_dead": |
| return HTTPStatus.GONE |
| return HTTPStatus.BAD_REQUEST |
|
|
|
|
| @dataclass(slots=True) |
| class TickStatus: |
| in_progress: bool = False |
| pending_tick: int | None = None |
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class PendingCommand: |
| kind: str |
| raw_input: str |
|
|
|
|
| def tick_status_payload(tick_status: TickStatus) -> dict[str, Any]: |
| return { |
| "tick_in_progress": tick_status.in_progress, |
| "pending_tick": tick_status.pending_tick, |
| } |
|
|
|
|
| class ModalHealthWarmer: |
| def __init__( |
| self, |
| targets: list[ModalHealthTarget], |
| *, |
| cooldown_seconds: float = 300, |
| warmup_after_seconds: float = 2, |
| warmup_retry_seconds: float = 5, |
| opener: UrlOpener = urlopen, |
| ) -> None: |
| self._states = { |
| target.id: ModalHealthState(target=target) |
| for target in targets |
| } |
| self._cooldown_seconds = cooldown_seconds |
| self._warmup_after_seconds = warmup_after_seconds |
| self._warmup_retry_seconds = warmup_retry_seconds |
| self._opener = opener |
| self._lock = Lock() |
|
|
| def trigger(self, *, force: bool = False) -> None: |
| if not self._states: |
| return |
|
|
| now = time.monotonic() |
| launches: list[tuple[str, str, int]] = [] |
|
|
| with self._lock: |
| for target_id, state in self._states.items(): |
| if state.in_progress: |
| continue |
| if not force and now - state.last_started_at < self._cooldown_seconds: |
| continue |
| state.in_progress = True |
| state.status = "checking" |
| state.http_status = None |
| state.last_started_at = now |
| state.generation += 1 |
| launches.append((target_id, state.target.health_url, state.generation)) |
|
|
| for target_id, health_url, generation in launches: |
| Thread( |
| target=self._mark_warmup_if_pending, |
| args=(target_id, generation), |
| name="modal-health-warmup-status", |
| daemon=True, |
| ).start() |
| Thread( |
| target=self._run_one, |
| args=(target_id, health_url, generation), |
| name="modal-health-warmer", |
| daemon=True, |
| ).start() |
|
|
| def statuses(self) -> list[dict[str, Any]]: |
| with self._lock: |
| return [ |
| { |
| "id": state.target.id, |
| "label": state.target.label, |
| "model": state.target.model, |
| "status": state.status, |
| "http_status": state.http_status, |
| } |
| for state in self._states.values() |
| ] |
|
|
| def _mark_warmup_if_pending(self, target_id: str, generation: int) -> None: |
| time.sleep(self._warmup_after_seconds) |
| with self._lock: |
| state = self._states.get(target_id) |
| if ( |
| state is not None |
| and state.generation == generation |
| and state.in_progress |
| and state.status == "checking" |
| ): |
| state.status = "warmup" |
|
|
| def _run_one(self, target_id: str, health_url: str, generation: int) -> None: |
| headers = modal_proxy_auth_headers() |
| while True: |
| try: |
| request = Request(health_url, headers=headers) if headers else health_url |
| response = self._opener(request) |
| try: |
| http_status = int(getattr(response, "status", 0)) |
| finally: |
| response.close() |
| except HTTPError as exc: |
| http_status = int(exc.code) |
| if http_status != HTTPStatus.SERVICE_UNAVAILABLE: |
| self._set_status(target_id, generation, "error", http_status) |
| print(f"Modal health check failed for {health_url}: {exc}", flush=True) |
| return |
| except (TimeoutError, URLError, OSError) as exc: |
| self._set_status(target_id, generation, "error", None) |
| print(f"Modal health check failed for {health_url}: {exc}", flush=True) |
| return |
|
|
| if http_status == HTTPStatus.OK: |
| self._set_status(target_id, generation, "ready", http_status) |
| print(f"Modal health check warmed {health_url}: {http_status}", flush=True) |
| return |
|
|
| if http_status == HTTPStatus.SERVICE_UNAVAILABLE: |
| if not self._set_status( |
| target_id, |
| generation, |
| "warmup", |
| http_status, |
| in_progress=True, |
| ): |
| return |
| print(f"Modal health check warming {health_url}: {http_status}", flush=True) |
| time.sleep(self._warmup_retry_seconds) |
| continue |
|
|
| self._set_status(target_id, generation, "error", http_status) |
| print(f"Modal health check failed for {health_url}: {http_status}", flush=True) |
| return |
|
|
| def _set_status( |
| self, |
| target_id: str, |
| generation: int, |
| status: str, |
| http_status: int | None, |
| *, |
| in_progress: bool = False, |
| ) -> bool: |
| with self._lock: |
| state = self._states.get(target_id) |
| if state is None or state.generation != generation: |
| return False |
| state.status = status |
| state.http_status = http_status |
| state.in_progress = in_progress |
| return True |
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class ModalHealthTarget: |
| id: str |
| label: str |
| model: str | None |
| health_url: str |
|
|
|
|
| @dataclass(slots=True) |
| class ModalHealthState: |
| target: ModalHealthTarget |
| status: str = "unknown" |
| http_status: int | None = None |
| last_started_at: float = 0.0 |
| generation: int = 0 |
| in_progress: bool = False |
|
|
|
|
| class UrlResponse(Protocol): |
| status: int |
|
|
| def close(self) -> None: ... |
|
|
|
|
| class UrlOpener(Protocol): |
| def __call__(self, url: str | Request) -> UrlResponse: ... |
|
|
|
|
| class HealthWarmer(Protocol): |
| def trigger(self, *, force: bool = False) -> None: ... |
|
|
| def statuses(self) -> list[dict[str, Any]]: ... |
|
|
|
|
| def modal_health_urls(config: GameConfig) -> list[str]: |
| return [target.health_url for target in modal_health_targets(config)] |
|
|
|
|
| def modal_health_targets(config: GameConfig) -> list[ModalHealthTarget]: |
| targets: list[ModalHealthTarget] = [] |
| connector_targets: list[tuple[str, str, ConnectorConfig | None]] = [ |
| ("npc_model", "Nemotron NPC", config.connector), |
| ] |
| for connector_id, connector in config.secondary_connectors.items(): |
| connector_targets.append( |
| (f"npc_{connector_id}", f"{connector_id.capitalize()} NPC", connector) |
| ) |
| for target_id, label, connector in connector_targets: |
| if connector is None: |
| continue |
| health_url = modal_health_url(connector) |
| if health_url is not None: |
| targets.append( |
| ModalHealthTarget( |
| id=target_id, |
| label=label, |
| model=connector.model, |
| health_url=health_url, |
| ) |
| ) |
| return targets |
|
|
|
|
| def modal_health_url(connector: ConnectorConfig) -> str | None: |
| if connector.type != "openai_compatible": |
| return None |
|
|
| root = connector.base_url or connector.api_url |
| if not root: |
| return None |
|
|
| root = root.strip().rstrip("/") |
| for suffix in ("/v1/chat/completions", "/chat/completions", "/v1"): |
| if root.endswith(suffix): |
| root = root.removesuffix(suffix) |
|
|
| if ".modal.run" not in root: |
| return None |
| return f"{root}/health" |
|
|