DeltaZN
feat: rename god -> world
c58d3eb
Raw
History Blame Contribute Delete
31.2 kB
from __future__ import annotations
import math
import random
from copy import deepcopy
from dataclasses import asdict, replace
from world_simulator.domain import CitizenGoal, Npc, Vec3, WorldState
from world_simulator.observability import append_record
from world_simulator.simulation.actions import PRIMITIVE_ACTION_IDS
from world_simulator.simulation.autopilot import (
autopilot_directive,
flee_directive,
idle_directive,
move_toward_target_directive,
target_from_directive_or_goal,
)
from world_simulator.simulation.connectors.base import (
ActionDebugTrace,
NpcDirective,
ResolutionResult,
TickPlan,
ValidationResult,
WorldSimulator,
)
from world_simulator.simulation.connectors.deterministic import DeterministicWorldSimulator
from world_simulator.simulation.directives import expire_directives
from world_simulator.simulation.goals import refresh_citizen_motivation
from world_simulator.simulation.mechanics import (
ATTACK_RADIUS,
BLOCK_SIZE,
MAX_WALK_DISTANCE,
TALK_RADIUS,
VISIBLE_RADIUS,
distance_between,
has_hostile_intent,
is_alive,
)
from world_simulator.simulation.memory import remember
from world_simulator.simulation.overseer import OverseerController, apply_overseer_metadata
from world_simulator.simulation.perception import CitizenPerception
from world_simulator.simulation.survival import (
add_episode,
apply_post_survival_tick,
apply_survival_plan,
apply_survival_tick,
is_survival_world,
)
def advance_world(
world: WorldState,
simulator: WorldSimulator | None = None,
overseer: OverseerController | None = None,
) -> WorldState:
"""Advance world state by asking the configured simulator for a tick plan."""
planned_tick = world.tick + 1
expire_directives(world, tick=planned_tick)
next_tick, plan = plan_world_tick(world, simulator, next_tick=planned_tick, overseer=overseer)
apply_tick_plan(world, next_tick, plan)
return world
def plan_world_tick(
world: WorldState,
simulator: WorldSimulator | None = None,
next_tick: int | None = None,
overseer: OverseerController | None = None,
) -> tuple[int, TickPlan]:
"""Build a tick plan without mutating the supplied world state."""
planned_tick = next_tick if next_tick is not None else world.tick + 1
active_simulator = simulator or DeterministicWorldSimulator()
if overseer is None:
return planned_tick, active_simulator.propose_tick(world, planned_tick)
prompt_world = deepcopy(world)
overseer_plan = overseer.augment_plan(
prompt_world,
planned_tick,
TickPlan(source="pre_npc", directives=[]),
)
apply_overseer_metadata(prompt_world, overseer_plan.overseer)
npc_plan = active_simulator.propose_tick(prompt_world, planned_tick)
merged = {directive.npc_id: directive for directive in npc_plan.directives}
for directive in overseer_plan.directives:
merged[directive.npc_id] = directive
return (
planned_tick,
TickPlan(
source=f"{npc_plan.source}+overseer" if overseer_plan.overseer else npc_plan.source,
directives=list(merged.values()),
overseer=overseer_plan.overseer,
ledger_entries=[*overseer_plan.ledger_entries, *npc_plan.ledger_entries],
),
)
def apply_tick_plan(world: WorldState, next_tick: int, plan: TickPlan) -> None:
"""Commit an already computed tick plan into the live world state."""
# Deterministic survival environment (hunger, beast, fear) runs first and is
# a no-op for worlds without survival entities.
apply_survival_tick(world)
# In a survival world all directives (deterministic or LLM) resolve through
# the survival resolver so the engine owns hunger/health/position/inventory.
if plan.source.startswith("survival") or is_survival_world(world):
world.tick = next_tick
apply_overseer_metadata(world, plan.overseer)
debug = apply_survival_plan(world, next_tick, plan)
_publish_plan_ledger_entries(plan.ledger_entries, debug)
apply_post_survival_tick(world, next_tick)
world.last_tick_source = plan.source
world.last_action_debug = debug
return
plan, traces = validate_tick_plan(world, plan)
traces_by_npc_id = {trace.npc_id: trace for trace in traces}
resolved_traces: list[ActionDebugTrace] = []
half_width = world.terrain.width / 2
half_depth = world.terrain.depth / 2
npcs_by_id = {npc.id: npc for npc in world.npcs}
move_targets = _resolve_move_targets(
world,
plan.directives,
half_width=half_width,
half_depth=half_depth,
)
for directive in plan.directives:
npc = npcs_by_id.get(directive.npc_id)
if not npc:
continue
if not is_alive(npc):
npc.intention = "dead"
resolution = ResolutionResult(
npc_id=directive.npc_id,
action=directive.action,
success=False,
summary="actor dead",
)
_append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution)
continue
npc.focus_target_id = None
match directive.action:
case "move":
resolution = _apply_move(
npc,
directive,
move_targets.get(npc.id) if directive.target else None,
npcs_by_id,
next_tick,
)
case "speak":
resolution = _apply_speak(npc, directive, npcs_by_id, next_tick)
case "attack" | "strike":
resolution = _apply_strike(npc, directive, npcs_by_id, world, next_tick)
case "idle":
resolution = _apply_idle(npc, directive)
case _:
resolution = ResolutionResult(
npc_id=directive.npc_id,
action=directive.action,
success=False,
summary=f"unresolved action {directive.action}",
)
if directive.memory:
remember(npc, next_tick, directive.memory)
if directive.conversation_user and directive.conversation_assistant:
npc.conversation_context.add_turn(
directive.conversation_user,
directive.conversation_assistant,
)
_append_resolution_trace(traces_by_npc_id, resolved_traces, directive, resolution)
for npc in world.npcs:
if not is_alive(npc):
npc.intention = "dead"
world.tick = next_tick
apply_overseer_metadata(world, plan.overseer)
world.last_tick_source = plan.source
world.last_action_debug = [_trace_to_dict(trace) for trace in resolved_traces]
_publish_plan_ledger_entries(plan.ledger_entries, world.last_action_debug)
def validate_tick_plan(
world: WorldState,
plan: TickPlan,
) -> tuple[TickPlan, list[ActionDebugTrace]]:
"""Resolve requested NPC directives into safe engine directives."""
npcs_by_id = {npc.id: npc for npc in world.npcs}
resolved_directives: list[NpcDirective] = []
traces: list[ActionDebugTrace] = []
for directive in plan.directives:
npc = npcs_by_id.get(directive.npc_id)
validation = validate_directive(world, npc, directive)
traces.append(
ActionDebugTrace(
npc_id=directive.npc_id,
original_directive=directive,
validation=validation,
)
)
resolved_directives.append(validation.directive)
return (
TickPlan(
source=plan.source,
directives=resolved_directives,
overseer=plan.overseer,
ledger_entries=plan.ledger_entries,
),
traces,
)
def validate_directive(
world: WorldState,
npc: Npc | None,
directive: NpcDirective,
) -> ValidationResult:
"""Validate or repair one directive before engine execution.
Only physical impossibility is repaired here (missing or dead targets,
out-of-range interactions, unknown actions). Behavioral plausibility is
the planner's job: any physically possible primitive is executed as asked.
"""
if npc is None or npc.id != directive.npc_id:
resolved = idle_directive(
directive,
memory="Action ignored because the actor was unavailable.",
intent="invalid_actor",
)
return _validation_result(directive, resolved, valid=False, reason="actor unavailable")
if not is_alive(npc):
resolved = idle_directive(directive, memory=None, intent="actor_dead")
return _validation_result(directive, resolved, valid=False, reason="actor dead")
original_directive = directive
if directive.action == "idle":
return _validation_result(directive, directive, valid=True, reason="idle accepted")
if directive.action == "strike":
directive = replace(directive, action="attack")
perception, goal = refresh_citizen_motivation(world, npc)
if directive.action not in PRIMITIVE_ACTION_IDS:
fallback = autopilot_directive(world, npc, directive, perception, goal)
return _validation_result(
original_directive,
fallback,
valid=False,
reason=f"unknown action {directive.action!r} replaced by autopilot",
)
match directive.action:
case "move":
return _validate_move(world, npc, directive, perception, goal)
case "speak":
return _validate_speak(world, npc, directive)
case "attack" | "strike":
validation = _validate_strike(world, npc, directive)
if original_directive.action != directive.action:
return replace(validation, original_action=str(original_directive.action))
return validation
case _:
# use/transfer have no effects outside survival worlds.
resolved = idle_directive(
directive,
memory="There was nothing here to use or trade.",
intent="observe",
)
return _validation_result(
directive,
resolved,
valid=False,
reason=f"{directive.action} has no effect outside survival worlds",
)
def _validate_move(
world: WorldState,
npc: Npc,
directive: NpcDirective,
perception: CitizenPerception,
goal: CitizenGoal | None,
) -> ValidationResult:
if directive.away:
threat = target_from_directive_or_goal(world, npc, directive, goal, perception)
if threat is None:
resolved = idle_directive(
directive,
memory="You braced because no visible threat was left to flee from.",
intent="defend",
)
return _validation_result(
directive,
resolved,
valid=False,
reason="move away had no resolvable threat",
)
return _validation_result(
directive,
flee_directive(world, npc, threat, directive),
valid=True,
reason="move away resolved against nearest threat",
)
if directive.target_npc_id:
target = _npc_by_id(world, directive.target_npc_id)
if target is None or target.id == npc.id or not is_alive(target):
resolved = idle_directive(
directive,
memory="You could not move toward the target because it was unavailable.",
intent="invalid_move_target",
)
return _validation_result(
directive,
resolved,
valid=False,
reason="move target unavailable",
)
return _validation_result(
directive,
move_toward_target_directive(
npc,
directive,
target,
intent=_move_intent(goal, target),
),
valid=True,
reason="move toward NPC resolved to their position",
)
if directive.target is None:
resolved = idle_directive(directive, memory=None, intent="missing_move_target")
return _validation_result(directive, resolved, valid=False, reason="move target missing")
resolved = replace(directive, target=_bounded_target(world, directive.target))
return _validation_result(directive, resolved, valid=True, reason="move accepted")
def _validate_speak(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult:
if not directive.target_npc_id:
# An untargeted speak is a shout heard by everyone within earshot.
return _validation_result(
directive,
directive,
valid=True,
reason="broadcast speech accepted",
)
target = _npc_by_id(world, directive.target_npc_id)
if target is None or target.id == npc.id or not is_alive(target):
resolved = idle_directive(
directive,
memory="You could not speak because the listener was unavailable.",
intent="invalid_speak_target",
)
return _validation_result(
directive,
resolved,
valid=False,
reason="speak target unavailable",
)
if distance_between(npc, target) > TALK_RADIUS:
resolved = NpcDirective(
npc_id=npc.id,
action="move",
target=target.position,
target_npc_id=target.id,
message=directive.message,
memory=f"You moved closer to speak with {target.name}.",
intent="approach_target_for_talk",
confidence=directive.confidence,
)
return _validation_result(
directive,
resolved,
valid=True,
reason="out-of-range speech converted to approach movement",
)
return _validation_result(directive, directive, valid=True, reason="speak accepted")
def _validate_strike(world: WorldState, npc: Npc, directive: NpcDirective) -> ValidationResult:
target = _npc_by_id(world, directive.target_npc_id or "")
if target is None or target.id == npc.id or not is_alive(target):
resolved = idle_directive(
directive,
memory="You could not strike because the target was unavailable.",
intent="invalid_strike_target",
)
return _validation_result(
directive,
resolved,
valid=False,
reason="strike target unavailable",
)
if distance_between(npc, target) > ATTACK_RADIUS:
resolved = NpcDirective(
npc_id=npc.id,
action="move",
target=target.position,
target_npc_id=target.id,
message=directive.message,
memory=f"You moved toward {target.name} to get within striking range.",
intent="approach_target_for_attack",
confidence=directive.confidence,
)
return _validation_result(
directive,
resolved,
valid=True,
reason="out-of-range strike converted to approach movement",
)
return _validation_result(directive, directive, valid=True, reason="strike accepted")
def _move_intent(goal: CitizenGoal | None, target: Npc) -> str:
if (
goal is not None
and goal.goal_type in ("pursue_and_attack_target", "retaliate")
and goal.target_npc_id == target.id
):
return "approach_target_for_attack"
return "move_to_target"
def _npc_by_id(world: WorldState, npc_id: str) -> Npc | None:
return next((candidate for candidate in world.living_npcs() if candidate.id == npc_id), None)
def _resolve_move_targets(
world: WorldState,
directives: list[NpcDirective],
*,
half_width: float,
half_depth: float,
) -> dict[str, Vec3]:
npcs_by_id = {npc.id: npc for npc in world.npcs}
targets: dict[str, Vec3] = {}
for directive in directives:
if directive.action != "move" or not directive.target:
continue
npc = npcs_by_id.get(directive.npc_id)
if not npc or not is_alive(npc):
continue
targets[npc.id] = _planned_move_target(
npc,
directive.target,
half_width=half_width,
half_depth=half_depth,
)
return targets
def _planned_move_target(
npc: Npc,
raw_target: Vec3,
*,
half_width: float,
half_depth: float,
) -> Vec3:
target_x = _snap_to_block_center(raw_target.x, half_width=half_width)
target_z = _snap_to_block_center(raw_target.z, half_width=half_depth)
delta_x = target_x - npc.position.x
delta_z = target_z - npc.position.z
distance = math.hypot(delta_x, delta_z)
if distance > MAX_WALK_DISTANCE:
scale = MAX_WALK_DISTANCE / distance
target_x = npc.position.x + delta_x * scale
target_z = npc.position.z + delta_z * scale
return Vec3(
x=round(_clamp(target_x, -half_width, half_width), 3),
y=0.0,
z=round(_clamp(target_z, -half_depth, half_depth), 3),
)
def _apply_move(
npc: Npc,
directive: NpcDirective,
target: Vec3 | None,
npcs_by_id: dict[str, Npc],
next_tick: int,
) -> ResolutionResult:
if target is None:
npc.intention = "idle"
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=False,
summary="idle",
)
npc.position = target
target_npc = npcs_by_id.get(directive.target_npc_id or "")
match directive.intent:
case "approach_target_for_attack":
npc.mode = "hostile_pursuit"
if target_npc:
npc.intention = f"approaching {target_npc.name} to attack"
else:
npc.intention = "approaching to attack"
if target_npc:
_remember_threat_observers(
actor=npc,
target=target_npc,
npcs_by_id=npcs_by_id,
next_tick=next_tick,
text=f"{npc.name} is moving toward {target_npc.name} with hostile intent.",
include_target=True,
)
case "approach_target_for_talk":
npc.intention = (
f"approaching {target_npc.name} to talk" if target_npc else "approaching to talk"
)
case "flee_from_threat":
npc.mode = "fleeing"
npc.intention = f"fleeing from {target_npc.name}" if target_npc else "fleeing"
case _:
npc.intention = "walking"
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=True,
summary=npc.intention,
)
def _apply_speak(
npc: Npc,
directive: NpcDirective,
npcs_by_id: dict[str, Npc],
next_tick: int,
) -> ResolutionResult:
if not directive.target_npc_id:
return _apply_broadcast_speech(npc, directive, npcs_by_id, next_tick)
target = npcs_by_id.get(directive.target_npc_id)
if not target or target.id == npc.id or not is_alive(target):
npc.intention = "could not speak"
remember(npc, next_tick, "You could not speak because the listener was unavailable.")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=False,
summary=npc.intention,
)
if distance_between(npc, target) > TALK_RADIUS:
npc.intention = f"too far to speak with {target.name}"
remember(npc, next_tick, f"You were too far away to speak with {target.name}.")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=False,
summary=npc.intention,
)
message = directive.message or "checking in"
npc.intention = f"talking to {target.name}"
npc.focus_target_id = target.id
target.focus_target_id = npc.id
add_episode(
npc,
next_tick,
"communicate",
npc.id,
f"I told {target.name}: {message}",
target_id=target.id,
subject_kind="npc",
perspective="self",
tags=["social"],
weight=0.3,
)
add_episode(
target,
next_tick,
"communicate",
npc.id,
f"{npc.name} told me: {message}",
target_id=target.id,
subject_kind="npc",
perspective="recipient",
tags=["social"],
weight=0.3,
)
remember(npc, next_tick, f"You told {target.name}: {message}")
remember(target, next_tick, f"{npc.name} told you: {message}")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=True,
summary=npc.intention,
)
def _apply_broadcast_speech(
npc: Npc,
directive: NpcDirective,
npcs_by_id: dict[str, Npc],
next_tick: int,
) -> ResolutionResult:
is_help_request = directive.communication_intent == "help_request"
message = directive.message or ("Help! I am in danger!" if is_help_request else "Hey!")
npc.intention = "calling for help" if is_help_request else "shouting"
tags = ["help", "danger"] if is_help_request else ["social"]
add_episode(
npc,
next_tick,
"communicate",
npc.id,
f"I shouted: {message}",
subject_kind="npc",
perspective="self",
tags=tags,
weight=0.6 if is_help_request else 0.3,
)
_remember_nearby_citizens(
actor=npc,
npcs_by_id=npcs_by_id,
next_tick=next_tick,
text=f"{npc.name} shouted: {message}",
kind="communicate",
tags=tags,
)
remember(npc, next_tick, f"You shouted: {message}")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=True,
summary=npc.intention,
)
def _apply_strike(
npc: Npc,
directive: NpcDirective,
npcs_by_id: dict[str, Npc],
world: WorldState,
next_tick: int,
) -> ResolutionResult:
target = npcs_by_id.get(directive.target_npc_id or "")
if not target or target.id == npc.id or not is_alive(target):
npc.intention = "could not attack"
remember(npc, next_tick, "You could not attack because the target was unavailable.")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=False,
summary=npc.intention,
)
if distance_between(npc, target) > ATTACK_RADIUS:
npc.intention = f"too far to attack {target.name}"
remember(npc, next_tick, f"You were too far away to attack {target.name}.")
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=False,
summary=npc.intention,
)
rng = random.Random(f"{world.seed}:{next_tick}:{npc.id}:attack:{target.id}")
damage = npc.attack_damage + rng.randint(0, 5)
target.health -= damage
npc.intention = f"attacking {target.name}"
npc.focus_target_id = target.id
if has_hostile_intent(npc) or directive.intent == "hostile_attack":
npc.mode = "hostile_pursuit"
target.mode = "threatened"
target.needs.safety = max(target.needs.safety, 100)
target.emotions.fear = min(100, max(target.emotions.fear, 90))
target.emotions.stress = min(100, max(target.emotions.stress, 85))
target.emotions.trust_baseline = max(0, target.emotions.trust_baseline - 20)
target.relationships[npc.id] = max(-1.0, target.relationships.get(npc.id, 0.0) - 0.5)
add_episode(
target,
next_tick,
"attack",
npc.id,
f"{npc.name} attacked me for {damage} damage",
target_id=target.id,
subject_kind="npc",
perspective="recipient",
tags=["danger"],
weight=0.9,
)
remember(npc, next_tick, f"You attacked {target.name} for {damage} damage.")
remember(target, next_tick, f"{npc.name} attacked you for {damage} damage.")
if not is_alive(target):
target.intention = "dead"
remember(target, next_tick, "You died.")
_remember_threat_observers(
actor=npc,
target=target,
npcs_by_id=npcs_by_id,
next_tick=next_tick,
text=f"{npc.name} attacked {target.name}; {npc.name} is dangerous.",
include_target=False,
kind="attack",
tags=["danger"],
)
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=True,
summary=npc.intention,
)
def _apply_idle(npc: Npc, directive: NpcDirective) -> ResolutionResult:
match directive.intent:
case "defend":
npc.intention = "defending"
case "observe":
npc.intention = "observing"
case _:
npc.intention = "idle"
return ResolutionResult(
npc_id=npc.id,
action=directive.action,
success=True,
summary=npc.intention,
)
def _validation_result(
original: NpcDirective,
resolved: NpcDirective,
*,
valid: bool,
reason: str,
) -> ValidationResult:
return ValidationResult(
valid=valid,
original_action=str(original.action),
resolved_action=resolved.action,
reason=reason,
directive=resolved,
)
def _bounded_target(world: WorldState, target: Vec3) -> Vec3:
half_width = world.terrain.width / 2
half_depth = world.terrain.depth / 2
return Vec3(
x=round(_clamp(target.x, -half_width, half_width), 3),
y=0.0,
z=round(_clamp(target.z, -half_depth, half_depth), 3),
)
def _remember_threat_observers(
*,
actor: Npc,
target: Npc,
npcs_by_id: dict[str, Npc],
next_tick: int,
text: str,
include_target: bool,
kind: str = "observe",
tags: list[str] | None = None,
) -> None:
for observer in npcs_by_id.values():
if observer.id == actor.id:
continue
if observer.id == target.id and not include_target:
continue
if not is_alive(observer):
continue
if distance_between(observer, actor) > VISIBLE_RADIUS:
continue
observer.needs.safety = max(observer.needs.safety, 85)
observer.emotions.fear = max(observer.emotions.fear, 70)
observer.emotions.stress = max(observer.emotions.stress, 60)
add_episode(
observer,
next_tick,
kind,
actor.id,
text,
target_id=target.id,
subject_kind="npc",
perspective="witness" if observer.id != target.id else "recipient",
tags=tags or ["danger"],
weight=0.7 if kind == "observe" else 0.8,
)
remember(observer, next_tick, text)
def _remember_nearby_citizens(
*,
actor: Npc,
npcs_by_id: dict[str, Npc],
next_tick: int,
text: str,
kind: str = "observe",
tags: list[str] | None = None,
) -> None:
for observer in npcs_by_id.values():
if observer.id == actor.id or not is_alive(observer):
continue
if distance_between(observer, actor) > VISIBLE_RADIUS:
continue
add_episode(
observer,
next_tick,
kind,
actor.id,
text,
subject_kind="npc",
perspective="recipient",
tags=tags or [],
weight=0.5,
)
remember(observer, next_tick, text)
def _append_resolution_trace(
traces_by_npc_id: dict[str, ActionDebugTrace],
resolved_traces: list[ActionDebugTrace],
directive: NpcDirective,
resolution: ResolutionResult,
) -> None:
trace = traces_by_npc_id.get(directive.npc_id)
if trace is None:
return
resolved_traces.append(
ActionDebugTrace(
npc_id=trace.npc_id,
original_directive=trace.original_directive,
validation=trace.validation,
resolution=resolution,
)
)
def _publish_plan_ledger_entries(
entries: list[dict[str, object]],
debug: list[dict[str, object]],
) -> None:
if not entries:
return
verdicts = _validator_verdicts_by_npc(debug)
for entry in entries:
record = dict(entry)
if record.get("phase") == "npc_response":
# Only a genuine model response (verdict still "pending") is
# reconciled with the engine's accept/repair/reject verdict. A
# response whose verdict is already "rejected" (model call failed /
# output unusable / NPC omitted) keeps that explicit reason so a
# fallback is never masked as "accepted" by the deterministic fill.
current = record.get("validator_verdict")
current_status = current.get("status") if isinstance(current, dict) else None
if current_status in (None, "pending"):
verdict = verdicts.get(str(record.get("npc_id") or ""))
if verdict is not None:
record["validator_verdict"] = verdict
append_record(record)
def _validator_verdicts_by_npc(debug: list[dict[str, object]]) -> dict[str, dict[str, object]]:
verdicts: dict[str, dict[str, object]] = {}
for trace in debug:
npc_id = trace.get("npc_id")
if not isinstance(npc_id, str):
continue
verdict = trace.get("validator_verdict")
if isinstance(verdict, dict):
verdicts[npc_id] = dict(verdict)
continue
validation = trace.get("validation")
if isinstance(validation, dict):
valid = validation.get("valid")
original = validation.get("original_action")
resolved = validation.get("resolved_action")
reason = validation.get("reason")
if valid is True:
status = "accepted"
elif original != resolved:
status = "repaired"
else:
status = "rejected"
verdicts[npc_id] = {
"status": status,
"requested": original,
"resolved": resolved,
"reason": reason,
}
return verdicts
def _trace_to_dict(trace: ActionDebugTrace) -> dict[str, object]:
return asdict(trace)
def _clamp(value: float, minimum: float, maximum: float) -> float:
return min(max(value, minimum), maximum)
def _snap_to_block_center(value: float, *, half_width: float) -> float:
first_center = -half_width + (BLOCK_SIZE / 2)
last_center = half_width - (BLOCK_SIZE / 2)
snapped_steps = round((value - first_center) / BLOCK_SIZE)
return _clamp(first_center + (snapped_steps * BLOCK_SIZE), first_center, last_center)