from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any

from world_simulator.domain import Npc, WorldState
from world_simulator.simulation.mechanics import (
    ATTACK_RADIUS,
    TALK_RADIUS,
    VISIBLE_RADIUS,
    distance_between,
    has_hostile_intent,
    is_alive,
)
from world_simulator.simulation.memory import recent_meaningful_memories


@dataclass(frozen=True, slots=True)
class CitizenPerception:
    """Snapshot of what a single NPC perceives, as produced by ``build_perception``.

    The dataclass is frozen, which only prevents rebinding its attributes; the
    list and dict values it holds are ordinary mutable containers.
    """

    # Id of the perceiving NPC and the world tick the snapshot was taken at.
    npc_id: str
    tick: int
    # One entry per other living NPC within VISIBLE_RADIUS.
    nearby_citizens: list[dict[str, Any]]
    # Subset of the visible NPCs that are hostile now or remembered as dangerous.
    nearby_threats: list[dict[str, Any]]
    # True when ``nearby_threats`` is non-empty.
    under_threat: bool
    # Trimmed-down view of the visible NPCs (identity, distance, talk/attack reach).
    visible_targets: list[dict[str, Any]]
    # Texts of the memories returned by ``recent_meaningful_memories``.
    recent_memories: list[str]
    # Copied from ``npc.god_directive``.
    directive_summary: str | None
    # Latest resolution summary or validation reason logged for this NPC, if any.
    last_action_summary: str | None
    # Up to three most recent memory texts mentioning "attacked" or "dangerous".
    last_hostile_actions: list[str]
    # Currently set exactly like ``under_threat``.
    can_flee: bool

    def to_dict(self) -> dict[str, Any]:
        """Return the snapshot as a plain dict (``dataclasses.asdict`` copies nested containers)."""
        return asdict(self)


def _sorted_by_proximity(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return ``entries`` ordered by ascending distance, with the id as tie-breaker."""
    return sorted(entries, key=lambda entry: (entry["distance"], entry["id"]))


def build_perception(world: WorldState, npc: Npc) -> CitizenPerception:
    """Build the perception snapshot of ``npc`` for the current world tick.

    Every other living NPC within ``VISIBLE_RADIUS`` is reported. An NPC counts
    as a threat when it has hostile intent right now or when ``npc``'s memories
    mark it as dangerous. All three per-NPC lists are sorted nearest first.

    Args:
        world: World state supplying the living NPCs, the tick and the action log.
        npc: The perceiving NPC.

    Returns:
        A ``CitizenPerception`` describing the surroundings of ``npc``.
    """
    memory_threat_ids = _memory_threat_ids(world, npc)
    nearby_citizens: list[dict[str, Any]] = []
    nearby_threats: list[dict[str, Any]] = []
    visible_targets: list[dict[str, Any]] = []

    for other in world.living_npcs():
        if other.id == npc.id:
            continue
        distance = distance_between(npc, other)
        if distance > VISIBLE_RADIUS:
            continue

        active_threat = has_hostile_intent(other)
        memory_threat = other.id in memory_threat_ids
        threat = active_threat or memory_threat
        can_talk = distance <= TALK_RADIUS
        can_attack = distance <= ATTACK_RADIUS
        # Attack reach is treated as symmetric: a threat we could hit can hit us.
        can_attack_you = threat and can_attack
        shown_distance = round(distance, 3)

        nearby_citizens.append(
            {
                "id": other.id,
                "npc_id": other.id,
                "name": other.name,
                "position": {"x": other.position.x, "z": other.position.z},
                "condition": _health_condition(other),
                "is_alive": is_alive(other),
                "distance": shown_distance,
                "can_talk": can_talk,
                "can_attack": can_attack,
                "can_attack_you": can_attack_you,
                # Note: this flag also covers threats known only from memory.
                "is_active_threat": threat,
                "ordinary_talk_allowed": not threat,
            }
        )
        visible_targets.append(
            {
                "id": other.id,
                "npc_id": other.id,
                "name": other.name,
                "distance": shown_distance,
                "can_talk": can_talk,
                "can_attack": can_attack,
            }
        )
        if threat:
            nearby_threats.append(
                {
                    "id": other.id,
                    "npc_id": other.id,
                    "name": other.name,
                    "reason": _threat_reason(other, memory_threat=memory_threat),
                    "distance": shown_distance,
                    "can_attack_you": can_attack_you,
                    "can_talk": can_talk,
                    "can_attack": can_attack,
                }
            )

    threatened = bool(nearby_threats)
    return CitizenPerception(
        npc_id=npc.id,
        tick=world.tick,
        nearby_citizens=_sorted_by_proximity(nearby_citizens),
        nearby_threats=_sorted_by_proximity(nearby_threats),
        under_threat=threatened,
        visible_targets=_sorted_by_proximity(visible_targets),
        recent_memories=[memory.text for memory in recent_meaningful_memories(npc)],
        directive_summary=npc.god_directive,
        last_action_summary=_last_action_summary(world, npc.id),
        last_hostile_actions=_last_hostile_actions(npc),
        can_flee=threatened,
    )


def threat_ids_from_memory(world: WorldState, npc: Npc) -> set[str]:
    """Return the ids of NPCs that the memories of ``npc`` mark as threats."""
    return _memory_threat_ids(world, npc)


def recent_attacker_id(world: WorldState, npc: Npc) -> str | None:
    """Return the id of the NPC that most recently attacked ``npc``, if known.

    Structured episodes are consulted first, newest to oldest. When none
    records an attack received from another NPC, the free-text memories are
    scanned (last text first) for an "attacked " phrase naming a living NPC.

    Returns:
        The attacker's id, or ``None`` when no attacker can be identified.
    """
    for episode in reversed(npc.structured_memory.episodes):
        if (
            episode.kind == "attack"
            and episode.subject_kind == "npc"
            and episode.perspective == "recipient"
            and episode.actor_id != npc.id
        ):
            return episode.actor_id

    # ``_memory_texts`` appends the memory summary last, so it is examined first.
    for text in reversed(_memory_texts(npc)):
        # "attacked " also covers "attacked you", so a single test suffices.
        if "attacked " not in text.lower():
            continue
        attacker = _mentioned_npc(world, text)
        if attacker is not None and attacker.id != npc.id:
            return attacker.id
    return None


def nearby_threat_npc(world: WorldState, perception: CitizenPerception) -> Npc | None:
    """Resolve the first entry of ``perception.nearby_threats`` to a living NPC.

    ``build_perception`` sorts threats nearest first, so for its snapshots this
    is the closest threat.

    Returns:
        The matching living NPC, or ``None`` when there are no threats or the
        NPC is no longer among the living.
    """
    if not perception.nearby_threats:
        return None
    threat_id = perception.nearby_threats[0]["npc_id"]
    return next((npc for npc in world.living_npcs() if npc.id == threat_id), None)


def _memory_threat_ids(world: WorldState, npc: Npc) -> set[str]:
    """Collect ids of NPCs that ``npc`` remembers as dangerous.

    Two sources are combined: structured episodes about an NPC that are an
    attack, a theft or tagged "danger" (excluding episodes ``npc`` itself
    acted in), and memory texts containing "dangerous" or "attacked" that
    mention a living NPC other than ``npc``.
    """
    threat_ids: set[str] = set()

    for episode in npc.structured_memory.episodes:
        if episode.actor_id == npc.id:
            continue
        is_harmful = episode.kind in {"attack", "steal"} or "danger" in episode.tags
        if episode.subject_kind == "npc" and is_harmful:
            # NOTE(review): assumes actor_id is always set on such episodes - confirm.
            threat_ids.add(episode.actor_id)

    for text in _memory_texts(npc):
        lowered = text.lower()
        if "dangerous" not in lowered and "attacked" not in lowered:
            continue
        threat = _mentioned_npc(world, text)
        if threat is not None and threat.id != npc.id:
            threat_ids.add(threat.id)

    return threat_ids


def _mentioned_npc(world: WorldState, text: str) -> Npc | None:
    """Return the living NPC that ``text`` most plausibly refers to.

    An NPC whose name or id directly precedes " attacked" or " is dangerous"
    wins outright (first such NPC in roster order). Otherwise the NPC whose
    name or id occurs earliest in the text is returned, ties going to roster
    order. Matching is case-insensitive.

    Returns:
        The matched NPC, or ``None`` when no living NPC is mentioned.
    """
    lowered = text.lower()
    # Read the roster once and pre-lowercase the needles used by both passes.
    roster = [(npc, npc.name.lower(), npc.id.lower()) for npc in world.living_npcs()]

    for npc, name, npc_id in roster:
        if f"{name} attacked" in lowered or f"{npc_id} attacked" in lowered:
            return npc
        if f"{name} is dangerous" in lowered or f"{npc_id} is dangerous" in lowered:
            return npc

    earliest_npc: Npc | None = None
    earliest_at = -1
    for npc, name, npc_id in roster:
        # ``str.find`` returns -1 for "absent". Ranking on the raw name position
        # would let an NPC matched only by id (name absent, so -1) outrank every
        # NPC that is genuinely mentioned earlier, so only real hits are ranked.
        hits = [at for at in (lowered.find(name), lowered.find(npc_id)) if at >= 0]
        if not hits:
            continue
        first_hit = min(hits)
        if earliest_npc is None or first_hit < earliest_at:
            earliest_npc, earliest_at = npc, first_hit
    return earliest_npc


def _memory_texts(npc: Npc) -> list[str]:
    """Return the recent meaningful memory texts, followed by the memory summary if set."""
    texts = [memory.text for memory in recent_meaningful_memories(npc)]
    if npc.memory_summary:
        texts.append(npc.memory_summary)
    return texts


def _last_hostile_actions(npc: Npc) -> list[str]:
    """Return the last three memory texts that mention "attacked" or "dangerous"."""
    hostile_texts: list[str] = []
    for text in _memory_texts(npc):
        lowered = text.lower()
        if "attacked" in lowered or "dangerous" in lowered:
            hostile_texts.append(text)
    return hostile_texts[-3:]


def _last_action_summary(world: WorldState, npc_id: str) -> str | None:
    """Return the newest usable summary in ``world.last_action_debug`` for ``npc_id``.

    Traces are scanned newest first. For each trace belonging to the NPC, the
    resolution summary is preferred and the validation reason is the fallback;
    a trace offering neither as a string is skipped in favour of older ones.
    """
    for trace in reversed(world.last_action_debug):
        if trace.get("npc_id") != npc_id:
            continue
        resolution = trace.get("resolution")
        if isinstance(resolution, dict):
            summary = resolution.get("summary")
            if isinstance(summary, str):
                return summary
        validation = trace.get("validation")
        if isinstance(validation, dict):
            reason = validation.get("reason")
            if isinstance(reason, str):
                return reason
    return None


def _threat_reason(npc: Npc, *, memory_threat: bool) -> str:
    """Explain why ``npc`` is reported as a threat.

    Current hostile intent takes precedence over remembered danger.

    Args:
        npc: The NPC being reported as a threat.
        memory_threat: Whether the perceiver's memories mark ``npc`` as dangerous.
    """
    if has_hostile_intent(npc):
        if npc.god_directive:
            return "active hostile directive"
        return "active hostile behavior"
    if memory_threat:
        return "remembered dangerous behavior"
    return "possible threat"


def _health_condition(npc: Npc) -> str:
    """Classify the health of ``npc`` as "dead", "critical", "injured" or "healthy"."""
    if not is_alive(npc):
        return "dead"
    if npc.health <= 25:
        return "critical"
    if npc.health <= 60:
        return "injured"
    return "healthy"