| from __future__ import annotations |
|
|
| from dataclasses import asdict, dataclass |
| from typing import Any |
|
|
| from world_simulator.domain import Npc, WorldState |
| from world_simulator.simulation.mechanics import ( |
| ATTACK_RADIUS, |
| TALK_RADIUS, |
| VISIBLE_RADIUS, |
| distance_between, |
| has_hostile_intent, |
| is_alive, |
| ) |
| from world_simulator.simulation.memory import recent_meaningful_memories |
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class CitizenPerception: |
| npc_id: str |
| tick: int |
| nearby_citizens: list[dict[str, Any]] |
| nearby_threats: list[dict[str, Any]] |
| under_threat: bool |
| visible_targets: list[dict[str, Any]] |
| recent_memories: list[str] |
| directive_summary: str | None |
| last_action_summary: str | None |
| last_hostile_actions: list[str] |
| can_flee: bool |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return asdict(self) |
|
|
|
|
| def build_perception(world: WorldState, npc: Npc) -> CitizenPerception: |
| memory_threat_ids = _memory_threat_ids(world, npc) |
| nearby_citizens: list[dict[str, Any]] = [] |
| nearby_threats: list[dict[str, Any]] = [] |
| visible_targets: list[dict[str, Any]] = [] |
|
|
| for other in world.living_npcs(): |
| if other.id == npc.id: |
| continue |
|
|
| distance = distance_between(npc, other) |
| if distance > VISIBLE_RADIUS: |
| continue |
|
|
| active_threat = has_hostile_intent(other) |
| memory_threat = other.id in memory_threat_ids |
| threat = active_threat or memory_threat |
| can_talk = distance <= TALK_RADIUS |
| can_attack = distance <= ATTACK_RADIUS |
| can_attack_you = threat and distance <= ATTACK_RADIUS |
| citizen = { |
| "id": other.id, |
| "npc_id": other.id, |
| "name": other.name, |
| "position": {"x": other.position.x, "z": other.position.z}, |
| "condition": _health_condition(other), |
| "is_alive": is_alive(other), |
| "distance": round(distance, 3), |
| "can_talk": can_talk, |
| "can_attack": can_attack, |
| "can_attack_you": can_attack_you, |
| "is_active_threat": threat, |
| "ordinary_talk_allowed": not threat, |
| } |
| nearby_citizens.append(citizen) |
|
|
| visible_targets.append( |
| { |
| "id": other.id, |
| "npc_id": other.id, |
| "name": other.name, |
| "distance": round(distance, 3), |
| "can_talk": can_talk, |
| "can_attack": can_attack, |
| } |
| ) |
|
|
| if threat: |
| nearby_threats.append( |
| { |
| "id": other.id, |
| "npc_id": other.id, |
| "name": other.name, |
| "reason": _threat_reason(other, memory_threat=memory_threat), |
| "distance": round(distance, 3), |
| "can_attack_you": can_attack_you, |
| "can_talk": can_talk, |
| "can_attack": can_attack, |
| } |
| ) |
|
|
| return CitizenPerception( |
| npc_id=npc.id, |
| tick=world.tick, |
| nearby_citizens=sorted(nearby_citizens, key=lambda item: (item["distance"], item["id"])), |
| nearby_threats=sorted(nearby_threats, key=lambda item: (item["distance"], item["id"])), |
| under_threat=bool(nearby_threats), |
| visible_targets=sorted(visible_targets, key=lambda item: (item["distance"], item["id"])), |
| recent_memories=[memory.text for memory in recent_meaningful_memories(npc)], |
| directive_summary=npc.god_directive, |
| last_action_summary=_last_action_summary(world, npc.id), |
| last_hostile_actions=_last_hostile_actions(npc), |
| can_flee=bool(nearby_threats), |
| ) |
|
|
|
|
| def threat_ids_from_memory(world: WorldState, npc: Npc) -> set[str]: |
| return _memory_threat_ids(world, npc) |
|
|
|
|
| def recent_attacker_id(world: WorldState, npc: Npc) -> str | None: |
| for episode in reversed(npc.structured_memory.episodes): |
| if ( |
| episode.kind == "attack" |
| and episode.subject_kind == "npc" |
| and episode.perspective == "recipient" |
| and episode.actor_id != npc.id |
| ): |
| return episode.actor_id |
|
|
| texts = _memory_texts(npc) |
| for text in reversed(texts): |
| lowered = text.lower() |
| if "attacked you" not in lowered and "attacked " not in lowered: |
| continue |
| attacker = _mentioned_npc(world, text) |
| if attacker is not None and attacker.id != npc.id: |
| return attacker.id |
| return None |
|
|
|
|
| def nearby_threat_npc(world: WorldState, perception: CitizenPerception) -> Npc | None: |
| if not perception.nearby_threats: |
| return None |
| threat_id = perception.nearby_threats[0]["npc_id"] |
| return next((npc for npc in world.living_npcs() if npc.id == threat_id), None) |
|
|
|
|
| def _memory_threat_ids(world: WorldState, npc: Npc) -> set[str]: |
| threat_ids: set[str] = set() |
| for episode in npc.structured_memory.episodes: |
| if episode.actor_id == npc.id: |
| continue |
| if episode.subject_kind == "npc" and ( |
| episode.kind in {"attack", "steal"} or "danger" in episode.tags |
| ): |
| threat_ids.add(episode.actor_id) |
|
|
| for text in _memory_texts(npc): |
| lowered = text.lower() |
| if "dangerous" not in lowered and "attacked" not in lowered: |
| continue |
| threat = _mentioned_npc(world, text) |
| if threat is not None and threat.id != npc.id: |
| threat_ids.add(threat.id) |
| return threat_ids |
|
|
|
|
| def _mentioned_npc(world: WorldState, text: str) -> Npc | None: |
| lowered = text.lower() |
| for npc in world.living_npcs(): |
| name = npc.name.lower() |
| npc_id = npc.id.lower() |
| if f"{name} attacked" in lowered or f"{npc_id} attacked" in lowered: |
| return npc |
| if f"{name} is dangerous" in lowered or f"{npc_id} is dangerous" in lowered: |
| return npc |
|
|
| matches = [ |
| (lowered.find(npc.name.lower()), npc) |
| for npc in world.living_npcs() |
| if npc.name.lower() in lowered or npc.id.lower() in lowered |
| ] |
| if not matches: |
| return None |
| return min(matches, key=lambda item: item[0])[1] |
|
|
|
|
| def _memory_texts(npc: Npc) -> list[str]: |
| texts = [memory.text for memory in recent_meaningful_memories(npc)] |
| if npc.memory_summary: |
| texts.append(npc.memory_summary) |
| return texts |
|
|
|
|
| def _last_hostile_actions(npc: Npc) -> list[str]: |
| return [ |
| text |
| for text in _memory_texts(npc) |
| if "attacked" in text.lower() or "dangerous" in text.lower() |
| ][-3:] |
|
|
|
|
| def _last_action_summary(world: WorldState, npc_id: str) -> str | None: |
| for trace in reversed(world.last_action_debug): |
| if trace.get("npc_id") != npc_id: |
| continue |
| resolution = trace.get("resolution") |
| if isinstance(resolution, dict): |
| summary = resolution.get("summary") |
| if isinstance(summary, str): |
| return summary |
| validation = trace.get("validation") |
| if isinstance(validation, dict): |
| reason = validation.get("reason") |
| if isinstance(reason, str): |
| return reason |
| return None |
|
|
|
|
| def _threat_reason(npc: Npc, *, memory_threat: bool) -> str: |
| if has_hostile_intent(npc) and npc.god_directive: |
| return "active hostile directive" |
| if has_hostile_intent(npc): |
| return "active hostile behavior" |
| if memory_threat: |
| return "remembered dangerous behavior" |
| return "possible threat" |
|
|
|
|
| def _health_condition(npc: Npc) -> str: |
| if not is_alive(npc): |
| return "dead" |
| if npc.health <= 25: |
| return "critical" |
| if npc.health <= 60: |
| return "injured" |
| return "healthy" |
|
|