DeltaZN
feat: rename god -> world
c58d3eb
Raw
History Blame Contribute Delete
7.77 kB
from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any
from world_simulator.domain import Npc, WorldState
from world_simulator.simulation.mechanics import (
ATTACK_RADIUS,
TALK_RADIUS,
VISIBLE_RADIUS,
distance_between,
has_hostile_intent,
is_alive,
)
from world_simulator.simulation.memory import recent_meaningful_memories
@dataclass(frozen=True, slots=True)
class CitizenPerception:
npc_id: str
tick: int
nearby_citizens: list[dict[str, Any]]
nearby_threats: list[dict[str, Any]]
under_threat: bool
visible_targets: list[dict[str, Any]]
recent_memories: list[str]
directive_summary: str | None
last_action_summary: str | None
last_hostile_actions: list[str]
can_flee: bool
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def build_perception(world: WorldState, npc: Npc) -> CitizenPerception:
memory_threat_ids = _memory_threat_ids(world, npc)
nearby_citizens: list[dict[str, Any]] = []
nearby_threats: list[dict[str, Any]] = []
visible_targets: list[dict[str, Any]] = []
for other in world.living_npcs():
if other.id == npc.id:
continue
distance = distance_between(npc, other)
if distance > VISIBLE_RADIUS:
continue
active_threat = has_hostile_intent(other)
memory_threat = other.id in memory_threat_ids
threat = active_threat or memory_threat
can_talk = distance <= TALK_RADIUS
can_attack = distance <= ATTACK_RADIUS
can_attack_you = threat and distance <= ATTACK_RADIUS
citizen = {
"id": other.id,
"npc_id": other.id,
"name": other.name,
"position": {"x": other.position.x, "z": other.position.z},
"condition": _health_condition(other),
"is_alive": is_alive(other),
"distance": round(distance, 3),
"can_talk": can_talk,
"can_attack": can_attack,
"can_attack_you": can_attack_you,
"is_active_threat": threat,
"ordinary_talk_allowed": not threat,
}
nearby_citizens.append(citizen)
visible_targets.append(
{
"id": other.id,
"npc_id": other.id,
"name": other.name,
"distance": round(distance, 3),
"can_talk": can_talk,
"can_attack": can_attack,
}
)
if threat:
nearby_threats.append(
{
"id": other.id,
"npc_id": other.id,
"name": other.name,
"reason": _threat_reason(other, memory_threat=memory_threat),
"distance": round(distance, 3),
"can_attack_you": can_attack_you,
"can_talk": can_talk,
"can_attack": can_attack,
}
)
return CitizenPerception(
npc_id=npc.id,
tick=world.tick,
nearby_citizens=sorted(nearby_citizens, key=lambda item: (item["distance"], item["id"])),
nearby_threats=sorted(nearby_threats, key=lambda item: (item["distance"], item["id"])),
under_threat=bool(nearby_threats),
visible_targets=sorted(visible_targets, key=lambda item: (item["distance"], item["id"])),
recent_memories=[memory.text for memory in recent_meaningful_memories(npc)],
directive_summary=npc.god_directive,
last_action_summary=_last_action_summary(world, npc.id),
last_hostile_actions=_last_hostile_actions(npc),
can_flee=bool(nearby_threats),
)
def threat_ids_from_memory(world: WorldState, npc: Npc) -> set[str]:
return _memory_threat_ids(world, npc)
def recent_attacker_id(world: WorldState, npc: Npc) -> str | None:
for episode in reversed(npc.structured_memory.episodes):
if (
episode.kind == "attack"
and episode.subject_kind == "npc"
and episode.perspective == "recipient"
and episode.actor_id != npc.id
):
return episode.actor_id
texts = _memory_texts(npc)
for text in reversed(texts):
lowered = text.lower()
if "attacked you" not in lowered and "attacked " not in lowered:
continue
attacker = _mentioned_npc(world, text)
if attacker is not None and attacker.id != npc.id:
return attacker.id
return None
def nearby_threat_npc(world: WorldState, perception: CitizenPerception) -> Npc | None:
if not perception.nearby_threats:
return None
threat_id = perception.nearby_threats[0]["npc_id"]
return next((npc for npc in world.living_npcs() if npc.id == threat_id), None)
def _memory_threat_ids(world: WorldState, npc: Npc) -> set[str]:
threat_ids: set[str] = set()
for episode in npc.structured_memory.episodes:
if episode.actor_id == npc.id:
continue
if episode.subject_kind == "npc" and (
episode.kind in {"attack", "steal"} or "danger" in episode.tags
):
threat_ids.add(episode.actor_id)
for text in _memory_texts(npc):
lowered = text.lower()
if "dangerous" not in lowered and "attacked" not in lowered:
continue
threat = _mentioned_npc(world, text)
if threat is not None and threat.id != npc.id:
threat_ids.add(threat.id)
return threat_ids
def _mentioned_npc(world: WorldState, text: str) -> Npc | None:
lowered = text.lower()
for npc in world.living_npcs():
name = npc.name.lower()
npc_id = npc.id.lower()
if f"{name} attacked" in lowered or f"{npc_id} attacked" in lowered:
return npc
if f"{name} is dangerous" in lowered or f"{npc_id} is dangerous" in lowered:
return npc
matches = [
(lowered.find(npc.name.lower()), npc)
for npc in world.living_npcs()
if npc.name.lower() in lowered or npc.id.lower() in lowered
]
if not matches:
return None
return min(matches, key=lambda item: item[0])[1]
def _memory_texts(npc: Npc) -> list[str]:
texts = [memory.text for memory in recent_meaningful_memories(npc)]
if npc.memory_summary:
texts.append(npc.memory_summary)
return texts
def _last_hostile_actions(npc: Npc) -> list[str]:
return [
text
for text in _memory_texts(npc)
if "attacked" in text.lower() or "dangerous" in text.lower()
][-3:]
def _last_action_summary(world: WorldState, npc_id: str) -> str | None:
for trace in reversed(world.last_action_debug):
if trace.get("npc_id") != npc_id:
continue
resolution = trace.get("resolution")
if isinstance(resolution, dict):
summary = resolution.get("summary")
if isinstance(summary, str):
return summary
validation = trace.get("validation")
if isinstance(validation, dict):
reason = validation.get("reason")
if isinstance(reason, str):
return reason
return None
def _threat_reason(npc: Npc, *, memory_threat: bool) -> str:
if has_hostile_intent(npc) and npc.god_directive:
return "active hostile directive"
if has_hostile_intent(npc):
return "active hostile behavior"
if memory_threat:
return "remembered dangerous behavior"
return "possible threat"
def _health_condition(npc: Npc) -> str:
if not is_alive(npc):
return "dead"
if npc.health <= 25:
return "critical"
if npc.health <= 60:
return "injured"
return "healthy"