Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| from typing import Any | |
| from world_simulator.domain import Npc, WorldState | |
| from world_simulator.simulation.mechanics import ( | |
| ATTACK_RADIUS, | |
| TALK_RADIUS, | |
| VISIBLE_RADIUS, | |
| distance_between, | |
| has_hostile_intent, | |
| is_alive, | |
| ) | |
| from world_simulator.simulation.memory import recent_meaningful_memories | |
| class CitizenPerception: | |
| npc_id: str | |
| tick: int | |
| nearby_citizens: list[dict[str, Any]] | |
| nearby_threats: list[dict[str, Any]] | |
| under_threat: bool | |
| visible_targets: list[dict[str, Any]] | |
| recent_memories: list[str] | |
| directive_summary: str | None | |
| last_action_summary: str | None | |
| last_hostile_actions: list[str] | |
| can_flee: bool | |
| def to_dict(self) -> dict[str, Any]: | |
| return asdict(self) | |
| def build_perception(world: WorldState, npc: Npc) -> CitizenPerception: | |
| memory_threat_ids = _memory_threat_ids(world, npc) | |
| nearby_citizens: list[dict[str, Any]] = [] | |
| nearby_threats: list[dict[str, Any]] = [] | |
| visible_targets: list[dict[str, Any]] = [] | |
| for other in world.living_npcs(): | |
| if other.id == npc.id: | |
| continue | |
| distance = distance_between(npc, other) | |
| if distance > VISIBLE_RADIUS: | |
| continue | |
| active_threat = has_hostile_intent(other) | |
| memory_threat = other.id in memory_threat_ids | |
| threat = active_threat or memory_threat | |
| can_talk = distance <= TALK_RADIUS | |
| can_attack = distance <= ATTACK_RADIUS | |
| can_attack_you = threat and distance <= ATTACK_RADIUS | |
| citizen = { | |
| "id": other.id, | |
| "npc_id": other.id, | |
| "name": other.name, | |
| "position": {"x": other.position.x, "z": other.position.z}, | |
| "condition": _health_condition(other), | |
| "is_alive": is_alive(other), | |
| "distance": round(distance, 3), | |
| "can_talk": can_talk, | |
| "can_attack": can_attack, | |
| "can_attack_you": can_attack_you, | |
| "is_active_threat": threat, | |
| "ordinary_talk_allowed": not threat, | |
| } | |
| nearby_citizens.append(citizen) | |
| visible_targets.append( | |
| { | |
| "id": other.id, | |
| "npc_id": other.id, | |
| "name": other.name, | |
| "distance": round(distance, 3), | |
| "can_talk": can_talk, | |
| "can_attack": can_attack, | |
| } | |
| ) | |
| if threat: | |
| nearby_threats.append( | |
| { | |
| "id": other.id, | |
| "npc_id": other.id, | |
| "name": other.name, | |
| "reason": _threat_reason(other, memory_threat=memory_threat), | |
| "distance": round(distance, 3), | |
| "can_attack_you": can_attack_you, | |
| "can_talk": can_talk, | |
| "can_attack": can_attack, | |
| } | |
| ) | |
| return CitizenPerception( | |
| npc_id=npc.id, | |
| tick=world.tick, | |
| nearby_citizens=sorted(nearby_citizens, key=lambda item: (item["distance"], item["id"])), | |
| nearby_threats=sorted(nearby_threats, key=lambda item: (item["distance"], item["id"])), | |
| under_threat=bool(nearby_threats), | |
| visible_targets=sorted(visible_targets, key=lambda item: (item["distance"], item["id"])), | |
| recent_memories=[memory.text for memory in recent_meaningful_memories(npc)], | |
| directive_summary=npc.god_directive, | |
| last_action_summary=_last_action_summary(world, npc.id), | |
| last_hostile_actions=_last_hostile_actions(npc), | |
| can_flee=bool(nearby_threats), | |
| ) | |
| def threat_ids_from_memory(world: WorldState, npc: Npc) -> set[str]: | |
| return _memory_threat_ids(world, npc) | |
| def recent_attacker_id(world: WorldState, npc: Npc) -> str | None: | |
| for episode in reversed(npc.structured_memory.episodes): | |
| if ( | |
| episode.kind == "attack" | |
| and episode.subject_kind == "npc" | |
| and episode.perspective == "recipient" | |
| and episode.actor_id != npc.id | |
| ): | |
| return episode.actor_id | |
| texts = _memory_texts(npc) | |
| for text in reversed(texts): | |
| lowered = text.lower() | |
| if "attacked you" not in lowered and "attacked " not in lowered: | |
| continue | |
| attacker = _mentioned_npc(world, text) | |
| if attacker is not None and attacker.id != npc.id: | |
| return attacker.id | |
| return None | |
| def nearby_threat_npc(world: WorldState, perception: CitizenPerception) -> Npc | None: | |
| if not perception.nearby_threats: | |
| return None | |
| threat_id = perception.nearby_threats[0]["npc_id"] | |
| return next((npc for npc in world.living_npcs() if npc.id == threat_id), None) | |
| def _memory_threat_ids(world: WorldState, npc: Npc) -> set[str]: | |
| threat_ids: set[str] = set() | |
| for episode in npc.structured_memory.episodes: | |
| if episode.actor_id == npc.id: | |
| continue | |
| if episode.subject_kind == "npc" and ( | |
| episode.kind in {"attack", "steal"} or "danger" in episode.tags | |
| ): | |
| threat_ids.add(episode.actor_id) | |
| for text in _memory_texts(npc): | |
| lowered = text.lower() | |
| if "dangerous" not in lowered and "attacked" not in lowered: | |
| continue | |
| threat = _mentioned_npc(world, text) | |
| if threat is not None and threat.id != npc.id: | |
| threat_ids.add(threat.id) | |
| return threat_ids | |
| def _mentioned_npc(world: WorldState, text: str) -> Npc | None: | |
| lowered = text.lower() | |
| for npc in world.living_npcs(): | |
| name = npc.name.lower() | |
| npc_id = npc.id.lower() | |
| if f"{name} attacked" in lowered or f"{npc_id} attacked" in lowered: | |
| return npc | |
| if f"{name} is dangerous" in lowered or f"{npc_id} is dangerous" in lowered: | |
| return npc | |
| matches = [ | |
| (lowered.find(npc.name.lower()), npc) | |
| for npc in world.living_npcs() | |
| if npc.name.lower() in lowered or npc.id.lower() in lowered | |
| ] | |
| if not matches: | |
| return None | |
| return min(matches, key=lambda item: item[0])[1] | |
| def _memory_texts(npc: Npc) -> list[str]: | |
| texts = [memory.text for memory in recent_meaningful_memories(npc)] | |
| if npc.memory_summary: | |
| texts.append(npc.memory_summary) | |
| return texts | |
| def _last_hostile_actions(npc: Npc) -> list[str]: | |
| return [ | |
| text | |
| for text in _memory_texts(npc) | |
| if "attacked" in text.lower() or "dangerous" in text.lower() | |
| ][-3:] | |
| def _last_action_summary(world: WorldState, npc_id: str) -> str | None: | |
| for trace in reversed(world.last_action_debug): | |
| if trace.get("npc_id") != npc_id: | |
| continue | |
| resolution = trace.get("resolution") | |
| if isinstance(resolution, dict): | |
| summary = resolution.get("summary") | |
| if isinstance(summary, str): | |
| return summary | |
| validation = trace.get("validation") | |
| if isinstance(validation, dict): | |
| reason = validation.get("reason") | |
| if isinstance(reason, str): | |
| return reason | |
| return None | |
| def _threat_reason(npc: Npc, *, memory_threat: bool) -> str: | |
| if has_hostile_intent(npc) and npc.god_directive: | |
| return "active hostile directive" | |
| if has_hostile_intent(npc): | |
| return "active hostile behavior" | |
| if memory_threat: | |
| return "remembered dangerous behavior" | |
| return "possible threat" | |
| def _health_condition(npc: Npc) -> str: | |
| if not is_alive(npc): | |
| return "dead" | |
| if npc.health <= 25: | |
| return "critical" | |
| if npc.health <= 60: | |
| return "injured" | |
| return "healthy" | |