DeltaZN
feat: add persistence
8043870
Raw
History Blame Contribute Delete
20.4 kB
"""External-agent player characters and their lifecycle.
A player is a human who connects their own agent over MCP. They create a
character in the public UI, receive a token, and then drive that character
through MCP tool calls. The character only enters the world (``world.npcs``)
after the first MCP interaction, and once it dies the player must create a new
one.
This module is intentionally free of locking: every method that reads or
mutates world state is called by :class:`GameRuntime` while it already holds
its own lock, which serialises access against the tick loop.
"""
from __future__ import annotations

import math
import secrets
from dataclasses import asdict, dataclass, fields
from typing import Any

from world_simulator.domain import MemoryEntry, Npc, Vec3, WorldState
from world_simulator.simulation.connectors.base import ActionKind, NpcDirective
from world_simulator.simulation.mechanics import BLOCK_SIZE, is_alive, vec_distance
from world_simulator.simulation.survival import (
    build_perception,
    is_survival_world,
    select_goal,
)
# Internal engine role for a player NPC. Players are deliberately not bound to a
# faction/role yet (a richer faction primitive comes later) and are NOT
# role-gated, so this only affects cosmetics like the default rendering.
PLAYER_ROLE = "gatherer"
# Player NPCs are not role-gated (see survival.validate_survival_action): an
# external agent may request any of these actions. The survival resolver still
# repairs physically impossible requests (no target, empty inventory,
# out-of-range) but never blocks an action by role.
PLAYER_ACTIONS: tuple[str, ...] = ("move", "speak", "attack", "use", "transfer")
# Extra action name that PlayerManager.queue_action accepts in addition to
# PLAYER_ACTIONS, but only while _vote_available() reports an open election in
# the character's country in which the character has not voted yet.
VOTE_ACTION = "vote"
class PlayerError(Exception):
    """Player-facing failure such as an unknown token or a dead character.

    Attributes:
        code: Short machine-readable identifier of the failure.
        message: Human-readable explanation; also used as the exception text.
    """

    def __init__(self, code: str, message: str) -> None:
        # Keep both parts addressable as attributes; only the message is
        # forwarded to ``Exception`` so ``str(error)`` yields the message.
        self.code = code
        self.message = message
        super().__init__(message)
@dataclass(slots=True)
class PlayerCharacter:
token: str
name: str
icon: str
country_id: str
npc_id: str
created_tick: int
spawned: bool = False
death_cause: str | None = None
pending_action: dict[str, Any] | None = None
last_result: str | None = None
@property
def display_name(self) -> str:
icon = self.icon.strip()
return f"{icon} {self.name}".strip() if icon else self.name
class PlayerManager:
    """Registry of player characters keyed by their MCP token.

    The manager owns the token -> :class:`PlayerCharacter` mapping and
    translates between player-facing payloads and world state. It performs no
    locking itself; per the module docstring, callers hold the runtime lock.
    """

    def __init__(self) -> None:
        self._by_token: dict[str, PlayerCharacter] = {}
        # Running number embedded in generated npc ids; persisted by snapshot().
        self._counter = 0

    # -- registry ---------------------------------------------------------- #

    def create(
        self,
        *,
        name: str,
        icon: str,
        country_id: str = "nemotron",
        created_tick: int,
    ) -> PlayerCharacter:
        """Register a new, not-yet-spawned character and return it.

        The name is stripped and truncated to 24 characters, the icon is
        stripped and truncated to 4 characters, and an unrecognised
        ``country_id`` falls back to ``"nemotron"``.

        Raises:
            PlayerError: ``invalid_name`` when the name is empty after stripping.
        """
        clean_name = (name or "").strip()
        if not clean_name:
            raise PlayerError("invalid_name", "Character name must not be empty.")
        clean_name = clean_name[:24]
        self._counter += 1
        token = secrets.token_urlsafe(16)
        character = PlayerCharacter(
            token=token,
            name=clean_name,
            icon=(icon or "").strip()[:4],
            country_id=country_id if country_id in {"nemotron", "qwen"} else "nemotron",
            npc_id=f"player-{self._counter:03d}-{token[:6]}",
            created_tick=created_tick,
        )
        self._by_token[token] = character
        return character

    def get(self, token: str) -> PlayerCharacter:
        """Look up a character by token (surrounding whitespace is ignored).

        Raises:
            PlayerError: ``invalid_token`` when no character has this token.
        """
        character = self._by_token.get((token or "").strip())
        if character is None:
            raise PlayerError("invalid_token", "Unknown character token.")
        return character

    def all_characters(self) -> list[PlayerCharacter]:
        """Return a new list containing every registered character."""
        return list(self._by_token.values())

    @property
    def player_npc_ids(self) -> set[str]:
        """Set of npc ids belonging to registered characters."""
        return {character.npc_id for character in self._by_token.values()}

    # -- persistence (called under the runtime lock) ----------------------- #

    def snapshot(self) -> dict[str, Any]:
        """Return a plain-dict copy of the registry suitable for persistence."""
        return {
            "counter": self._counter,
            "characters": [asdict(character) for character in self._by_token.values()],
        }

    def restore(self, state: dict[str, Any]) -> None:
        """Replace the registry with the contents of a :meth:`snapshot` dict.

        Unknown keys in a character record are ignored. Records that are not
        dicts, have no ``token``, or lack another required field are skipped.
        The registry is only swapped once the whole state has been parsed, so
        a failure part-way through leaves the previous registry intact.
        """
        known = {spec.name for spec in fields(PlayerCharacter)}
        counter = int(state.get("counter", 0))
        restored: dict[str, PlayerCharacter] = {}
        for raw in state.get("characters", []):
            if not isinstance(raw, dict) or "token" not in raw:
                continue
            try:
                character = PlayerCharacter(**{key: raw[key] for key in known if key in raw})
            except TypeError:
                # A required field is missing from this record; treat it like
                # the other malformed records and skip it.
                continue
            restored[character.token] = character
        self._counter = counter
        self._by_token = restored

    # -- world lifecycle (called under the runtime lock) ------------------- #

    def ensure_spawned(self, character: PlayerCharacter, world: WorldState) -> Npc:
        """Spawn the character into the world on first interaction.

        Returns the character's living NPC, creating and appending it to
        ``world.npcs`` if the character has never been spawned.

        Raises:
            PlayerError: ``character_dead`` if the NPC exists but is not alive,
                or if the character was spawned before and its NPC is gone.
        """
        existing = self._find_npc(character, world)
        if existing is not None:
            if not is_alive(existing):
                self._mark_dead(character, world, existing)
                raise self._dead_error(character)
            return existing
        if character.spawned:
            # It was spawned before but the NPC is gone -> treat as dead.
            self._mark_dead(character, world, None)
            raise self._dead_error(character)
        npc = self._build_npc(character, world)
        world.npcs.append(npc)
        character.spawned = True
        return npc

    def require_alive(self, character: PlayerCharacter, world: WorldState) -> Npc:
        """Return the character's living NPC, spawning it if necessary.

        Raises:
            PlayerError: ``character_dead`` when the character has died.
        """
        npc = self.ensure_spawned(character, world)
        # ensure_spawned already rejects dead NPCs; this re-check is a guard.
        if not is_alive(npc):
            self._mark_dead(character, world, npc)
            raise self._dead_error(character)
        return npc

    def queue_action(
        self,
        character: PlayerCharacter,
        payload: dict[str, Any],
        world: WorldState,
    ) -> Npc:
        """Validate the requested action and store it for the upcoming tick.

        The action name is normalised (stripped, lower-cased) and the payload
        is copied into ``character.pending_action``, replacing anything queued
        earlier.

        Raises:
            PlayerError: ``character_dead``, ``vote_unavailable`` or
                ``unknown_action``.
        """
        npc = self.require_alive(character, world)
        action = str(payload.get("action", "idle")).strip().lower()
        if action == VOTE_ACTION and not _vote_available(npc, world):
            raise PlayerError("vote_unavailable", "Vote is only available during your country's election.")
        if action not in PLAYER_ACTIONS and action != VOTE_ACTION:
            raise PlayerError(
                "unknown_action",
                f"Unknown action '{action}'. Valid actions: {', '.join(PLAYER_ACTIONS)}.",
            )
        character.pending_action = {**payload, "action": action}
        return npc

    def take_directive(self, character: PlayerCharacter, world: WorldState) -> NpcDirective | None:
        """Consume the queued action into a directive for the upcoming tick.

        Returns ``None`` when the character has no living NPC in the world. A
        queued action is cleared once it has been read.
        """
        npc = self._find_npc(character, world)
        if npc is None or not is_alive(npc):
            return None
        payload = character.pending_action
        character.pending_action = None
        if payload is None:
            # No instruction this tick: hold position rather than letting the
            # built-in planner take over a player-owned character.
            return NpcDirective(npc_id=npc.id, action="idle", intent="player_idle")
        return self._directive_from_payload(npc, payload)

    # -- payloads ---------------------------------------------------------- #

    def character_status(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]:
        """Describe the character without spawning it.

        If the NPC is present but dead, the death cause is recorded on the
        character before the status is built.
        """
        npc = self._find_npc(character, world)
        alive = npc is not None and is_alive(npc)
        if npc is not None and not alive:
            self._mark_dead(character, world, npc)
        return {
            "name": character.name,
            "icon": character.icon,
            "npc_id": character.npc_id,
            "country_id": character.country_id,
            "spawned": character.spawned,
            "alive": alive,
            "death_cause": character.death_cause,
            "tick": world.tick,
            "self": self._self_payload(npc) if npc is not None else None,
        }

    def observation(self, character: PlayerCharacter, world: WorldState) -> dict[str, Any]:
        """Build the full observation payload for a living character.

        Spawns the character if this is its first interaction.

        Raises:
            PlayerError: ``character_dead`` when the character has died.
        """
        npc = self.require_alive(character, world)
        return {
            "tick": world.tick,
            "game_status": world.game_status,
            "self": self._self_payload(npc),
            "country": self._country_payload(npc, world),
            "perception_text": build_perception(npc, world),
            "nearby_npcs": self._nearby_npcs(npc, world),
            "nearby_beasts": self._nearby_beasts(npc, world),
            "nearby_resources": self._nearby_resources(npc, world),
            "nearby_houses": self._nearby_houses(npc, world),
            # Only the eight most recent log entries are exposed.
            "recent_events": [
                {"tick": event.tick, "summary": event.summary, "severity": event.severity}
                for event in world.event_log[-8:]
            ],
        }

    # -- internals --------------------------------------------------------- #

    def _find_npc(self, character: PlayerCharacter, world: WorldState) -> Npc | None:
        """Return the world NPC whose id matches the character, if present."""
        return next((npc for npc in world.npcs if npc.id == character.npc_id), None)

    def _build_npc(self, character: PlayerCharacter, world: WorldState) -> Npc:
        """Construct (but do not append) the NPC for a character.

        Also seeds a relationship entry on every NPC already in the world, so
        this mutates other NPCs as a side effect.
        """
        position = self._spawn_position(world, character.country_id)
        npc = Npc(
            id=character.npc_id,
            name=character.display_name,
            role=PLAYER_ROLE,
            position=position,
            country_id=character.country_id,
            health=100,
            attack_damage=12,
            personality="player",
            is_player=True,
            player_owner=character.name,
            unrestricted_actions=True,
            age=30,
            max_age=600,
            memory=[
                MemoryEntry(
                    tick=world.tick,
                    text=f"{character.name} joined the world.",
                )
            ],
        )
        if is_survival_world(world):
            npc.hunger = 20.0
            npc.fear = 0.0
            npc.safety = 50.0
            # A balanced starter kit so the agent can immediately do anything:
            # eat, heal, fight, build or trade — players are not role-gated.
            npc.inventory_food = 2
            npc.inventory_herbs = 1
            npc.inventory_wood = 5
            npc.inventory_weapon = 1
            npc.survival_goal = select_goal(npc, world)
        # Seed neutral relationships both ways so memory/perception read sanely.
        for other in world.npcs:
            if other.id == npc.id:
                continue
            npc.relationships.setdefault(other.id, 0.45)
            other.relationships.setdefault(npc.id, 0.45)
        return npc

    def _spawn_position(self, world: WorldState, country_id: str) -> Vec3:
        """Pick a spawn tile next to the country's treasury.

        Falls back to the origin when the country or its treasury is missing.
        The result is clamped so it stays one block inside the terrain edge.
        """
        half_width = world.terrain.width / 2
        half_depth = world.terrain.depth / 2
        country = next((item for item in world.countries if item.id == country_id), None)
        anchor = country.treasury.position if country and country.treasury else Vec3(0.0, 0.0, 0.0)
        # Fan spawns out around the anchor so multiple players do not stack on
        # the exact same tile; the slot is chosen by how many player NPCs are
        # currently in the world.
        spawned_players = sum(1 for npc in world.npcs if npc.is_player)
        ring = [
            (0.0, -BLOCK_SIZE),
            (BLOCK_SIZE, -BLOCK_SIZE),
            (-BLOCK_SIZE, -BLOCK_SIZE),
            (BLOCK_SIZE * 2, 0.0),
            (-BLOCK_SIZE * 2, 0.0),
            (0.0, -BLOCK_SIZE * 2),
        ]
        offset_x, offset_z = ring[spawned_players % len(ring)]
        x = _clamp(anchor.x + offset_x, -half_width + BLOCK_SIZE, half_width - BLOCK_SIZE)
        z = _clamp(anchor.z + offset_z, -half_depth + BLOCK_SIZE, half_depth - BLOCK_SIZE)
        return Vec3(x=round(x, 3), y=0.0, z=round(z, 3))

    @staticmethod
    def _is_coordinate(value: Any) -> bool:
        """Return True for a finite, real (non-bool) number usable as a coordinate."""
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        try:
            return math.isfinite(value)
        except OverflowError:
            # Integers too large to be represented as a float.
            return False

    def _directive_from_payload(self, npc: Npc, payload: dict[str, Any]) -> NpcDirective:
        """Translate a queued player payload into an :class:`NpcDirective`.

        A target position is only set when both ``x`` and ``z`` are finite
        numbers; NaN, infinities, booleans and oversized integers are ignored
        because the payload comes from an external agent.
        """
        action: ActionKind = payload["action"]
        target: Vec3 | None = None
        x = payload.get("x")
        z = payload.get("z")
        if self._is_coordinate(x) and self._is_coordinate(z):
            target = Vec3(x=float(x), y=0.0, z=float(z))
        params = payload.get("params")
        return NpcDirective(
            npc_id=npc.id,
            action=action,
            target=target,
            # A vote names its candidate via ``candidate_id``.
            target_npc_id=_opt_str(payload.get("target_npc_id") or payload.get("candidate_id")),
            target_entity_id=_opt_str(payload.get("target_entity_id")),
            resource_id=_opt_str(payload.get("resource_id")),
            resource_type=_opt_str(payload.get("resource_type")),
            use_type=_opt_str(payload.get("use_type")),
            params=params if isinstance(params, dict) else None,
            amount=_opt_int(payload.get("amount")),
            communication_intent=_opt_str(payload.get("communication_intent")),
            message=_opt_str(payload.get("message")),
            memory=_opt_str(payload.get("memory")),
            intent="player",
            away=bool(payload.get("away", False)),
            take=bool(payload.get("take", False)),
        )

    def _self_payload(self, npc: Npc) -> dict[str, Any]:
        """Serialise the character's own NPC state for the player."""
        return {
            "npc_id": npc.id,
            "name": npc.name,
            "role": npc.role,
            "country_id": npc.country_id,
            "special_status": npc.special_status,
            "position": {"x": npc.position.x, "z": npc.position.z},
            "health": npc.health,
            "hunger": round(npc.hunger, 1),
            "fear": round(npc.fear, 1),
            "safety": round(npc.safety, 1),
            "goal": npc.survival_goal,
            "intention": npc.intention,
            "inventory": {
                "food": npc.inventory_food,
                "herbs": npc.inventory_herbs,
                "wood": npc.inventory_wood,
                "weapon": npc.inventory_weapon,
                "coins": npc.inventory_coins,
            },
        }

    def _nearby_npcs(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]:
        """List up to ten living NPCs within twelve blocks, nearest first."""
        nearby: list[dict[str, Any]] = []
        for other in world.npcs:
            if other.id == npc.id or not is_alive(other):
                continue
            distance = vec_distance(npc.position, other.position)
            if distance > 12 * BLOCK_SIZE:
                continue
            nearby.append(
                {
                    "id": other.id,
                    "name": other.name,
                    "role": other.role,
                    "country_id": other.country_id,
                    "distance_tiles": round(distance / BLOCK_SIZE, 1),
                    "trust": round(npc.relationships.get(other.id, 0.0), 2),
                    "is_player": other.is_player,
                }
            )
        nearby.sort(key=lambda item: item["distance_tiles"])
        return nearby[:10]

    def _nearby_beasts(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]:
        """List every beast that is not dead, nearest first.

        Unlike NPCs and resources, beasts are neither range-limited nor capped.
        """
        beasts: list[dict[str, Any]] = []
        for beast in world.beasts:
            if beast.state == "dead":
                continue
            distance = vec_distance(npc.position, beast.position)
            beasts.append(
                {
                    "id": beast.id,
                    "distance_tiles": round(distance / BLOCK_SIZE, 1),
                    "state": beast.state,
                    "health": round(beast.health, 1),
                }
            )
        beasts.sort(key=lambda item: item["distance_tiles"])
        return beasts

    def _nearby_resources(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]:
        """List up to ten non-empty resource nodes within twelve blocks, nearest first."""
        nodes: list[dict[str, Any]] = []
        for node in world.resource_nodes:
            if node.amount <= 0:
                continue
            distance = vec_distance(npc.position, node.position)
            if distance > 12 * BLOCK_SIZE:
                continue
            nodes.append(
                {
                    "id": node.id,
                    "type": node.resource_type,
                    "amount": node.amount,
                    "distance_tiles": round(distance / BLOCK_SIZE, 1),
                }
            )
        nodes.sort(key=lambda item: item["distance_tiles"])
        return nodes[:10]

    def _nearby_houses(self, npc: Npc, world: WorldState) -> list[dict[str, Any]]:
        """List every house in the world, nearest first (no range limit or cap)."""
        houses: list[dict[str, Any]] = []
        for house in world.houses:
            distance = vec_distance(npc.position, house.position)
            houses.append(
                {
                    "id": house.id,
                    "state": house.state,
                    "hp": round(house.hp, 1),
                    "distance_tiles": round(distance / BLOCK_SIZE, 1),
                }
            )
        houses.sort(key=lambda item: item["distance_tiles"])
        return houses

    def _country_payload(self, npc: Npc, world: WorldState) -> dict[str, Any] | None:
        """Describe the NPC's country, or ``None`` if it has none or it is unknown."""
        if npc.country_id is None:
            return None
        country = next((item for item in world.countries if item.id == npc.country_id), None)
        if country is None:
            return None
        election = _active_election_payload(npc, world)
        return {
            "id": country.id,
            "name": country.name,
            "badge": country.badge,
            "ruler_id": country.ruler_id,
            "policy": country.policy,
            "treasury": dict(country.treasury.resources) if country.treasury else {},
            "treasury_id": country.treasury.id if country.treasury else None,
            "treasury_position": None
            if country.treasury is None
            else {"x": country.treasury.position.x, "z": country.treasury.position.z},
            "cannon_id": country.cannon.id if country.cannon else None,
            "next_election_tick": country.next_election_tick,
            "election": election,
        }

    def _mark_dead(self, character: PlayerCharacter, world: WorldState, npc: Npc | None) -> None:
        """Record that the character has died; the cause is only set once.

        ``npc`` is accepted for call-site symmetry but is not used.
        """
        # Flag as spawned so a later ensure_spawned() cannot respawn it.
        character.spawned = True
        if character.death_cause is not None:
            return
        character.death_cause = _death_cause_for(world, character.npc_id) or "unknown"

    def _dead_error(self, character: PlayerCharacter) -> PlayerError:
        """Build the ``character_dead`` error returned to the player."""
        cause = character.death_cause or "unknown"
        return PlayerError(
            "character_dead",
            f"Your character '{character.name}' has died (cause: {cause}). "
            "Create a new character in the web UI to continue playing.",
        )
def _death_cause_for(world: WorldState, npc_id: str) -> str | None:
for event in reversed(world.event_log):
if event.type == "npc_died" and event.target_id == npc_id:
return event.object_id or "unknown"
return None
def _vote_available(npc: Npc, world: WorldState) -> bool:
if npc.country_id is None:
return False
for election in world.elections:
if election.country_id != npc.country_id or election.completed:
continue
if election.start_tick <= world.tick <= election.end_tick and npc.id not in election.votes:
return True
return False
def _active_election_payload(npc: Npc, world: WorldState) -> dict[str, Any] | None:
if npc.country_id is None:
return None
for election in world.elections:
if election.country_id != npc.country_id or election.completed:
continue
if election.start_tick <= world.tick <= election.end_tick:
return {
"country_id": election.country_id,
"start_tick": election.start_tick,
"end_tick": election.end_tick,
"candidate_ids": list(election.candidate_ids),
"votes": dict(election.votes),
"has_voted": npc.id in election.votes,
}
return None
def _clamp(value: float, minimum: float, maximum: float) -> float:
return min(max(value, minimum), maximum)
def _opt_str(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
def _opt_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None