Spaces:
Runtime error
Runtime error
| """External-agent player characters and their lifecycle. | |
| A player is a human who connects their own agent over MCP. They create a | |
| character in the public UI, receive a token, and then drive that character | |
| through MCP tool calls. The character only enters the world (``world.npcs``) | |
| after the first MCP interaction, and once it dies the player must create a new | |
| one. | |
| This module is intentionally free of locking: every method that reads or | |
| mutates world state is called by :class:`GameRuntime` while it already holds | |
| its own lock, which serialises access against the tick loop. | |
| """ | |
| from __future__ import annotations | |
| import secrets | |
| from dataclasses import asdict, dataclass, fields | |
| from typing import Any | |
| from world_simulator.domain import MemoryEntry, Npc, Vec3, WorldState | |
| from world_simulator.simulation.connectors.base import ActionKind, NpcDirective | |
| from world_simulator.simulation.mechanics import BLOCK_SIZE, is_alive, vec_distance | |
| from world_simulator.simulation.survival import ( | |
| build_perception, | |
| is_survival_world, | |
| select_goal, | |
| ) | |
| # Internal engine role for a player NPC. Players are deliberately not bound to a | |
| # faction/role yet (a richer faction primitive comes later) and are NOT | |
| # role-gated, so this only affects cosmetics like the default rendering. | |
| PLAYER_ROLE = "gatherer" | |
| # Player NPCs are not role-gated (see survival.validate_survival_action): an | |
| # external agent may request any of these actions. The survival resolver still | |
| # repairs physically impossible requests (no target, empty inventory, | |
| # out-of-range) but never blocks an action by role. | |
| PLAYER_ACTIONS: tuple[str, ...] = ("move", "speak", "attack", "use", "transfer") | |
| VOTE_ACTION = "vote" | |
| class PlayerError(Exception): | |
| """Raised for player-facing failures (unknown token, dead character, ...).""" | |
| def __init__(self, code: str, message: str) -> None: | |
| super().__init__(message) | |
| self.code = code | |
| self.message = message | |
| class PlayerCharacter: | |
| token: str | |
| name: str | |
| icon: str | |
| country_id: str | |
| npc_id: str | |
| created_tick: int | |
| spawned: bool = False | |
| death_cause: str | None = None | |
| pending_action: dict[str, Any] | None = None | |
| last_result: str | None = None | |
| def display_name(self) -> str: | |
| icon = self.icon.strip() | |
| return f"{icon} {self.name}".strip() if icon else self.name | |
| class PlayerManager: | |
| """Registry of player characters keyed by their MCP token.""" | |
| def __init__(self) -> None: | |
| self._by_token: dict[str, PlayerCharacter] = {} | |
| self._counter = 0 | |
| # -- registry ---------------------------------------------------------- # | |
| def create( | |
| self, | |
| *, | |
| name: str, | |
| icon: str, | |
| country_id: str = "nemotron", | |
| created_tick: int, | |
| ) -> PlayerCharacter: | |
| clean_name = (name or "").strip() | |
| if not clean_name: | |
| raise PlayerError("invalid_name", "Character name must not be empty.") | |
| if len(clean_name) > 24: | |
| clean_name = clean_name[:24] | |
| self._counter += 1 | |
| token = secrets.token_urlsafe(16) | |
| character = PlayerCharacter( | |
| token=token, | |
| name=clean_name, | |
| icon=(icon or "").strip()[:4], | |
| country_id=country_id if country_id in {"nemotron", "qwen"} else "nemotron", | |
| npc_id=f"player-{self._counter:03d}-{token[:6]}", | |
| created_tick=created_tick, | |
| ) | |
| self._by_token[token] = character | |
| return character | |
| def get(self, token: str) -> PlayerCharacter: | |
| character = self._by_token.get((token or "").strip()) | |
| if character is None: | |
| raise PlayerError("invalid_token", "Unknown character token.") | |
| return character | |
| def all_characters(self) -> list[PlayerCharacter]: | |
| return list(self._by_token.values()) | |
| def player_npc_ids(self) -> set[str]: | |
| return {character.npc_id for character in self._by_token.values()} | |
| # -- persistence (called under the runtime lock) ----------------------- # | |
| def snapshot(self) -> dict[str, Any]: | |
| return { | |
| "counter": self._counter, | |
| "characters": [asdict(character) for character in self._by_token.values()], | |
| } | |
| def restore(self, state: dict[str, Any]) -> None: | |
| known = {field.name for field in fields(PlayerCharacter)} | |
| self._counter = int(state.get("counter", 0)) | |
| self._by_token = {} | |
| for raw in state.get("characters", []): | |
| if not isinstance(raw, dict) or "token" not in raw: | |
| continue | |
| character = PlayerCharacter(**{key: raw[key] for key in known if key in raw}) | |
| self._by_token[character.token] = character | |
| # -- world lifecycle (called under the runtime lock) ------------------- # | |
| def ensure_spawned(self, character: PlayerCharacter, world: WorldState) -> Npc: | |
| """Spawn the character into the world on first interaction. | |
| Raises :class:`PlayerError` if the character has already died. | |
| """ | |
| existing = self._find_npc(character, world) | |
| if existing is not None: | |
| if not is_alive(existing): | |
| self._mark_dead(character, world, existing) | |
| raise self._dead_error(character) | |
| return existing | |
| if character.spawned: | |
| # It was spawned before but the NPC is gone -> treat as dead. | |
| self._mark_dead(character, world, None) | |
| raise self._dead_error(character) | |
| npc = self._build_npc(character, world) | |
| world.npcs.append(npc) | |
| character.spawned = True | |
| return npc | |
| def require_alive(self, character: PlayerCharacter, world: WorldState) -> Npc: | |
| npc = self.ensure_spawned(character, world) | |
| if not is_alive(npc): | |
| self._mark_dead(character, world, npc) | |
| raise self._dead_error(character) | |
| return npc | |
| def queue_action( | |
| self, | |
| character: PlayerCharacter, | |
| payload: dict[str, Any], | |
| world: WorldState, | |
| ) -> Npc: | |
| npc = self.require_alive(character, world) | |
| action = str(payload.get("action", "idle")).strip().lower() | |
| if action == VOTE_ACTION and not _vote_available(npc, world): | |
| raise PlayerError("vote_unavailable", "Vote is only available during your country's election.") | |
| if action not in PLAYER_ACTIONS and action != VOTE_ACTION: | |
| raise PlayerError( | |
| "unknown_action", | |
| f"Unknown action '{action}'. Valid actions: {', '.join(PLAYER_ACTIONS)}.", | |
| ) | |
| character.pending_action = {**payload, "action": action} | |
| return npc | |
| def take_directive(self, character: PlayerCharacter, world: WorldState) -> NpcDirective | None: | |
| """Consume the queued action into a directive for the upcoming tick.""" | |
| npc = self._find_npc(character, world) | |
| if npc is None or not is_alive(npc): | |
| return None | |
| payload = character.pending_action | |
| character.pending_action = None | |
| if payload is None: | |
| # No instruction this tick: hold position rather than letting the | |
| # built-in planner take over a player-owned character. | |
| return NpcDirective(npc_id=npc.id, action="idle", intent="player_idle") | |
| return self._directive_from_payload(npc, payload) | |
| # -- payloads ---------------------------------------------------------- # | |
| def character_status(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]: | |
| npc = self._find_npc(character, world) | |
| alive = npc is not None and is_alive(npc) | |
| if npc is not None and not alive: | |
| self._mark_dead(character, world, npc) | |
| return { | |
| "name": character.name, | |
| "icon": character.icon, | |
| "npc_id": character.npc_id, | |
| "country_id": character.country_id, | |
| "spawned": character.spawned, | |
| "alive": alive, | |
| "death_cause": character.death_cause, | |
| "tick": world.tick, | |
| "self": self._self_payload(npc) if npc is not None else None, | |
| } | |
| def observation(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]: | |
| npc = self.require_alive(character, world) | |
| return { | |
| "tick": world.tick, | |
| "game_status": world.game_status, | |
| "self": self._self_payload(npc), | |
| "country": self._country_payload(npc, world), | |
| "perception_text": build_perception(npc, world), | |
| "nearby_npcs": self._nearby_npcs(npc, world), | |
| "nearby_beasts": self._nearby_beasts(npc, world), | |
| "nearby_resources": self._nearby_resources(npc, world), | |
| "nearby_houses": self._nearby_houses(npc, world), | |
| "recent_events": [ | |
| {"tick": event.tick, "summary": event.summary, "severity": event.severity} | |
| for event in world.event_log[-8:] | |
| ], | |
| } | |
| # -- internals --------------------------------------------------------- # | |
| def _find_npc(self, character: PlayerCharacter, world: WorldState) -> Npc | None: | |
| return next((npc for npc in world.npcs if npc.id == character.npc_id), None) | |
| def _build_npc(self, character: PlayerCharacter, world: WorldState) -> Npc: | |
| position = self._spawn_position(world, character.country_id) | |
| npc = Npc( | |
| id=character.npc_id, | |
| name=character.display_name, | |
| role=PLAYER_ROLE, | |
| position=position, | |
| country_id=character.country_id, | |
| health=100, | |
| attack_damage=12, | |
| personality="player", | |
| is_player=True, | |
| player_owner=character.name, | |
| unrestricted_actions=True, | |
| age=30, | |
| max_age=600, | |
| memory=[ | |
| MemoryEntry( | |
| tick=world.tick, | |
| text=f"{character.name} joined the world.", | |
| ) | |
| ], | |
| ) | |
| if is_survival_world(world): | |
| npc.hunger = 20.0 | |
| npc.fear = 0.0 | |
| npc.safety = 50.0 | |
| # A balanced starter kit so the agent can immediately do anything: | |
| # eat, heal, fight, build or trade — players are not role-gated. | |
| npc.inventory_food = 2 | |
| npc.inventory_herbs = 1 | |
| npc.inventory_wood = 5 | |
| npc.inventory_weapon = 1 | |
| npc.survival_goal = select_goal(npc, world) | |
| # Seed neutral relationships both ways so memory/perception read sanely. | |
| for other in world.npcs: | |
| if other.id == npc.id: | |
| continue | |
| npc.relationships.setdefault(other.id, 0.45) | |
| other.relationships.setdefault(npc.id, 0.45) | |
| return npc | |
| def _spawn_position(self, world: WorldState, country_id: str) -> Vec3: | |
| half_width = world.terrain.width / 2 | |
| half_depth = world.terrain.depth / 2 | |
| country = next((item for item in world.countries if item.id == country_id), None) | |
| anchor = country.treasury.position if country and country.treasury else Vec3(0.0, 0.0, 0.0) | |
| # Spawn near the map centre, fanned out so multiple players do not | |
| # stack on the exact same tile. | |
| spawned_players = sum(1 for npc in world.npcs if npc.is_player) | |
| ring = [ | |
| (0.0, -BLOCK_SIZE), | |
| (BLOCK_SIZE, -BLOCK_SIZE), | |
| (-BLOCK_SIZE, -BLOCK_SIZE), | |
| (BLOCK_SIZE * 2, 0.0), | |
| (-BLOCK_SIZE * 2, 0.0), | |
| (0.0, -BLOCK_SIZE * 2), | |
| ] | |
| offset_x, offset_z = ring[spawned_players % len(ring)] | |
| x = _clamp(anchor.x + offset_x, -half_width + BLOCK_SIZE, half_width - BLOCK_SIZE) | |
| z = _clamp(anchor.z + offset_z, -half_depth + BLOCK_SIZE, half_depth - BLOCK_SIZE) | |
| return Vec3(x=round(x, 3), y=0.0, z=round(z, 3)) | |
| def _directive_from_payload(self, npc: Npc, payload: dict[str, Any]) -> NpcDirective: | |
| action: ActionKind = payload["action"] | |
| target: Vec3 | None = None | |
| x = payload.get("x") | |
| z = payload.get("z") | |
| if isinstance(x, int | float) and isinstance(z, int | float): | |
| target = Vec3(x=float(x), y=0.0, z=float(z)) | |
| return NpcDirective( | |
| npc_id=npc.id, | |
| action=action, | |
| target=target, | |
| target_npc_id=_opt_str(payload.get("target_npc_id") or payload.get("candidate_id")), | |
| target_entity_id=_opt_str(payload.get("target_entity_id")), | |
| resource_id=_opt_str(payload.get("resource_id")), | |
| resource_type=_opt_str(payload.get("resource_type")), | |
| use_type=_opt_str(payload.get("use_type")), | |
| params=payload.get("params") if isinstance(payload.get("params"), dict) else None, | |
| amount=_opt_int(payload.get("amount")), | |
| communication_intent=_opt_str(payload.get("communication_intent")), | |
| message=_opt_str(payload.get("message")), | |
| memory=_opt_str(payload.get("memory")), | |
| intent="player", | |
| away=bool(payload.get("away", False)), | |
| take=bool(payload.get("take", False)), | |
| ) | |
| def _self_payload(self, npc: Npc) -> dict[str, Any]: | |
| return { | |
| "npc_id": npc.id, | |
| "name": npc.name, | |
| "role": npc.role, | |
| "country_id": npc.country_id, | |
| "special_status": npc.special_status, | |
| "position": {"x": npc.position.x, "z": npc.position.z}, | |
| "health": npc.health, | |
| "hunger": round(npc.hunger, 1), | |
| "fear": round(npc.fear, 1), | |
| "safety": round(npc.safety, 1), | |
| "goal": npc.survival_goal, | |
| "intention": npc.intention, | |
| "inventory": { | |
| "food": npc.inventory_food, | |
| "herbs": npc.inventory_herbs, | |
| "wood": npc.inventory_wood, | |
| "weapon": npc.inventory_weapon, | |
| "coins": npc.inventory_coins, | |
| }, | |
| } | |
| def _nearby_npcs(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: | |
| nearby: list[dict[str, Any]] = [] | |
| for other in world.npcs: | |
| if other.id == npc.id or not is_alive(other): | |
| continue | |
| distance = vec_distance(npc.position, other.position) | |
| if distance > 12 * BLOCK_SIZE: | |
| continue | |
| nearby.append( | |
| { | |
| "id": other.id, | |
| "name": other.name, | |
| "role": other.role, | |
| "country_id": other.country_id, | |
| "distance_tiles": round(distance / BLOCK_SIZE, 1), | |
| "trust": round(npc.relationships.get(other.id, 0.0), 2), | |
| "is_player": other.is_player, | |
| } | |
| ) | |
| nearby.sort(key=lambda item: item["distance_tiles"]) | |
| return nearby[:10] | |
| def _nearby_beasts(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: | |
| beasts: list[dict[str, Any]] = [] | |
| for beast in world.beasts: | |
| if beast.state == "dead": | |
| continue | |
| distance = vec_distance(npc.position, beast.position) | |
| beasts.append( | |
| { | |
| "id": beast.id, | |
| "distance_tiles": round(distance / BLOCK_SIZE, 1), | |
| "state": beast.state, | |
| "health": round(beast.health, 1), | |
| } | |
| ) | |
| beasts.sort(key=lambda item: item["distance_tiles"]) | |
| return beasts | |
| def _nearby_resources(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: | |
| nodes: list[dict[str, Any]] = [] | |
| for node in world.resource_nodes: | |
| if node.amount <= 0: | |
| continue | |
| distance = vec_distance(npc.position, node.position) | |
| if distance > 12 * BLOCK_SIZE: | |
| continue | |
| nodes.append( | |
| { | |
| "id": node.id, | |
| "type": node.resource_type, | |
| "amount": node.amount, | |
| "distance_tiles": round(distance / BLOCK_SIZE, 1), | |
| } | |
| ) | |
| nodes.sort(key=lambda item: item["distance_tiles"]) | |
| return nodes[:10] | |
| def _nearby_houses(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: | |
| houses: list[dict[str, Any]] = [] | |
| for house in world.houses: | |
| distance = vec_distance(npc.position, house.position) | |
| houses.append( | |
| { | |
| "id": house.id, | |
| "state": house.state, | |
| "hp": round(house.hp, 1), | |
| "distance_tiles": round(distance / BLOCK_SIZE, 1), | |
| } | |
| ) | |
| houses.sort(key=lambda item: item["distance_tiles"]) | |
| return houses | |
| def _country_payload(self, npc: Npc, world: WorldState) -> dict[str, Any] | None: | |
| if npc.country_id is None: | |
| return None | |
| country = next((item for item in world.countries if item.id == npc.country_id), None) | |
| if country is None: | |
| return None | |
| election = _active_election_payload(npc, world) | |
| return { | |
| "id": country.id, | |
| "name": country.name, | |
| "badge": country.badge, | |
| "ruler_id": country.ruler_id, | |
| "policy": country.policy, | |
| "treasury": dict(country.treasury.resources) if country.treasury else {}, | |
| "treasury_id": country.treasury.id if country.treasury else None, | |
| "treasury_position": None | |
| if country.treasury is None | |
| else {"x": country.treasury.position.x, "z": country.treasury.position.z}, | |
| "cannon_id": country.cannon.id if country.cannon else None, | |
| "next_election_tick": country.next_election_tick, | |
| "election": election, | |
| } | |
| def _mark_dead(self, character: PlayerCharacter, world: WorldState, npc: Npc | None) -> None: | |
| character.spawned = True | |
| if character.death_cause is not None: | |
| return | |
| character.death_cause = _death_cause_for(world, character.npc_id) or "unknown" | |
| def _dead_error(self, character: PlayerCharacter) -> PlayerError: | |
| cause = character.death_cause or "unknown" | |
| return PlayerError( | |
| "character_dead", | |
| f"Your character '{character.name}' has died (cause: {cause}). " | |
| "Create a new character in the web UI to continue playing.", | |
| ) | |
| def _death_cause_for(world: WorldState, npc_id: str) -> str | None: | |
| for event in reversed(world.event_log): | |
| if event.type == "npc_died" and event.target_id == npc_id: | |
| return event.object_id or "unknown" | |
| return None | |
| def _vote_available(npc: Npc, world: WorldState) -> bool: | |
| if npc.country_id is None: | |
| return False | |
| for election in world.elections: | |
| if election.country_id != npc.country_id or election.completed: | |
| continue | |
| if election.start_tick <= world.tick <= election.end_tick and npc.id not in election.votes: | |
| return True | |
| return False | |
| def _active_election_payload(npc: Npc, world: WorldState) -> dict[str, Any] | None: | |
| if npc.country_id is None: | |
| return None | |
| for election in world.elections: | |
| if election.country_id != npc.country_id or election.completed: | |
| continue | |
| if election.start_tick <= world.tick <= election.end_tick: | |
| return { | |
| "country_id": election.country_id, | |
| "start_tick": election.start_tick, | |
| "end_tick": election.end_tick, | |
| "candidate_ids": list(election.candidate_ids), | |
| "votes": dict(election.votes), | |
| "has_voted": npc.id in election.votes, | |
| } | |
| return None | |
| def _clamp(value: float, minimum: float, maximum: float) -> float: | |
| return min(max(value, minimum), maximum) | |
| def _opt_str(value: Any) -> str | None: | |
| if value is None: | |
| return None | |
| text = str(value).strip() | |
| return text or None | |
| def _opt_int(value: Any) -> int | None: | |
| if value is None: | |
| return None | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| return None | |