from __future__ import annotations import math import random from copy import deepcopy from dataclasses import asdict, replace from world_simulator.domain import CitizenGoal, Npc, Vec3, WorldState from world_simulator.observability import append_record from world_simulator.simulation.actions import PRIMITIVE_ACTION_IDS from world_simulator.simulation.autopilot import ( autopilot_directive, flee_directive, idle_directive, move_toward_target_directive, target_from_directive_or_goal, ) from world_simulator.simulation.connectors.base import ( ActionDebugTrace, NpcDirective, ResolutionResult, TickPlan, ValidationResult, WorldSimulator, ) from world_simulator.simulation.connectors.deterministic import DeterministicWorldSimulator from world_simulator.simulation.directives import expire_directives from world_simulator.simulation.goals import refresh_citizen_motivation from world_simulator.simulation.mechanics import ( ATTACK_RADIUS, BLOCK_SIZE, MAX_WALK_DISTANCE, TALK_RADIUS, VISIBLE_RADIUS, distance_between, has_hostile_intent, is_alive, ) from world_simulator.simulation.memory import remember from world_simulator.simulation.overseer import OverseerController, apply_overseer_metadata from world_simulator.simulation.perception import CitizenPerception from world_simulator.simulation.survival import ( add_episode, apply_post_survival_tick, apply_survival_plan, apply_survival_tick, is_survival_world, ) def advance_world( world: WorldState, simulator: WorldSimulator | None = None, overseer: OverseerController | None = None, ) -> WorldState: """Advance world state by asking the configured simulator for a tick plan.""" planned_tick = world.tick + 1 expire_directives(world, tick=planned_tick) next_tick, plan = plan_world_tick(world, simulator, next_tick=planned_tick, overseer=overseer) apply_tick_plan(world, next_tick, plan) return world def plan_world_tick( world: WorldState, simulator: WorldSimulator | None = None, next_tick: int | None = None, overseer: OverseerController | None = None, ) -> tuple[int, TickPlan]: """Build a tick plan without mutating the supplied world state.""" planned_tick = next_tick if next_tick is not None else world.tick + 1 active_simulator = simulator or DeterministicWorldSimulator() if overseer is None: return planned_tick, active_simulator.propose_tick(world, planned_tick) prompt_world = deepcopy(world) overseer_plan = overseer.augment_plan( prompt_world, planned_tick, TickPlan(source="pre_npc", directives=[]), ) apply_overseer_metadata(prompt_world, overseer_plan.overseer) npc_plan = active_simulator.propose_tick(prompt_world, planned_tick) merged = {directive.npc_id: directive for directive in npc_plan.directives} for directive in overseer_plan.directives: merged[directive.npc_id] = directive return ( planned_tick, TickPlan( source=f"{npc_plan.source}+overseer" if overseer_plan.overseer else npc_plan.source, directives=list(merged.values()), overseer=overseer_plan.overseer, ledger_entries=[*overseer_plan.ledger_entries, *npc_plan.ledger_entries], ), ) def apply_tick_plan(world: WorldState, next_tick: int, plan: TickPlan) -> None: """Commit an already computed tick plan into the live world state.""" # Deterministic survival environment (hunger, beast, fear) runs first and is # a no-op for worlds without survival entities. apply_survival_tick(world) # In a survival world all directives (deterministic or LLM) resolve through # the survival resolver so the engine owns hunger/health/position/inventory. if plan.source.startswith("survival") or is_survival_world(world): world.tick = next_tick apply_overseer_metadata(world, plan.overseer) debug = apply_survival_plan(world, next_tick, plan) _publish_plan_ledger_entries(plan.ledger_entries, debug) apply_post_survival_tick(world, next_tick) world.last_tick_source = plan.source world.last_action_debug = debug return plan, traces = validate_tick_plan(world, plan) traces_by_npc_id = {trace.npc_id: trace for trace in traces} resolved_traces: list[ActionDebugTrace] = [] half_width = world.terrain.width / 2 half_depth = world.terrain.depth / 2 npcs_by_id = {npc.id: npc for npc in world.npcs} move_targets = _resolve_move_targets( world, plan.directives, half_width=half_width, half_depth=half_depth, ) for directive in plan.directives: npc = npcs_by_id.get(directive.npc_id) if not npc: continue if not is_alive(npc): npc.intention = "dead" resolution = ResolutionResult( npc_id=directive.npc_id, action=directive.action, success=False, summary="actor dead", ) _append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution) continue npc.focus_target_id = None match directive.action: case "move": resolution = _apply_move( npc, directive, move_targets.get(npc.id) if directive.target else None, npcs_by_id, next_tick, ) case "speak": resolution = _apply_speak(npc, directive, npcs_by_id, next_tick) case "attack" | "strike": resolution = _apply_strike(npc, directive, npcs_by_id, world, next_tick) case "idle": resolution = _apply_idle(npc, directive) case _: resolution = ResolutionResult( npc_id=directive.npc_id, action=directive.action, success=False, summary=f"unresolved action {directive.action}", ) if directive.memory: remember(npc, next_tick, directive.memory) if directive.conversation_user and directive.conversation_assistant: npc.conversation_context.add_turn( directive.conversation_user, directive.conversation_assistant, ) _append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution) for npc in world.npcs: if not is_alive(npc): npc.intention = "dead" world.tick = next_tick apply_overseer_metadata(world, plan.overseer) world.last_tick_source = plan.source world.last_action_debug = [_trace_to_dict(trace) for trace in resolved_traces] _publish_plan_ledger_entries(plan.ledger_entries, world.last_action_debug) def validate_tick_plan( world: WorldState, plan: TickPlan, ) -> tuple[TickPlan, list[ActionDebugTrace]]: """Resolve requested NPC directives into safe engine directives.""" npcs_by_id = {npc.id: npc for npc in world.npcs} resolved_directives: list[NpcDirective] = [] traces: list[ActionDebugTrace] = [] for directive in plan.directives: npc = npcs_by_id.get(directive.npc_id) validation = validate_directive(world, npc, directive) traces.append( ActionDebugTrace( npc_id=directive.npc_id, original_directive=directive, validation=validation, ) ) resolved_directives.append(validation.directive) return ( TickPlan( source=plan.source, directives=resolved_directives, overseer=plan.overseer, ledger_entries=plan.ledger_entries, ), traces, ) def validate_directive( world: WorldState, npc: Npc | None, directive: NpcDirective, ) -> ValidationResult: """Validate or repair one directive before engine execution. Only physical impossibility is repaired here (missing or dead targets, out-of-range interactions, unknown actions). Behavioral plausibility is the planner's job: any physically possible primitive is executed as asked. """ if npc is None or npc.id != directive.npc_id: resolved = idle_directive( directive, memory="Action ignored because the actor was unavailable.", intent="invalid_actor", ) return _validation_result(directive, resolved, valid=False, reason="actor unavailable") if not is_alive(npc): resolved = idle_directive(directive, memory=None, intent="actor_dead") return _validation_result(directive, resolved, valid=False, reason="actor dead") original_directive = directive if directive.action == "idle": return _validation_result(directive, directive, valid=True, reason="idle accepted") if directive.action == "strike": directive = replace(directive, action="attack") perception, goal = refresh_citizen_motivation(world, npc) if directive.action not in PRIMITIVE_ACTION_IDS: fallback = autopilot_directive(world, npc, directive, perception, goal) return _validation_result( original_directive, fallback, valid=False, reason=f"unknown action {directive.action!r} replaced by autopilot", ) match directive.action: case "move": return _validate_move(world, npc, directive, perception, goal) case "speak": return _validate_speak(world, npc, directive) case "attack" | "strike": validation = _validate_strike(world, npc, directive) if original_directive.action != directive.action: return replace(validation, original_action=str(original_directive.action)) return validation case _: # use/transfer have no effects outside survival worlds. resolved = idle_directive( directive, memory="There was nothing here to use or trade.", intent="observe", ) return _validation_result( directive, resolved, valid=False, reason=f"{directive.action} has no effect outside survival worlds", ) def _validate_move( world: WorldState, npc: Npc, directive: NpcDirective, perception: CitizenPerception, goal: CitizenGoal | None, ) -> ValidationResult: if directive.away: threat = target_from_directive_or_goal(world, npc, directive, goal, perception) if threat is None: resolved = idle_directive( directive, memory="You braced because no visible threat was left to flee from.", intent="defend", ) return _validation_result( directive, resolved, valid=False, reason="move away had no resolvable threat", ) return _validation_result( directive, flee_directive(world, npc, threat, directive), valid=True, reason="move away resolved against nearest threat", ) if directive.target_npc_id: target = _npc_by_id(world, directive.target_npc_id) if target is None or target.id == npc.id or not is_alive(target): resolved = idle_directive( directive, memory="You could not move toward the target because it was unavailable.", intent="invalid_move_target", ) return _validation_result( directive, resolved, valid=False, reason="move target unavailable", ) return _validation_result( directive, move_toward_target_directive( npc, directive, target, intent=_move_intent(goal, target), ), valid=True, reason="move toward NPC resolved to their position", ) if directive.target is None: resolved = idle_directive(directive, memory=None, intent="missing_move_target") return _validation_result(directive, resolved, valid=False, reason="move target missing") resolved = replace(directive, target=_bounded_target(world, directive.target)) return _validation_result(directive, resolved, valid=True, reason="move accepted") def _validate_speak(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult: if not directive.target_npc_id: # An untargeted speak is a shout heard by everyone within earshot. return _validation_result( directive, directive, valid=True, reason="broadcast speech accepted", ) target = _npc_by_id(world, directive.target_npc_id) if target is None or target.id == npc.id or not is_alive(target): resolved = idle_directive( directive, memory="You could not speak because the listener was unavailable.", intent="invalid_speak_target", ) return _validation_result( directive, resolved, valid=False, reason="speak target unavailable", ) if distance_between(npc, target) > TALK_RADIUS: resolved = NpcDirective( npc_id=npc.id, action="move", target=target.position, target_npc_id=target.id, message=directive.message, memory=f"You moved closer to speak with {target.name}.", intent="approach_target_for_talk", confidence=directive.confidence, ) return _validation_result( directive, resolved, valid=True, reason="out-of-range speech converted to approach movement", ) return _validation_result(directive, directive, valid=True, reason="speak accepted") def _validate_strike(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult: target = _npc_by_id(world, directive.target_npc_id or "") if target is None or target.id == npc.id or not is_alive(target): resolved = idle_directive( directive, memory="You could not strike because the target was unavailable.", intent="invalid_strike_target", ) return _validation_result( directive, resolved, valid=False, reason="strike target unavailable", ) if distance_between(npc, target) > ATTACK_RADIUS: resolved = NpcDirective( npc_id=npc.id, action="move", target=target.position, target_npc_id=target.id, message=directive.message, memory=f"You moved toward {target.name} to get within striking range.", intent="approach_target_for_attack", confidence=directive.confidence, ) return _validation_result( directive, resolved, valid=True, reason="out-of-range strike converted to approach movement", ) return _validation_result(directive, directive, valid=True, reason="strike accepted") def _move_intent(goal: CitizenGoal | None, target: Npc) -> str: if ( goal is not None and goal.goal_type in ("pursue_and_attack_target", "retaliate") and goal.target_npc_id == target.id ): return "approach_target_for_attack" return "move_to_target" def _npc_by_id(world: WorldState, npc_id: str) -> Npc | None: return next((candidate for candidate in world.living_npcs() if candidate.id == npc_id), None) def _resolve_move_targets( world: WorldState, directives: list[NpcDirective], *, half_width: float, half_depth: float, ) -> dict[str, Vec3]: npcs_by_id = {npc.id: npc for npc in world.npcs} targets: dict[str, Vec3] = {} for directive in directives: if directive.action != "move" or not directive.target: continue npc = npcs_by_id.get(directive.npc_id) if not npc or not is_alive(npc): continue targets[npc.id] = _planned_move_target( npc, directive.target, half_width=half_width, half_depth=half_depth, ) return targets def _planned_move_target( npc: Npc, raw_target: Vec3, *, half_width: float, half_depth: float, ) -> Vec3: target_x = _snap_to_block_center(raw_target.x, half_width=half_width) target_z = _snap_to_block_center(raw_target.z, half_width=half_depth) delta_x = target_x - npc.position.x delta_z = target_z - npc.position.z distance = math.hypot(delta_x, delta_z) if distance > MAX_WALK_DISTANCE: scale = MAX_WALK_DISTANCE / distance target_x = npc.position.x + delta_x * scale target_z = npc.position.z + delta_z * scale return Vec3( x=round(_clamp(target_x, -half_width, half_width), 3), y=0.0, z=round(_clamp(target_z, -half_depth, half_depth), 3), ) def _apply_move( npc: Npc, directive: NpcDirective, target: Vec3 | None, npcs_by_id: dict[str, Npc], next_tick: int, ) -> ResolutionResult: if target is None: npc.intention = "idle" return ResolutionResult( npc_id=npc.id, action=directive.action, success=False, summary="idle", ) npc.position = target target_npc = npcs_by_id.get(directive.target_npc_id or "") match directive.intent: case "approach_target_for_attack": npc.mode = "hostile_pursuit" if target_npc: npc.intention = f"approaching {target_npc.name} to attack" else: npc.intention = "approaching to attack" if target_npc: _remember_threat_observers( actor=npc, target=target_npc, npcs_by_id=npcs_by_id, next_tick=next_tick, text=f"{npc.name} is moving toward {target_npc.name} with hostile intent.", include_target=True, ) case "approach_target_for_talk": npc.intention = ( f"approaching {target_npc.name} to talk" if target_npc else "approaching to talk" ) case "flee_from_threat": npc.mode = "fleeing" npc.intention = f"fleeing from {target_npc.name}" if target_npc else "fleeing" case _: npc.intention = "walking" return ResolutionResult( npc_id=npc.id, action=directive.action, success=True, summary=npc.intention, ) def _apply_speak( npc: Npc, directive: NpcDirective, npcs_by_id: dict[str, Npc], next_tick: int, ) -> ResolutionResult: if not directive.target_npc_id: return _apply_broadcast_speech(npc, directive, npcs_by_id, next_tick) target = npcs_by_id.get(directive.target_npc_id) if not target or target.id == npc.id or not is_alive(target): npc.intention = "could not speak" remember(npc, next_tick, "You could not speak because the listener was unavailable.") return ResolutionResult( npc_id=npc.id, action=directive.action, success=False, summary=npc.intention, ) if distance_between(npc, target) > TALK_RADIUS: npc.intention = f"too far to speak with {target.name}" remember(npc, next_tick, f"You were too far away to speak with {target.name}.") return ResolutionResult( npc_id=npc.id, action=directive.action, success=False, summary=npc.intention, ) message = directive.message or "checking in" npc.intention = f"talking to {target.name}" npc.focus_target_id = target.id target.focus_target_id = npc.id add_episode( npc, next_tick, "communicate", npc.id, f"I told {target.name}: {message}", target_id=target.id, subject_kind="npc", perspective="self", tags=["social"], weight=0.3, ) add_episode( target, next_tick, "communicate", npc.id, f"{npc.name} told me: {message}", target_id=target.id, subject_kind="npc", perspective="recipient", tags=["social"], weight=0.3, ) remember(npc, next_tick, f"You told {target.name}: {message}") remember(target, next_tick, f"{npc.name} told you: {message}") return ResolutionResult( npc_id=npc.id, action=directive.action, success=True, summary=npc.intention, ) def _apply_broadcast_speech( npc: Npc, directive: NpcDirective, npcs_by_id: dict[str, Npc], next_tick: int, ) -> ResolutionResult: is_help_request = directive.communication_intent == "help_request" message = directive.message or ("Help! I am in danger!" if is_help_request else "Hey!") npc.intention = "calling for help" if is_help_request else "shouting" tags = ["help", "danger"] if is_help_request else ["social"] add_episode( npc, next_tick, "communicate", npc.id, f"I shouted: {message}", subject_kind="npc", perspective="self", tags=tags, weight=0.6 if is_help_request else 0.3, ) _remember_nearby_citizens( actor=npc, npcs_by_id=npcs_by_id, next_tick=next_tick, text=f"{npc.name} shouted: {message}", kind="communicate", tags=tags, ) remember(npc, next_tick, f"You shouted: {message}") return ResolutionResult( npc_id=npc.id, action=directive.action, success=True, summary=npc.intention, ) def _apply_strike( npc: Npc, directive: NpcDirective, npcs_by_id: dict[str, Npc], world: WorldState, next_tick: int, ) -> ResolutionResult: target = npcs_by_id.get(directive.target_npc_id or "") if not target or target.id == npc.id or not is_alive(target): npc.intention = "could not attack" remember(npc, next_tick, "You could not attack because the target was unavailable.") return ResolutionResult( npc_id=npc.id, action=directive.action, success=False, summary=npc.intention, ) if distance_between(npc, target) > ATTACK_RADIUS: npc.intention = f"too far to attack {target.name}" remember(npc, next_tick, f"You were too far away to attack {target.name}.") return ResolutionResult( npc_id=npc.id, action=directive.action, success=False, summary=npc.intention, ) rng = random.Random(f"{world.seed}:{next_tick}:{npc.id}:attack:{target.id}") damage = npc.attack_damage + rng.randint(0, 5) target.health -= damage npc.intention = f"attacking {target.name}" npc.focus_target_id = target.id if has_hostile_intent(npc) or directive.intent == "hostile_attack": npc.mode = "hostile_pursuit" target.mode = "threatened" target.needs.safety = max(target.needs.safety, 100) target.emotions.fear = min(100, max(target.emotions.fear, 90)) target.emotions.stress = min(100, max(target.emotions.stress, 85)) target.emotions.trust_baseline = max(0, target.emotions.trust_baseline - 20) target.relationships[npc.id] = max(-1.0, target.relationships.get(npc.id, 0.0) - 0.5) add_episode( target, next_tick, "attack", npc.id, f"{npc.name} attacked me for {damage} damage", target_id=target.id, subject_kind="npc", perspective="recipient", tags=["danger"], weight=0.9, ) remember(npc, next_tick, f"You attacked {target.name} for {damage} damage.") remember(target, next_tick, f"{npc.name} attacked you for {damage} damage.") if not is_alive(target): target.intention = "dead" remember(target, next_tick, "You died.") _remember_threat_observers( actor=npc, target=target, npcs_by_id=npcs_by_id, next_tick=next_tick, text=f"{npc.name} attacked {target.name}; {npc.name} is dangerous.", include_target=False, kind="attack", tags=["danger"], ) return ResolutionResult( npc_id=npc.id, action=directive.action, success=True, summary=npc.intention, ) def _apply_idle(npc: Npc, directive: NpcDirective) -> ResolutionResult: match directive.intent: case "defend": npc.intention = "defending" case "observe": npc.intention = "observing" case _: npc.intention = "idle" return ResolutionResult( npc_id=npc.id, action=directive.action, success=True, summary=npc.intention, ) def _validation_result( original: NpcDirective, resolved: NpcDirective, *, valid: bool, reason: str, ) -> ValidationResult: return ValidationResult( valid=valid, original_action=str(original.action), resolved_action=resolved.action, reason=reason, directive=resolved, ) def _bounded_target(world: WorldState, target: Vec3) -> Vec3: half_width = world.terrain.width / 2 half_depth = world.terrain.depth / 2 return Vec3( x=round(_clamp(target.x, -half_width, half_width), 3), y=0.0, z=round(_clamp(target.z, -half_depth, half_depth), 3), ) def _remember_threat_observers( *, actor: Npc, target: Npc, npcs_by_id: dict[str, Npc], next_tick: int, text: str, include_target: bool, kind: str = "observe", tags: list[str] | None = None, ) -> None: for observer in npcs_by_id.values(): if observer.id == actor.id: continue if observer.id == target.id and not include_target: continue if not is_alive(observer): continue if distance_between(observer, actor) > VISIBLE_RADIUS: continue observer.needs.safety = max(observer.needs.safety, 85) observer.emotions.fear = max(observer.emotions.fear, 70) observer.emotions.stress = max(observer.emotions.stress, 60) add_episode( observer, next_tick, kind, actor.id, text, target_id=target.id, subject_kind="npc", perspective="witness" if observer.id != target.id else "recipient", tags=tags or ["danger"], weight=0.7 if kind == "observe" else 0.8, ) remember(observer, next_tick, text) def _remember_nearby_citizens( *, actor: Npc, npcs_by_id: dict[str, Npc], next_tick: int, text: str, kind: str = "observe", tags: list[str] | None = None, ) -> None: for observer in npcs_by_id.values(): if observer.id == actor.id or not is_alive(observer): continue if distance_between(observer, actor) > VISIBLE_RADIUS: continue add_episode( observer, next_tick, kind, actor.id, text, subject_kind="npc", perspective="recipient", tags=tags or [], weight=0.5, ) remember(observer, next_tick, text) def _append_resolution_trace( traces_by_npc_id: dict[str, ActionDebugTrace], resolved_traces: list[ActionDebugTrace], directive: NpcDirective, resolution: ResolutionResult, ) -> None: trace = traces_by_npc_id.get(directive.npc_id) if trace is None: return resolved_traces.append( ActionDebugTrace( npc_id=trace.npc_id, original_directive=trace.original_directive, validation=trace.validation, resolution=resolution, ) ) def _publish_plan_ledger_entries( entries: list[dict[str, object]], debug: list[dict[str, object]], ) -> None: if not entries: return verdicts = _validator_verdicts_by_npc(debug) for entry in entries: record = dict(entry) if record.get("phase") == "npc_response": # Only a genuine model response (verdict still "pending") is # reconciled with the engine's accept/repair/reject verdict. A # response whose verdict is already "rejected" (model call failed / # output unusable / NPC omitted) keeps that explicit reason so a # fallback is never masked as "accepted" by the deterministic fill. current = record.get("validator_verdict") current_status = current.get("status") if isinstance(current, dict) else None if current_status in (None, "pending"): verdict = verdicts.get(str(record.get("npc_id") or "")) if verdict is not None: record["validator_verdict"] = verdict append_record(record) def _validator_verdicts_by_npc(debug: list[dict[str, object]]) -> dict[str, dict[str, object]]: verdicts: dict[str, dict[str, object]] = {} for trace in debug: npc_id = trace.get("npc_id") if not isinstance(npc_id, str): continue verdict = trace.get("validator_verdict") if isinstance(verdict, dict): verdicts[npc_id] = dict(verdict) continue validation = trace.get("validation") if isinstance(validation, dict): valid = validation.get("valid") original = validation.get("original_action") resolved = validation.get("resolved_action") reason = validation.get("reason") if valid is True: status = "accepted" elif original != resolved: status = "repaired" else: status = "rejected" verdicts[npc_id] = { "status": status, "requested": original, "resolved": resolved, "reason": reason, } return verdicts def _trace_to_dict(trace: ActionDebugTrace) -> dict[str, object]: return asdict(trace) def _clamp(value: float, minimum: float, maximum: float) -> float: return min(max(value, minimum), maximum) def _snap_to_block_center(value: float, *, half_width: float) -> float: first_center = -half_width + (BLOCK_SIZE / 2) last_center = half_width - (BLOCK_SIZE / 2) snapped_steps = round((value - first_center) / BLOCK_SIZE) return _clamp(first_center + (snapped_steps * BLOCK_SIZE), first_center, last_center)