"""External-agent player characters and their lifecycle. A player is a human who connects their own agent over MCP. They create a character in the public UI, receive a token, and then drive that character through MCP tool calls. The character only enters the world (``world.npcs``) after the first MCP interaction, and once it dies the player must create a new one. This module is intentionally free of locking: every method that reads or mutates world state is called by :class:`GameRuntime` while it already holds its own lock, which serialises access against the tick loop. """ from __future__ import annotations import secrets from dataclasses import asdict, dataclass, fields from typing import Any from world_simulator.domain import MemoryEntry, Npc, Vec3, WorldState from world_simulator.simulation.connectors.base import ActionKind, NpcDirective from world_simulator.simulation.mechanics import BLOCK_SIZE, is_alive, vec_distance from world_simulator.simulation.survival import ( build_perception, is_survival_world, select_goal, ) # Internal engine role for a player NPC. Players are deliberately not bound to a # faction/role yet (a richer faction primitive comes later) and are NOT # role-gated, so this only affects cosmetics like the default rendering. PLAYER_ROLE = "gatherer" # Player NPCs are not role-gated (see survival.validate_survival_action): an # external agent may request any of these actions. The survival resolver still # repairs physically impossible requests (no target, empty inventory, # out-of-range) but never blocks an action by role. PLAYER_ACTIONS: tuple[str, ...] = ("move", "speak", "attack", "use", "transfer") VOTE_ACTION = "vote" class PlayerError(Exception): """Raised for player-facing failures (unknown token, dead character, ...).""" def __init__(self, code: str, message: str) -> None: super().__init__(message) self.code = code self.message = message @dataclass(slots=True) class PlayerCharacter: token: str name: str icon: str country_id: str npc_id: str created_tick: int spawned: bool = False death_cause: str | None = None pending_action: dict[str, Any] | None = None last_result: str | None = None @property def display_name(self) -> str: icon = self.icon.strip() return f"{icon} {self.name}".strip() if icon else self.name class PlayerManager: """Registry of player characters keyed by their MCP token.""" def __init__(self) -> None: self._by_token: dict[str, PlayerCharacter] = {} self._counter = 0 # -- registry ---------------------------------------------------------- # def create( self, *, name: str, icon: str, country_id: str = "nemotron", created_tick: int, ) -> PlayerCharacter: clean_name = (name or "").strip() if not clean_name: raise PlayerError("invalid_name", "Character name must not be empty.") if len(clean_name) > 24: clean_name = clean_name[:24] self._counter += 1 token = secrets.token_urlsafe(16) character = PlayerCharacter( token=token, name=clean_name, icon=(icon or "").strip()[:4], country_id=country_id if country_id in {"nemotron", "qwen"} else "nemotron", npc_id=f"player-{self._counter:03d}-{token[:6]}", created_tick=created_tick, ) self._by_token[token] = character return character def get(self, token: str) -> PlayerCharacter: character = self._by_token.get((token or "").strip()) if character is None: raise PlayerError("invalid_token", "Unknown character token.") return character def all_characters(self) -> list[PlayerCharacter]: return list(self._by_token.values()) @property def player_npc_ids(self) -> set[str]: return {character.npc_id for character in self._by_token.values()} # -- persistence (called under the runtime lock) ----------------------- # def snapshot(self) -> dict[str, Any]: return { "counter": self._counter, "characters": [asdict(character) for character in self._by_token.values()], } def restore(self, state: dict[str, Any]) -> None: known = {field.name for field in fields(PlayerCharacter)} self._counter = int(state.get("counter", 0)) self._by_token = {} for raw in state.get("characters", []): if not isinstance(raw, dict) or "token" not in raw: continue character = PlayerCharacter(**{key: raw[key] for key in known if key in raw}) self._by_token[character.token] = character # -- world lifecycle (called under the runtime lock) ------------------- # def ensure_spawned(self, character: PlayerCharacter, world: WorldState) -> Npc: """Spawn the character into the world on first interaction. Raises :class:`PlayerError` if the character has already died. """ existing = self._find_npc(character, world) if existing is not None: if not is_alive(existing): self._mark_dead(character, world, existing) raise self._dead_error(character) return existing if character.spawned: # It was spawned before but the NPC is gone -> treat as dead. self._mark_dead(character, world, None) raise self._dead_error(character) npc = self._build_npc(character, world) world.npcs.append(npc) character.spawned = True return npc def require_alive(self, character: PlayerCharacter, world: WorldState) -> Npc: npc = self.ensure_spawned(character, world) if not is_alive(npc): self._mark_dead(character, world, npc) raise self._dead_error(character) return npc def queue_action( self, character: PlayerCharacter, payload: dict[str, Any], world: WorldState, ) -> Npc: npc = self.require_alive(character, world) action = str(payload.get("action", "idle")).strip().lower() if action == VOTE_ACTION and not _vote_available(npc, world): raise PlayerError("vote_unavailable", "Vote is only available during your country's election.") if action not in PLAYER_ACTIONS and action != VOTE_ACTION: raise PlayerError( "unknown_action", f"Unknown action '{action}'. Valid actions: {', '.join(PLAYER_ACTIONS)}.", ) character.pending_action = {**payload, "action": action} return npc def take_directive(self, character: PlayerCharacter, world: WorldState) -> NpcDirective | None: """Consume the queued action into a directive for the upcoming tick.""" npc = self._find_npc(character, world) if npc is None or not is_alive(npc): return None payload = character.pending_action character.pending_action = None if payload is None: # No instruction this tick: hold position rather than letting the # built-in planner take over a player-owned character. return NpcDirective(npc_id=npc.id, action="idle", intent="player_idle") return self._directive_from_payload(npc, payload) # -- payloads ---------------------------------------------------------- # def character_status(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]: npc = self._find_npc(character, world) alive = npc is not None and is_alive(npc) if npc is not None and not alive: self._mark_dead(character, world, npc) return { "name": character.name, "icon": character.icon, "npc_id": character.npc_id, "country_id": character.country_id, "spawned": character.spawned, "alive": alive, "death_cause": character.death_cause, "tick": world.tick, "self": self._self_payload(npc) if npc is not None else None, } def observation(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]: npc = self.require_alive(character, world) return { "tick": world.tick, "game_status": world.game_status, "self": self._self_payload(npc), "country": self._country_payload(npc, world), "perception_text": build_perception(npc, world), "nearby_npcs": self._nearby_npcs(npc, world), "nearby_beasts": self._nearby_beasts(npc, world), "nearby_resources": self._nearby_resources(npc, world), "nearby_houses": self._nearby_houses(npc, world), "recent_events": [ {"tick": event.tick, "summary": event.summary, "severity": event.severity} for event in world.event_log[-8:] ], } # -- internals --------------------------------------------------------- # def _find_npc(self, character: PlayerCharacter, world: WorldState) -> Npc | None: return next((npc for npc in world.npcs if npc.id == character.npc_id), None) def _build_npc(self, character: PlayerCharacter, world: WorldState) -> Npc: position = self._spawn_position(world, character.country_id) npc = Npc( id=character.npc_id, name=character.display_name, role=PLAYER_ROLE, position=position, country_id=character.country_id, health=100, attack_damage=12, personality="player", is_player=True, player_owner=character.name, unrestricted_actions=True, age=30, max_age=600, memory=[ MemoryEntry( tick=world.tick, text=f"{character.name} joined the world.", ) ], ) if is_survival_world(world): npc.hunger = 20.0 npc.fear = 0.0 npc.safety = 50.0 # A balanced starter kit so the agent can immediately do anything: # eat, heal, fight, build or trade — players are not role-gated. npc.inventory_food = 2 npc.inventory_herbs = 1 npc.inventory_wood = 5 npc.inventory_weapon = 1 npc.survival_goal = select_goal(npc, world) # Seed neutral relationships both ways so memory/perception read sanely. for other in world.npcs: if other.id == npc.id: continue npc.relationships.setdefault(other.id, 0.45) other.relationships.setdefault(npc.id, 0.45) return npc def _spawn_position(self, world: WorldState, country_id: str) -> Vec3: half_width = world.terrain.width / 2 half_depth = world.terrain.depth / 2 country = next((item for item in world.countries if item.id == country_id), None) anchor = country.treasury.position if country and country.treasury else Vec3(0.0, 0.0, 0.0) # Spawn near the map centre, fanned out so multiple players do not # stack on the exact same tile. spawned_players = sum(1 for npc in world.npcs if npc.is_player) ring = [ (0.0, -BLOCK_SIZE), (BLOCK_SIZE, -BLOCK_SIZE), (-BLOCK_SIZE, -BLOCK_SIZE), (BLOCK_SIZE * 2, 0.0), (-BLOCK_SIZE * 2, 0.0), (0.0, -BLOCK_SIZE * 2), ] offset_x, offset_z = ring[spawned_players % len(ring)] x = _clamp(anchor.x + offset_x, -half_width + BLOCK_SIZE, half_width - BLOCK_SIZE) z = _clamp(anchor.z + offset_z, -half_depth + BLOCK_SIZE, half_depth - BLOCK_SIZE) return Vec3(x=round(x, 3), y=0.0, z=round(z, 3)) def _directive_from_payload(self, npc: Npc, payload: dict[str, Any]) -> NpcDirective: action: ActionKind = payload["action"] target: Vec3 | None = None x = payload.get("x") z = payload.get("z") if isinstance(x, int | float) and isinstance(z, int | float): target = Vec3(x=float(x), y=0.0, z=float(z)) return NpcDirective( npc_id=npc.id, action=action, target=target, target_npc_id=_opt_str(payload.get("target_npc_id") or payload.get("candidate_id")), target_entity_id=_opt_str(payload.get("target_entity_id")), resource_id=_opt_str(payload.get("resource_id")), resource_type=_opt_str(payload.get("resource_type")), use_type=_opt_str(payload.get("use_type")), params=payload.get("params") if isinstance(payload.get("params"), dict) else None, amount=_opt_int(payload.get("amount")), communication_intent=_opt_str(payload.get("communication_intent")), message=_opt_str(payload.get("message")), memory=_opt_str(payload.get("memory")), intent="player", away=bool(payload.get("away", False)), take=bool(payload.get("take", False)), ) def _self_payload(self, npc: Npc) -> dict[str, Any]: return { "npc_id": npc.id, "name": npc.name, "role": npc.role, "country_id": npc.country_id, "special_status": npc.special_status, "position": {"x": npc.position.x, "z": npc.position.z}, "health": npc.health, "hunger": round(npc.hunger, 1), "fear": round(npc.fear, 1), "safety": round(npc.safety, 1), "goal": npc.survival_goal, "intention": npc.intention, "inventory": { "food": npc.inventory_food, "herbs": npc.inventory_herbs, "wood": npc.inventory_wood, "weapon": npc.inventory_weapon, "coins": npc.inventory_coins, }, } def _nearby_npcs(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: nearby: list[dict[str, Any]] = [] for other in world.npcs: if other.id == npc.id or not is_alive(other): continue distance = vec_distance(npc.position, other.position) if distance > 12 * BLOCK_SIZE: continue nearby.append( { "id": other.id, "name": other.name, "role": other.role, "country_id": other.country_id, "distance_tiles": round(distance / BLOCK_SIZE, 1), "trust": round(npc.relationships.get(other.id, 0.0), 2), "is_player": other.is_player, } ) nearby.sort(key=lambda item: item["distance_tiles"]) return nearby[:10] def _nearby_beasts(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: beasts: list[dict[str, Any]] = [] for beast in world.beasts: if beast.state == "dead": continue distance = vec_distance(npc.position, beast.position) beasts.append( { "id": beast.id, "distance_tiles": round(distance / BLOCK_SIZE, 1), "state": beast.state, "health": round(beast.health, 1), } ) beasts.sort(key=lambda item: item["distance_tiles"]) return beasts def _nearby_resources(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: nodes: list[dict[str, Any]] = [] for node in world.resource_nodes: if node.amount <= 0: continue distance = vec_distance(npc.position, node.position) if distance > 12 * BLOCK_SIZE: continue nodes.append( { "id": node.id, "type": node.resource_type, "amount": node.amount, "distance_tiles": round(distance / BLOCK_SIZE, 1), } ) nodes.sort(key=lambda item: item["distance_tiles"]) return nodes[:10] def _nearby_houses(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]: houses: list[dict[str, Any]] = [] for house in world.houses: distance = vec_distance(npc.position, house.position) houses.append( { "id": house.id, "state": house.state, "hp": round(house.hp, 1), "distance_tiles": round(distance / BLOCK_SIZE, 1), } ) houses.sort(key=lambda item: item["distance_tiles"]) return houses def _country_payload(self, npc: Npc, world: WorldState) -> dict[str, Any] | None: if npc.country_id is None: return None country = next((item for item in world.countries if item.id == npc.country_id), None) if country is None: return None election = _active_election_payload(npc, world) return { "id": country.id, "name": country.name, "badge": country.badge, "ruler_id": country.ruler_id, "policy": country.policy, "treasury": dict(country.treasury.resources) if country.treasury else {}, "treasury_id": country.treasury.id if country.treasury else None, "treasury_position": None if country.treasury is None else {"x": country.treasury.position.x, "z": country.treasury.position.z}, "cannon_id": country.cannon.id if country.cannon else None, "next_election_tick": country.next_election_tick, "election": election, } def _mark_dead(self, character: PlayerCharacter, world: WorldState, npc: Npc | None) -> None: character.spawned = True if character.death_cause is not None: return character.death_cause = _death_cause_for(world, character.npc_id) or "unknown" def _dead_error(self, character: PlayerCharacter) -> PlayerError: cause = character.death_cause or "unknown" return PlayerError( "character_dead", f"Your character '{character.name}' has died (cause: {cause}). " "Create a new character in the web UI to continue playing.", ) def _death_cause_for(world: WorldState, npc_id: str) -> str | None: for event in reversed(world.event_log): if event.type == "npc_died" and event.target_id == npc_id: return event.object_id or "unknown" return None def _vote_available(npc: Npc, world: WorldState) -> bool: if npc.country_id is None: return False for election in world.elections: if election.country_id != npc.country_id or election.completed: continue if election.start_tick <= world.tick <= election.end_tick and npc.id not in election.votes: return True return False def _active_election_payload(npc: Npc, world: WorldState) -> dict[str, Any] | None: if npc.country_id is None: return None for election in world.elections: if election.country_id != npc.country_id or election.completed: continue if election.start_tick <= world.tick <= election.end_tick: return { "country_id": election.country_id, "start_tick": election.start_tick, "end_tick": election.end_tick, "candidate_ids": list(election.candidate_ids), "votes": dict(election.votes), "has_voted": npc.id in election.votes, } return None def _clamp(value: float, minimum: float, maximum: float) -> float: return min(max(value, minimum), maximum) def _opt_str(value: Any) -> str | None: if value is None: return None text = str(value).strip() return text or None def _opt_int(value: Any) -> int | None: if value is None: return None try: return int(value) except (TypeError, ValueError): return None