Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import math | |
| import random | |
| from copy import deepcopy | |
| from dataclasses import asdict, replace | |
| from world_simulator.domain import CitizenGoal, Npc, Vec3, WorldState | |
| from world_simulator.observability import append_record | |
| from world_simulator.simulation.actions import PRIMITIVE_ACTION_IDS | |
| from world_simulator.simulation.autopilot import ( | |
| autopilot_directive, | |
| flee_directive, | |
| idle_directive, | |
| move_toward_target_directive, | |
| target_from_directive_or_goal, | |
| ) | |
| from world_simulator.simulation.connectors.base import ( | |
| ActionDebugTrace, | |
| NpcDirective, | |
| ResolutionResult, | |
| TickPlan, | |
| ValidationResult, | |
| WorldSimulator, | |
| ) | |
| from world_simulator.simulation.connectors.deterministic import DeterministicWorldSimulator | |
| from world_simulator.simulation.directives import expire_directives | |
| from world_simulator.simulation.goals import refresh_citizen_motivation | |
| from world_simulator.simulation.mechanics import ( | |
| ATTACK_RADIUS, | |
| BLOCK_SIZE, | |
| MAX_WALK_DISTANCE, | |
| TALK_RADIUS, | |
| VISIBLE_RADIUS, | |
| distance_between, | |
| has_hostile_intent, | |
| is_alive, | |
| ) | |
| from world_simulator.simulation.memory import remember | |
| from world_simulator.simulation.overseer import OverseerController, apply_overseer_metadata | |
| from world_simulator.simulation.perception import CitizenPerception | |
| from world_simulator.simulation.survival import ( | |
| add_episode, | |
| apply_post_survival_tick, | |
| apply_survival_plan, | |
| apply_survival_tick, | |
| is_survival_world, | |
| ) | |
| def advance_world( | |
| world: WorldState, | |
| simulator: WorldSimulator | None = None, | |
| overseer: OverseerController | None = None, | |
| ) -> WorldState: | |
| """Advance world state by asking the configured simulator for a tick plan.""" | |
| planned_tick = world.tick + 1 | |
| expire_directives(world, tick=planned_tick) | |
| next_tick, plan = plan_world_tick(world, simulator, next_tick=planned_tick, overseer=overseer) | |
| apply_tick_plan(world, next_tick, plan) | |
| return world | |
| def plan_world_tick( | |
| world: WorldState, | |
| simulator: WorldSimulator | None = None, | |
| next_tick: int | None = None, | |
| overseer: OverseerController | None = None, | |
| ) -> tuple[int, TickPlan]: | |
| """Build a tick plan without mutating the supplied world state.""" | |
| planned_tick = next_tick if next_tick is not None else world.tick + 1 | |
| active_simulator = simulator or DeterministicWorldSimulator() | |
| if overseer is None: | |
| return planned_tick, active_simulator.propose_tick(world, planned_tick) | |
| prompt_world = deepcopy(world) | |
| overseer_plan = overseer.augment_plan( | |
| prompt_world, | |
| planned_tick, | |
| TickPlan(source="pre_npc", directives=[]), | |
| ) | |
| apply_overseer_metadata(prompt_world, overseer_plan.overseer) | |
| npc_plan = active_simulator.propose_tick(prompt_world, planned_tick) | |
| merged = {directive.npc_id: directive for directive in npc_plan.directives} | |
| for directive in overseer_plan.directives: | |
| merged[directive.npc_id] = directive | |
| return ( | |
| planned_tick, | |
| TickPlan( | |
| source=f"{npc_plan.source}+overseer" if overseer_plan.overseer else npc_plan.source, | |
| directives=list(merged.values()), | |
| overseer=overseer_plan.overseer, | |
| ledger_entries=[*overseer_plan.ledger_entries, *npc_plan.ledger_entries], | |
| ), | |
| ) | |
| def apply_tick_plan(world: WorldState, next_tick: int, plan: TickPlan) -> None: | |
| """Commit an already computed tick plan into the live world state.""" | |
| # Deterministic survival environment (hunger, beast, fear) runs first and is | |
| # a no-op for worlds without survival entities. | |
| apply_survival_tick(world) | |
| # In a survival world all directives (deterministic or LLM) resolve through | |
| # the survival resolver so the engine owns hunger/health/position/inventory. | |
| if plan.source.startswith("survival") or is_survival_world(world): | |
| world.tick = next_tick | |
| apply_overseer_metadata(world, plan.overseer) | |
| debug = apply_survival_plan(world, next_tick, plan) | |
| _publish_plan_ledger_entries(plan.ledger_entries, debug) | |
| apply_post_survival_tick(world, next_tick) | |
| world.last_tick_source = plan.source | |
| world.last_action_debug = debug | |
| return | |
| plan, traces = validate_tick_plan(world, plan) | |
| traces_by_npc_id = {trace.npc_id: trace for trace in traces} | |
| resolved_traces: list[ActionDebugTrace] = [] | |
| half_width = world.terrain.width / 2 | |
| half_depth = world.terrain.depth / 2 | |
| npcs_by_id = {npc.id: npc for npc in world.npcs} | |
| move_targets = _resolve_move_targets( | |
| world, | |
| plan.directives, | |
| half_width=half_width, | |
| half_depth=half_depth, | |
| ) | |
| for directive in plan.directives: | |
| npc = npcs_by_id.get(directive.npc_id) | |
| if not npc: | |
| continue | |
| if not is_alive(npc): | |
| npc.intention = "dead" | |
| resolution = ResolutionResult( | |
| npc_id=directive.npc_id, | |
| action=directive.action, | |
| success=False, | |
| summary="actor dead", | |
| ) | |
| _append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution) | |
| continue | |
| npc.focus_target_id = None | |
| match directive.action: | |
| case "move": | |
| resolution = _apply_move( | |
| npc, | |
| directive, | |
| move_targets.get(npc.id) if directive.target else None, | |
| npcs_by_id, | |
| next_tick, | |
| ) | |
| case "speak": | |
| resolution = _apply_speak(npc, directive, npcs_by_id, next_tick) | |
| case "attack" | "strike": | |
| resolution = _apply_strike(npc, directive, npcs_by_id, world, next_tick) | |
| case "idle": | |
| resolution = _apply_idle(npc, directive) | |
| case _: | |
| resolution = ResolutionResult( | |
| npc_id=directive.npc_id, | |
| action=directive.action, | |
| success=False, | |
| summary=f"unresolved action {directive.action}", | |
| ) | |
| if directive.memory: | |
| remember(npc, next_tick, directive.memory) | |
| if directive.conversation_user and directive.conversation_assistant: | |
| npc.conversation_context.add_turn( | |
| directive.conversation_user, | |
| directive.conversation_assistant, | |
| ) | |
| _append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution) | |
| for npc in world.npcs: | |
| if not is_alive(npc): | |
| npc.intention = "dead" | |
| world.tick = next_tick | |
| apply_overseer_metadata(world, plan.overseer) | |
| world.last_tick_source = plan.source | |
| world.last_action_debug = [_trace_to_dict(trace) for trace in resolved_traces] | |
| _publish_plan_ledger_entries(plan.ledger_entries, world.last_action_debug) | |
| def validate_tick_plan( | |
| world: WorldState, | |
| plan: TickPlan, | |
| ) -> tuple[TickPlan, list[ActionDebugTrace]]: | |
| """Resolve requested NPC directives into safe engine directives.""" | |
| npcs_by_id = {npc.id: npc for npc in world.npcs} | |
| resolved_directives: list[NpcDirective] = [] | |
| traces: list[ActionDebugTrace] = [] | |
| for directive in plan.directives: | |
| npc = npcs_by_id.get(directive.npc_id) | |
| validation = validate_directive(world, npc, directive) | |
| traces.append( | |
| ActionDebugTrace( | |
| npc_id=directive.npc_id, | |
| original_directive=directive, | |
| validation=validation, | |
| ) | |
| ) | |
| resolved_directives.append(validation.directive) | |
| return ( | |
| TickPlan( | |
| source=plan.source, | |
| directives=resolved_directives, | |
| overseer=plan.overseer, | |
| ledger_entries=plan.ledger_entries, | |
| ), | |
| traces, | |
| ) | |
| def validate_directive( | |
| world: WorldState, | |
| npc: Npc | None, | |
| directive: NpcDirective, | |
| ) -> ValidationResult: | |
| """Validate or repair one directive before engine execution. | |
| Only physical impossibility is repaired here (missing or dead targets, | |
| out-of-range interactions, unknown actions). Behavioral plausibility is | |
| the planner's job: any physically possible primitive is executed as asked. | |
| """ | |
| if npc is None or npc.id != directive.npc_id: | |
| resolved = idle_directive( | |
| directive, | |
| memory="Action ignored because the actor was unavailable.", | |
| intent="invalid_actor", | |
| ) | |
| return _validation_result(directive, resolved, valid=False, reason="actor unavailable") | |
| if not is_alive(npc): | |
| resolved = idle_directive(directive, memory=None, intent="actor_dead") | |
| return _validation_result(directive, resolved, valid=False, reason="actor dead") | |
| original_directive = directive | |
| if directive.action == "idle": | |
| return _validation_result(directive, directive, valid=True, reason="idle accepted") | |
| if directive.action == "strike": | |
| directive = replace(directive, action="attack") | |
| perception, goal = refresh_citizen_motivation(world, npc) | |
| if directive.action not in PRIMITIVE_ACTION_IDS: | |
| fallback = autopilot_directive(world, npc, directive, perception, goal) | |
| return _validation_result( | |
| original_directive, | |
| fallback, | |
| valid=False, | |
| reason=f"unknown action {directive.action!r} replaced by autopilot", | |
| ) | |
| match directive.action: | |
| case "move": | |
| return _validate_move(world, npc, directive, perception, goal) | |
| case "speak": | |
| return _validate_speak(world, npc, directive) | |
| case "attack" | "strike": | |
| validation = _validate_strike(world, npc, directive) | |
| if original_directive.action != directive.action: | |
| return replace(validation, original_action=str(original_directive.action)) | |
| return validation | |
| case _: | |
| # use/transfer have no effects outside survival worlds. | |
| resolved = idle_directive( | |
| directive, | |
| memory="There was nothing here to use or trade.", | |
| intent="observe", | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=False, | |
| reason=f"{directive.action} has no effect outside survival worlds", | |
| ) | |
| def _validate_move( | |
| world: WorldState, | |
| npc: Npc, | |
| directive: NpcDirective, | |
| perception: CitizenPerception, | |
| goal: CitizenGoal | None, | |
| ) -> ValidationResult: | |
| if directive.away: | |
| threat = target_from_directive_or_goal(world, npc, directive, goal, perception) | |
| if threat is None: | |
| resolved = idle_directive( | |
| directive, | |
| memory="You braced because no visible threat was left to flee from.", | |
| intent="defend", | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=False, | |
| reason="move away had no resolvable threat", | |
| ) | |
| return _validation_result( | |
| directive, | |
| flee_directive(world, npc, threat, directive), | |
| valid=True, | |
| reason="move away resolved against nearest threat", | |
| ) | |
| if directive.target_npc_id: | |
| target = _npc_by_id(world, directive.target_npc_id) | |
| if target is None or target.id == npc.id or not is_alive(target): | |
| resolved = idle_directive( | |
| directive, | |
| memory="You could not move toward the target because it was unavailable.", | |
| intent="invalid_move_target", | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=False, | |
| reason="move target unavailable", | |
| ) | |
| return _validation_result( | |
| directive, | |
| move_toward_target_directive( | |
| npc, | |
| directive, | |
| target, | |
| intent=_move_intent(goal, target), | |
| ), | |
| valid=True, | |
| reason="move toward NPC resolved to their position", | |
| ) | |
| if directive.target is None: | |
| resolved = idle_directive(directive, memory=None, intent="missing_move_target") | |
| return _validation_result(directive, resolved, valid=False, reason="move target missing") | |
| resolved = replace(directive, target=_bounded_target(world, directive.target)) | |
| return _validation_result(directive, resolved, valid=True, reason="move accepted") | |
| def _validate_speak(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult: | |
| if not directive.target_npc_id: | |
| # An untargeted speak is a shout heard by everyone within earshot. | |
| return _validation_result( | |
| directive, | |
| directive, | |
| valid=True, | |
| reason="broadcast speech accepted", | |
| ) | |
| target = _npc_by_id(world, directive.target_npc_id) | |
| if target is None or target.id == npc.id or not is_alive(target): | |
| resolved = idle_directive( | |
| directive, | |
| memory="You could not speak because the listener was unavailable.", | |
| intent="invalid_speak_target", | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=False, | |
| reason="speak target unavailable", | |
| ) | |
| if distance_between(npc, target) > TALK_RADIUS: | |
| resolved = NpcDirective( | |
| npc_id=npc.id, | |
| action="move", | |
| target=target.position, | |
| target_npc_id=target.id, | |
| message=directive.message, | |
| memory=f"You moved closer to speak with {target.name}.", | |
| intent="approach_target_for_talk", | |
| confidence=directive.confidence, | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=True, | |
| reason="out-of-range speech converted to approach movement", | |
| ) | |
| return _validation_result(directive, directive, valid=True, reason="speak accepted") | |
| def _validate_strike(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult: | |
| target = _npc_by_id(world, directive.target_npc_id or "") | |
| if target is None or target.id == npc.id or not is_alive(target): | |
| resolved = idle_directive( | |
| directive, | |
| memory="You could not strike because the target was unavailable.", | |
| intent="invalid_strike_target", | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=False, | |
| reason="strike target unavailable", | |
| ) | |
| if distance_between(npc, target) > ATTACK_RADIUS: | |
| resolved = NpcDirective( | |
| npc_id=npc.id, | |
| action="move", | |
| target=target.position, | |
| target_npc_id=target.id, | |
| message=directive.message, | |
| memory=f"You moved toward {target.name} to get within striking range.", | |
| intent="approach_target_for_attack", | |
| confidence=directive.confidence, | |
| ) | |
| return _validation_result( | |
| directive, | |
| resolved, | |
| valid=True, | |
| reason="out-of-range strike converted to approach movement", | |
| ) | |
| return _validation_result(directive, directive, valid=True, reason="strike accepted") | |
| def _move_intent(goal: CitizenGoal | None, target: Npc) -> str: | |
| if ( | |
| goal is not None | |
| and goal.goal_type in ("pursue_and_attack_target", "retaliate") | |
| and goal.target_npc_id == target.id | |
| ): | |
| return "approach_target_for_attack" | |
| return "move_to_target" | |
| def _npc_by_id(world: WorldState, npc_id: str) -> Npc | None: | |
| return next((candidate for candidate in world.living_npcs() if candidate.id == npc_id), None) | |
| def _resolve_move_targets( | |
| world: WorldState, | |
| directives: list[NpcDirective], | |
| *, | |
| half_width: float, | |
| half_depth: float, | |
| ) -> dict[str, Vec3]: | |
| npcs_by_id = {npc.id: npc for npc in world.npcs} | |
| targets: dict[str, Vec3] = {} | |
| for directive in directives: | |
| if directive.action != "move" or not directive.target: | |
| continue | |
| npc = npcs_by_id.get(directive.npc_id) | |
| if not npc or not is_alive(npc): | |
| continue | |
| targets[npc.id] = _planned_move_target( | |
| npc, | |
| directive.target, | |
| half_width=half_width, | |
| half_depth=half_depth, | |
| ) | |
| return targets | |
| def _planned_move_target( | |
| npc: Npc, | |
| raw_target: Vec3, | |
| *, | |
| half_width: float, | |
| half_depth: float, | |
| ) -> Vec3: | |
| target_x = _snap_to_block_center(raw_target.x, half_width=half_width) | |
| target_z = _snap_to_block_center(raw_target.z, half_width=half_depth) | |
| delta_x = target_x - npc.position.x | |
| delta_z = target_z - npc.position.z | |
| distance = math.hypot(delta_x, delta_z) | |
| if distance > MAX_WALK_DISTANCE: | |
| scale = MAX_WALK_DISTANCE / distance | |
| target_x = npc.position.x + delta_x * scale | |
| target_z = npc.position.z + delta_z * scale | |
| return Vec3( | |
| x=round(_clamp(target_x, -half_width, half_width), 3), | |
| y=0.0, | |
| z=round(_clamp(target_z, -half_depth, half_depth), 3), | |
| ) | |
| def _apply_move( | |
| npc: Npc, | |
| directive: NpcDirective, | |
| target: Vec3 | None, | |
| npcs_by_id: dict[str, Npc], | |
| next_tick: int, | |
| ) -> ResolutionResult: | |
| if target is None: | |
| npc.intention = "idle" | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=False, | |
| summary="idle", | |
| ) | |
| npc.position = target | |
| target_npc = npcs_by_id.get(directive.target_npc_id or "") | |
| match directive.intent: | |
| case "approach_target_for_attack": | |
| npc.mode = "hostile_pursuit" | |
| if target_npc: | |
| npc.intention = f"approaching {target_npc.name} to attack" | |
| else: | |
| npc.intention = "approaching to attack" | |
| if target_npc: | |
| _remember_threat_observers( | |
| actor=npc, | |
| target=target_npc, | |
| npcs_by_id=npcs_by_id, | |
| next_tick=next_tick, | |
| text=f"{npc.name} is moving toward {target_npc.name} with hostile intent.", | |
| include_target=True, | |
| ) | |
| case "approach_target_for_talk": | |
| npc.intention = ( | |
| f"approaching {target_npc.name} to talk" if target_npc else "approaching to talk" | |
| ) | |
| case "flee_from_threat": | |
| npc.mode = "fleeing" | |
| npc.intention = f"fleeing from {target_npc.name}" if target_npc else "fleeing" | |
| case _: | |
| npc.intention = "walking" | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=True, | |
| summary=npc.intention, | |
| ) | |
| def _apply_speak( | |
| npc: Npc, | |
| directive: NpcDirective, | |
| npcs_by_id: dict[str, Npc], | |
| next_tick: int, | |
| ) -> ResolutionResult: | |
| if not directive.target_npc_id: | |
| return _apply_broadcast_speech(npc, directive, npcs_by_id, next_tick) | |
| target = npcs_by_id.get(directive.target_npc_id) | |
| if not target or target.id == npc.id or not is_alive(target): | |
| npc.intention = "could not speak" | |
| remember(npc, next_tick, "You could not speak because the listener was unavailable.") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=False, | |
| summary=npc.intention, | |
| ) | |
| if distance_between(npc, target) > TALK_RADIUS: | |
| npc.intention = f"too far to speak with {target.name}" | |
| remember(npc, next_tick, f"You were too far away to speak with {target.name}.") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=False, | |
| summary=npc.intention, | |
| ) | |
| message = directive.message or "checking in" | |
| npc.intention = f"talking to {target.name}" | |
| npc.focus_target_id = target.id | |
| target.focus_target_id = npc.id | |
| add_episode( | |
| npc, | |
| next_tick, | |
| "communicate", | |
| npc.id, | |
| f"I told {target.name}: {message}", | |
| target_id=target.id, | |
| subject_kind="npc", | |
| perspective="self", | |
| tags=["social"], | |
| weight=0.3, | |
| ) | |
| add_episode( | |
| target, | |
| next_tick, | |
| "communicate", | |
| npc.id, | |
| f"{npc.name} told me: {message}", | |
| target_id=target.id, | |
| subject_kind="npc", | |
| perspective="recipient", | |
| tags=["social"], | |
| weight=0.3, | |
| ) | |
| remember(npc, next_tick, f"You told {target.name}: {message}") | |
| remember(target, next_tick, f"{npc.name} told you: {message}") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=True, | |
| summary=npc.intention, | |
| ) | |
| def _apply_broadcast_speech( | |
| npc: Npc, | |
| directive: NpcDirective, | |
| npcs_by_id: dict[str, Npc], | |
| next_tick: int, | |
| ) -> ResolutionResult: | |
| is_help_request = directive.communication_intent == "help_request" | |
| message = directive.message or ("Help! I am in danger!" if is_help_request else "Hey!") | |
| npc.intention = "calling for help" if is_help_request else "shouting" | |
| tags = ["help", "danger"] if is_help_request else ["social"] | |
| add_episode( | |
| npc, | |
| next_tick, | |
| "communicate", | |
| npc.id, | |
| f"I shouted: {message}", | |
| subject_kind="npc", | |
| perspective="self", | |
| tags=tags, | |
| weight=0.6 if is_help_request else 0.3, | |
| ) | |
| _remember_nearby_citizens( | |
| actor=npc, | |
| npcs_by_id=npcs_by_id, | |
| next_tick=next_tick, | |
| text=f"{npc.name} shouted: {message}", | |
| kind="communicate", | |
| tags=tags, | |
| ) | |
| remember(npc, next_tick, f"You shouted: {message}") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=True, | |
| summary=npc.intention, | |
| ) | |
| def _apply_strike( | |
| npc: Npc, | |
| directive: NpcDirective, | |
| npcs_by_id: dict[str, Npc], | |
| world: WorldState, | |
| next_tick: int, | |
| ) -> ResolutionResult: | |
| target = npcs_by_id.get(directive.target_npc_id or "") | |
| if not target or target.id == npc.id or not is_alive(target): | |
| npc.intention = "could not attack" | |
| remember(npc, next_tick, "You could not attack because the target was unavailable.") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=False, | |
| summary=npc.intention, | |
| ) | |
| if distance_between(npc, target) > ATTACK_RADIUS: | |
| npc.intention = f"too far to attack {target.name}" | |
| remember(npc, next_tick, f"You were too far away to attack {target.name}.") | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=False, | |
| summary=npc.intention, | |
| ) | |
| rng = random.Random(f"{world.seed}:{next_tick}:{npc.id}:attack:{target.id}") | |
| damage = npc.attack_damage + rng.randint(0, 5) | |
| target.health -= damage | |
| npc.intention = f"attacking {target.name}" | |
| npc.focus_target_id = target.id | |
| if has_hostile_intent(npc) or directive.intent == "hostile_attack": | |
| npc.mode = "hostile_pursuit" | |
| target.mode = "threatened" | |
| target.needs.safety = max(target.needs.safety, 100) | |
| target.emotions.fear = min(100, max(target.emotions.fear, 90)) | |
| target.emotions.stress = min(100, max(target.emotions.stress, 85)) | |
| target.emotions.trust_baseline = max(0, target.emotions.trust_baseline - 20) | |
| target.relationships[npc.id] = max(-1.0, target.relationships.get(npc.id, 0.0) - 0.5) | |
| add_episode( | |
| target, | |
| next_tick, | |
| "attack", | |
| npc.id, | |
| f"{npc.name} attacked me for {damage} damage", | |
| target_id=target.id, | |
| subject_kind="npc", | |
| perspective="recipient", | |
| tags=["danger"], | |
| weight=0.9, | |
| ) | |
| remember(npc, next_tick, f"You attacked {target.name} for {damage} damage.") | |
| remember(target, next_tick, f"{npc.name} attacked you for {damage} damage.") | |
| if not is_alive(target): | |
| target.intention = "dead" | |
| remember(target, next_tick, "You died.") | |
| _remember_threat_observers( | |
| actor=npc, | |
| target=target, | |
| npcs_by_id=npcs_by_id, | |
| next_tick=next_tick, | |
| text=f"{npc.name} attacked {target.name}; {npc.name} is dangerous.", | |
| include_target=False, | |
| kind="attack", | |
| tags=["danger"], | |
| ) | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=True, | |
| summary=npc.intention, | |
| ) | |
| def _apply_idle(npc: Npc, directive: NpcDirective) -> ResolutionResult: | |
| match directive.intent: | |
| case "defend": | |
| npc.intention = "defending" | |
| case "observe": | |
| npc.intention = "observing" | |
| case _: | |
| npc.intention = "idle" | |
| return ResolutionResult( | |
| npc_id=npc.id, | |
| action=directive.action, | |
| success=True, | |
| summary=npc.intention, | |
| ) | |
| def _validation_result( | |
| original: NpcDirective, | |
| resolved: NpcDirective, | |
| *, | |
| valid: bool, | |
| reason: str, | |
| ) -> ValidationResult: | |
| return ValidationResult( | |
| valid=valid, | |
| original_action=str(original.action), | |
| resolved_action=resolved.action, | |
| reason=reason, | |
| directive=resolved, | |
| ) | |
| def _bounded_target(world: WorldState, target: Vec3) -> Vec3: | |
| half_width = world.terrain.width / 2 | |
| half_depth = world.terrain.depth / 2 | |
| return Vec3( | |
| x=round(_clamp(target.x, -half_width, half_width), 3), | |
| y=0.0, | |
| z=round(_clamp(target.z, -half_depth, half_depth), 3), | |
| ) | |
| def _remember_threat_observers( | |
| *, | |
| actor: Npc, | |
| target: Npc, | |
| npcs_by_id: dict[str, Npc], | |
| next_tick: int, | |
| text: str, | |
| include_target: bool, | |
| kind: str = "observe", | |
| tags: list[str] | None = None, | |
| ) -> None: | |
| for observer in npcs_by_id.values(): | |
| if observer.id == actor.id: | |
| continue | |
| if observer.id == target.id and not include_target: | |
| continue | |
| if not is_alive(observer): | |
| continue | |
| if distance_between(observer, actor) > VISIBLE_RADIUS: | |
| continue | |
| observer.needs.safety = max(observer.needs.safety, 85) | |
| observer.emotions.fear = max(observer.emotions.fear, 70) | |
| observer.emotions.stress = max(observer.emotions.stress, 60) | |
| add_episode( | |
| observer, | |
| next_tick, | |
| kind, | |
| actor.id, | |
| text, | |
| target_id=target.id, | |
| subject_kind="npc", | |
| perspective="witness" if observer.id != target.id else "recipient", | |
| tags=tags or ["danger"], | |
| weight=0.7 if kind == "observe" else 0.8, | |
| ) | |
| remember(observer, next_tick, text) | |
| def _remember_nearby_citizens( | |
| *, | |
| actor: Npc, | |
| npcs_by_id: dict[str, Npc], | |
| next_tick: int, | |
| text: str, | |
| kind: str = "observe", | |
| tags: list[str] | None = None, | |
| ) -> None: | |
| for observer in npcs_by_id.values(): | |
| if observer.id == actor.id or not is_alive(observer): | |
| continue | |
| if distance_between(observer, actor) > VISIBLE_RADIUS: | |
| continue | |
| add_episode( | |
| observer, | |
| next_tick, | |
| kind, | |
| actor.id, | |
| text, | |
| subject_kind="npc", | |
| perspective="recipient", | |
| tags=tags or [], | |
| weight=0.5, | |
| ) | |
| remember(observer, next_tick, text) | |
| def _append_resolution_trace( | |
| traces_by_npc_id: dict[str, ActionDebugTrace], | |
| resolved_traces: list[ActionDebugTrace], | |
| directive: NpcDirective, | |
| resolution: ResolutionResult, | |
| ) -> None: | |
| trace = traces_by_npc_id.get(directive.npc_id) | |
| if trace is None: | |
| return | |
| resolved_traces.append( | |
| ActionDebugTrace( | |
| npc_id=trace.npc_id, | |
| original_directive=trace.original_directive, | |
| validation=trace.validation, | |
| resolution=resolution, | |
| ) | |
| ) | |
| def _publish_plan_ledger_entries( | |
| entries: list[dict[str, object]], | |
| debug: list[dict[str, object]], | |
| ) -> None: | |
| if not entries: | |
| return | |
| verdicts = _validator_verdicts_by_npc(debug) | |
| for entry in entries: | |
| record = dict(entry) | |
| if record.get("phase") == "npc_response": | |
| # Only a genuine model response (verdict still "pending") is | |
| # reconciled with the engine's accept/repair/reject verdict. A | |
| # response whose verdict is already "rejected" (model call failed / | |
| # output unusable / NPC omitted) keeps that explicit reason so a | |
| # fallback is never masked as "accepted" by the deterministic fill. | |
| current = record.get("validator_verdict") | |
| current_status = current.get("status") if isinstance(current, dict) else None | |
| if current_status in (None, "pending"): | |
| verdict = verdicts.get(str(record.get("npc_id") or "")) | |
| if verdict is not None: | |
| record["validator_verdict"] = verdict | |
| append_record(record) | |
| def _validator_verdicts_by_npc(debug: list[dict[str, object]]) -> dict[str, dict[str, object]]: | |
| verdicts: dict[str, dict[str, object]] = {} | |
| for trace in debug: | |
| npc_id = trace.get("npc_id") | |
| if not isinstance(npc_id, str): | |
| continue | |
| verdict = trace.get("validator_verdict") | |
| if isinstance(verdict, dict): | |
| verdicts[npc_id] = dict(verdict) | |
| continue | |
| validation = trace.get("validation") | |
| if isinstance(validation, dict): | |
| valid = validation.get("valid") | |
| original = validation.get("original_action") | |
| resolved = validation.get("resolved_action") | |
| reason = validation.get("reason") | |
| if valid is True: | |
| status = "accepted" | |
| elif original != resolved: | |
| status = "repaired" | |
| else: | |
| status = "rejected" | |
| verdicts[npc_id] = { | |
| "status": status, | |
| "requested": original, | |
| "resolved": resolved, | |
| "reason": reason, | |
| } | |
| return verdicts | |
| def _trace_to_dict(trace: ActionDebugTrace) -> dict[str, object]: | |
| return asdict(trace) | |
| def _clamp(value: float, minimum: float, maximum: float) -> float: | |
| return min(max(value, minimum), maximum) | |
| def _snap_to_block_center(value: float, *, half_width: float) -> float: | |
| first_center = -half_width + (BLOCK_SIZE / 2) | |
| last_center = half_width - (BLOCK_SIZE / 2) | |
| snapped_steps = round((value - first_center) / BLOCK_SIZE) | |
| return _clamp(first_center + (snapped_steps * BLOCK_SIZE), first_center, last_center) | |