ZENLLC's picture
Create sim.py
9c9380f verified
import math
import random
import zlib
from collections import deque
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
from PIL import Image

from .config import SimConfig, DEFAULT_MODEL_RATES
from .events import EventStore
from .pricing import cost_from_tokens
from .ledger import Ledger
from .tasks import TaskSystem, Task
# -----------------------------
# Arena settings (same vibe)
# -----------------------------
# Grid size in tiles, and the pixel edge length of one tile in the SVG map.
GRID_W, GRID_H = 29, 19
TILE = 24
# Pixel height of the HUD banner drawn above the grid in the SVG map.
HUD_H = 64
# Overall SVG canvas size: the grid plus the HUD strip on top.
SVG_W = GRID_W * TILE
SVG_H = GRID_H * TILE + HUD_H
# First-person view: image size in pixels, horizontal field of view in
# degrees, and the maximum distance (in grid units) a ray is marched.
VIEW_W, VIEW_H = 560, 315
FOV_DEG = 74
MAX_DEPTH = 22
# Both tables are indexed by an agent's orientation (0..3): DIRS gives the
# unit (dx, dy) step, ORI_DEG the matching heading in degrees.
DIRS = [(1, 0), (0, 1), (-1, 0), (0, -1)]
ORI_DEG = [0, 90, 180, 270]
# Tile codes stored in the grid (grid[y][x]).
EMPTY = 0
WALL = 1
WORK = 2 # "work nodes" (tickets/customers)
RISK = 3 # hazards/risks
INCIDENT = 4 # urgent incident hotspots
GATE = 5
# SVG colours: background, grid lines, then one colour per tile code.
COL_BG = "#0b1020"
COL_GRIDLINE = "#121a3b"
COL_WALL = "#cdd2e6"
COL_EMPTY = "#19214a"
COL_WORK = "#9ab0ff"
COL_RISK = "#ff3b3b"
COL_INC = "#ffd17a"
COL_GATE = "#7ad9ff"
# Per-agent colour, keyed by the agent's key in World.agents; the renderers
# fall back to "#ffd17a" for names not listed here.
AGENT_COLORS = {
    "Planner": "#7ad9ff",
    "Worker1": "#6dffb0",
    "Worker2": "#ffd17a",
    "Reviewer": "#ff7ad9",
    "Ops": "#ff6d6d",
}
def clamp(v, lo, hi):
    """Limit ``v`` to the interval ``[lo, hi]``.

    The lower bound is tested first, so when ``lo > hi`` and ``v < lo`` the
    result is ``lo``.
    """
    if v < lo:
        return lo
    if v > hi:
        return hi
    return v
def in_bounds(x: int, y: int) -> bool:
    """Return True when ``(x, y)`` is a valid cell of the GRID_W x GRID_H grid."""
    if x < 0 or x >= GRID_W:
        return False
    return 0 <= y < GRID_H
def manhattan(a: Tuple[int, int], b: Tuple[int, int]) -> int:
    """Return the Manhattan (L1) distance between points ``a`` and ``b``."""
    span_x = abs(a[0] - b[0])
    span_y = abs(a[1] - b[1])
    return span_x + span_y
def rng(seed: int) -> random.Random:
    """Return a private ``random.Random`` seeded with the low 32 bits of ``seed``."""
    masked_seed = seed & 0xFFFFFFFF
    return random.Random(masked_seed)
def base_border_grid() -> List[List[int]]:
    """Build a GRID_H x GRID_W grid of EMPTY tiles enclosed by a WALL border."""
    g = [[EMPTY] * GRID_W for _ in range(GRID_H)]
    # Top and bottom rows are solid wall.
    g[0] = [WALL] * GRID_W
    g[GRID_H - 1] = [WALL] * GRID_W
    # First and last column of every row are wall.
    for row in g:
        row[0] = WALL
        row[GRID_W - 1] = WALL
    return g
def carve_office(seed: int) -> List[List[int]]:
    """Generate the office layout for ``seed``.

    Starts from the bordered grid and applies, in order: random inner walls,
    random cleared cells, gates (only replacing walls), work nodes and risk
    nodes (both only replacing empty cells). The sequence of random draws is
    fixed, so the same seed always yields the same grid.
    """
    r = rng(seed)
    g = base_border_grid()

    def pick(margin: int) -> Tuple[int, int]:
        # x is always drawn before y; the margin keeps the cell away from the border.
        cx = r.randint(margin, GRID_W - 1 - margin)
        cy = r.randint(margin, GRID_H - 1 - margin)
        return cx, cy

    def convert(attempts: int, margin: int, old: int, new: int) -> None:
        # Replace randomly picked cells holding `old` with `new`; misses are skipped.
        for _ in range(attempts):
            cx, cy = pick(margin)
            if g[cy][cx] == old:
                g[cy][cx] = new

    # Add some inner walls (rooms): each pick becomes a wall with 70% chance.
    for _ in range(70):
        cx, cy = pick(2)
        if r.random() < 0.70:
            g[cy][cx] = WALL

    # Carve some corridors: clear cells unconditionally.
    for _ in range(120):
        cx, cy = pick(1)
        g[cy][cx] = EMPTY

    convert(6, 3, WALL, GATE)     # Gates (secure doors)
    convert(16, 2, EMPTY, WORK)   # Work nodes
    convert(10, 2, EMPTY, RISK)   # Risk nodes
    return g
def is_blocking(tile: int) -> bool:
    """Return True when ``tile`` cannot be walked through (only WALL blocks)."""
    passable = tile != WALL
    return not passable
def neighbors4(x: int, y: int) -> List[Tuple[int, int]]:
    """Return the four orthogonal neighbours of ``(x, y)``.

    The order is +x, +y, -x, -y. No bounds checking is done here.
    """
    offsets = ((1, 0), (0, 1), (-1, 0), (0, -1))
    return [(x + dx, y + dy) for dx, dy in offsets]
def bfs_next_step(grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[Tuple[int, int]]:
    """Return the first cell to step onto on a shortest path from ``start`` to ``goal``.

    A breadth-first search runs over 4-connected, in-bounds, non-blocking
    cells. The start cell itself is never tested for blocking.

    Returns:
        The ``(x, y)`` cell adjacent to ``start`` that begins a shortest path,
        or ``None`` when ``start == goal`` or the goal cannot be reached.
    """
    if start == goal:
        return None
    gx, gy = goal
    target = (gx, gy)
    # deque gives O(1) pops from the left; list.pop(0) shifts the whole queue.
    frontier = deque([start])
    prev = {start: None}
    while frontier:
        x, y = frontier.popleft()
        if (x, y) == target:
            break
        for nx, ny in neighbors4(x, y):
            if not in_bounds(nx, ny):
                continue
            if is_blocking(grid[ny][nx]):
                continue
            if (nx, ny) not in prev:
                prev[(nx, ny)] = (x, y)
                frontier.append((nx, ny))
    if target not in prev:
        return None
    # Walk the predecessor chain back from the goal until the cell whose
    # predecessor is the start: that cell is the next step.
    cur = target
    while prev[cur] != start and prev[cur] is not None:
        cur = prev[cur]
    return cur
def face_towards(ori: int, ax: int, ay: int, tx: int, ty: int) -> int:
    """Return the orientation index (0=+x, 1=+y, 2=-x, 3=-y) facing the target.

    The dominant axis of the offset from ``(ax, ay)`` to ``(tx, ty)`` wins;
    ties, including a zero offset, are resolved on the y axis. The current
    orientation ``ori`` is accepted but not consulted.
    """
    dx, dy = tx - ax, ty - ay
    mostly_horizontal = abs(dx) > abs(dy)
    if mostly_horizontal:
        return 2 if dx <= 0 else 0
    return 3 if dy <= 0 else 1
# -----------------------------
# Agents + World
# -----------------------------
@dataclass
class Agent:
    # One simulated team member living on the arena grid.
    name: str
    role: str
    team: str
    # Grid position in tile coordinates.
    x: int
    y: int
    # Orientation index 0..3; used to index DIRS and ORI_DEG.
    ori: int = 0
    # The SVG renderer adds the "dead" CSS class when hp <= 0.
    hp: int = 7
    model_key: str = "balanced" # economy|balanced|premium
    mode: str = "auto" # auto|manual (manual hooks kept)
    brain: str = "biz" # biz|random
    # Id of the task this agent currently owns, or None when idle.
    current_task_id: Optional[str] = None
    focus: str = "throughput" # throughput|quality|cost
@dataclass
class World:
    # Complete mutable state of one simulation run.
    seed: int
    # Current tick counter; incremented at the end of tick().
    step: int
    # Tile codes, indexed grid[y][x].
    grid: List[List[int]]
    run_id: str
    config: SimConfig
    # Agents keyed by name; iteration order is the per-tick processing order.
    agents: Dict[str, Agent]
    tasks: TaskSystem
    ledger: Ledger
    events: EventStore
    # Set to True with outcome "stopped_budget" when tick() trips the spend guardrail.
    done: bool = False
    outcome: str = "running"
    # Outage flag and the number of ticks it still has to run.
    outage_active: bool = False
    outage_timer_ticks: int = 0
    # When True the first-person view draws a crosshair.
    overlay: bool = True
    # When True tick() reassigns `pov` every tick.
    auto_camera: bool = True
    # Key of the agent whose point of view is selected.
    pov: str = "Planner"
    # Key of the agent that gets the marker dot in the SVG map.
    controlled: str = "Planner"
    # UI-friendly rolling log (not authoritative)
    ui_events: List[str] = field(default_factory=list)
def init_world(seed: int, run_id: str, config: SimConfig) -> World:
    """Create a fresh World for ``seed`` with five agents and an initial backlog.

    One TASK_CREATED event is emitted per seeded task, followed by a single
    RUN_STARTED event.
    """
    r = rng(seed)
    grid = carve_office(seed)

    # Spawn agents: one per corner plus Ops in the centre. Insertion order
    # is the order in which agents act each tick.
    far_x, far_y = GRID_W - 3, GRID_H - 3
    roster = [
        Agent("Planner", role="Planner", team="Ops", x=2, y=2, ori=0, model_key="premium", focus="quality"),
        Agent("Worker1", role="Worker", team="Ops", x=far_x, y=2, ori=2, model_key="balanced", focus="throughput"),
        Agent("Worker2", role="Worker", team="Ops", x=2, y=far_y, ori=0, model_key="balanced", focus="throughput"),
        Agent("Reviewer", role="Reviewer", team="Ops", x=far_x, y=far_y, ori=2, model_key="premium", focus="quality"),
        Agent("Ops", role="Ops", team="Ops", x=GRID_W // 2, y=GRID_H // 2, ori=1, model_key="economy", focus="cost"),
    ]
    agents = {member.name: member for member in roster}

    tasks = TaskSystem(seed=seed)
    ledger = Ledger()
    store = EventStore(run_id=run_id)
    w = World(
        seed=seed,
        step=0,
        grid=grid,
        run_id=run_id,
        config=config,
        agents=agents,
        tasks=tasks,
        ledger=ledger,
        events=store,
        pov="Planner",
        controlled="Planner",
        overlay=True,
        auto_camera=True,
        ui_events=[f"Initialized run_id={run_id} seed={seed}"],
    )

    # Seed initial tasks. The random draws happen in a fixed order
    # (type, priority, SLA, effort, value) so runs are reproducible.
    task_kinds = ["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"]
    priorities = ["P1", "P2", "P3"]
    for number in range(1, config.initial_tasks + 1):
        kind = r.choice(task_kinds)
        priority = r.choice(priorities)
        sla = r.randint(72, 360)  # a few days to a couple weeks depending on minutes_per_tick
        effort = r.randint(45, 220)
        value = float(r.randint(50, 500))
        t = tasks.create_task(
            t_sim=w.step,
            title=f"Backlog item #{number}",
            task_type=kind,
            priority=priority,
            sla_ticks=sla,
            est_effort_min=effort,
            value_usd=value,
        )
        created_payload = {"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}
        store.emit(w.step, "TASK_CREATED", payload=created_payload, state_obj_for_hash={"tasks": tasks.as_dict()})

    started_payload = {
        "minutes_per_tick": config.minutes_per_tick,
        "budget_soft": config.budget_usd_soft,
        "budget_hard": config.budget_usd_hard,
    }
    store.emit(w.step, "RUN_STARTED", payload=started_payload)
    return w
# -----------------------------
# Business logic: simulated LLM call + tool calls
# -----------------------------
def simulate_llm_work(w: World, agent: Agent, task: Task) -> Dict[str, Any]:
    """
    Simulate one LLM call made by ``agent`` while working on ``task``.

    Nothing is recorded here; the caller books the result. The returned dict
    has the keys ``latency_ms``, ``tokens``, ``cost``, ``tool_calls`` and
    ``rework_mult``. Token counts derive from the config estimates scaled by
    the agent's focus; latency and an extra rework factor come from the model
    tier; an active outage stretches latency.
    """
    cfg = w.config
    # Outage makes everything slower and more failure-prone
    outage_mult = 1.8 if w.outage_active else 1.0

    # focus -> (prompt scale, completion scale, rework multiplier);
    # anything unrecognised is treated as "throughput".
    focus_profiles = {
        "quality": (1.15, 1.25, 0.55),
        "cost": (0.75, 0.70, 1.25),
    }
    prompt_scale, completion_scale, rework_mult = focus_profiles.get(agent.focus, (0.95, 0.95, 1.00))
    prompt = int(cfg.est_prompt_tokens_per_task * prompt_scale)
    completion = int(cfg.est_completion_tokens_per_task * completion_scale)

    # model tier -> (base latency in ms, rework factor);
    # anything unrecognised is treated as "premium".
    tier_profiles = {
        "economy": (1400, 1.15),
        "balanced": (1900, 0.95),
    }
    base_latency, tier_rework = tier_profiles.get(agent.model_key, (2600, 0.80))
    latency_ms = int(base_latency * outage_mult)
    rework_mult *= tier_rework

    # Cache behavior: from the first recorded attempt on, 60% of the prompt counts as cached.
    cached = int(prompt * 0.60) if task.attempts >= 1 else 0
    tokens = {
        "prompt_tokens": prompt,
        "completion_tokens": completion,
        "cached_prompt_tokens": cached,
        "reasoning_tokens": int(cfg.est_reasoning_tokens_per_task),
    }

    # Pricing: unknown model keys fall back to the "balanced" rate.
    rate = DEFAULT_MODEL_RATES.get(agent.model_key, DEFAULT_MODEL_RATES["balanced"])
    cost = cost_from_tokens(rate, tokens)

    # Tool calls (light simulation): two for security reviews and support
    # tickets, one for everything else.
    tool_calls = 2 if task.task_type in ("SEC_REVIEW", "SUPPORT_TICKET") else 1

    return {
        "latency_ms": latency_ms,
        "tokens": tokens,
        "cost": cost,
        "tool_calls": tool_calls,
        "rework_mult": rework_mult,
    }
def place_incident_tile(w: World, r: random.Random):
    """Turn one random EMPTY interior cell into an INCIDENT tile.

    Up to 60 random cells are tried. The first EMPTY one is converted, logged
    to the UI log and reported with an INCIDENT_SPAWNED event. If none is
    found the grid is left untouched.
    """
    attempts = 0
    while attempts < 60:
        attempts += 1
        x = r.randint(2, GRID_W - 3)
        y = r.randint(2, GRID_H - 3)
        if w.grid[y][x] != EMPTY:
            continue
        w.grid[y][x] = INCIDENT
        w.ui_events.append(f"t={w.step}: INCIDENT tile spawned at ({x},{y})")
        w.events.emit(w.step, "INCIDENT_SPAWNED", payload={"x": x, "y": y}, state_obj_for_hash={"grid": "incident"})
        return
def sprinkle_new_work_nodes(w: World, r: random.Random, count: int = 2):
    """Make ``count`` random picks and turn each pick that is EMPTY into a WORK tile."""
    for _attempt in range(count):
        x = r.randint(2, GRID_W - 3)
        y = r.randint(2, GRID_H - 3)
        row = w.grid[y]
        if row[x] != EMPTY:
            continue
        row[x] = WORK
def daily_injections(w: World):
    """
    Inject the per-day workload on ticks that start a simulated day.

    On such a tick this creates ``new_tasks_per_day`` tasks (each may be an
    urgent incident, which also spawns an INCIDENT tile), possibly starts an
    outage, and adds a couple of WORK tiles. On any other tick it does
    nothing.
    """
    cfg = w.config
    ticks_per_day = max(1, int((24 * 60) / cfg.minutes_per_tick))
    if w.step % ticks_per_day:
        return

    # The generator depends only on the run seed and the tick number.
    r = rng(w.seed + w.step * 997)
    regular_kinds = ["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"]
    regular_priorities = ["P1", "P2", "P3"]

    # New tasks
    for i in range(cfg.new_tasks_per_day):
        urgent = r.random() < cfg.incident_probability_per_day
        if urgent:
            title = "URGENT Incident: Service Degradation"
            kind = "INCIDENT"
            priority = "P0"
            sla = r.randint(6, 24)
            effort = r.randint(90, 300)
            value = float(r.randint(200, 2500))
        else:
            title = f"New inbound work #{w.step}-{i+1}"
            kind = r.choice(regular_kinds)
            priority = r.choice(regular_priorities)
            sla = r.randint(72, 360)
            effort = r.randint(45, 220)
            value = float(r.randint(50, 500))
        t = w.tasks.create_task(
            t_sim=w.step,
            title=title,
            task_type=kind,
            priority=priority,
            sla_ticks=sla,
            est_effort_min=effort,
            value_usd=value,
            urgent=urgent,
        )
        created_payload = {"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}
        w.events.emit(w.step, "TASK_CREATED", payload=created_payload, state_obj_for_hash={"task": asdict(t)})
        if urgent:
            place_incident_tile(w, r)

    # Outage event: the random draw is only consumed when no outage is running.
    if not w.outage_active and r.random() < cfg.outage_probability_per_day:
        w.outage_active = True
        w.outage_timer_ticks = r.randint(ticks_per_day // 2, ticks_per_day * 2)
        w.ui_events.append(f"t={w.step}: OUTAGE started ({w.outage_timer_ticks} ticks)")
        w.events.emit(w.step, "OUTAGE_STARTED", payload={"duration_ticks": w.outage_timer_ticks})

    # Add some new work nodes for visuals
    sprinkle_new_work_nodes(w, r, count=2)
# -----------------------------
# Agent policy: Planner -> Workers -> Reviewer
# -----------------------------
def agent_pick_target_tile(w: World, agent: Agent) -> Tuple[int, int]:
    """
    Choose the interior tile ``agent`` should walk towards.

    The Planner targets WORK and INCIDENT tiles, Workers target WORK tiles,
    and Ops targets RISK and INCIDENT tiles. The nearest candidate by
    Manhattan distance wins; ties go to the first one in row-major scan
    order. Agents with no candidates (for example the Reviewer) get their
    own position back.
    """
    wanted = set()
    if agent.name == "Planner":
        wanted.update((WORK, INCIDENT))
    if agent.role == "Worker":
        wanted.add(WORK)
    if agent.name == "Ops":
        wanted.update((RISK, INCIDENT))

    here = (agent.x, agent.y)
    candidates = [
        (x, y)
        for y in range(1, GRID_H - 1)
        for x in range(1, GRID_W - 1)
        if w.grid[y][x] in wanted
    ]
    if not candidates:
        return here
    # min() returns the first minimum, matching a stable sort followed by [0].
    return min(candidates, key=lambda cell: manhattan(here, cell))
def move_agent_step(w: World, agent: Agent, tx: int, ty: int):
    """Advance ``agent`` one cell along a shortest path towards ``(tx, ty)``.

    The agent is turned to face the cell it steps onto. Nothing happens when
    the pathfinder reports no step.
    """
    step = bfs_next_step(w.grid, (agent.x, agent.y), (tx, ty))
    if step is not None:
        step_x, step_y = step
        agent.ori = face_towards(agent.ori, agent.x, agent.y, step_x, step_y)
        agent.x = step_x
        agent.y = step_y
def maybe_start_task(w: World, agent: Agent):
    """Assign the next task from the task system to ``agent`` if it is idle.

    Agents that already hold a task, and the Reviewer, never start one. On
    assignment the task becomes IN_PROGRESS, its attempt counter is bumped,
    and a TASK_STARTED event is emitted.
    """
    # Only idle non-reviewers start tasks; Reviewer reviews after work is done
    if agent.current_task_id or agent.name == "Reviewer":
        return
    t = w.tasks.pick_next_task(w.step)
    if not t:
        return

    t.status = "IN_PROGRESS"
    t.owner = agent.name
    t.started_t = w.step
    t.attempts += 1
    agent.current_task_id = t.task_id

    w.ui_events.append(f"t={w.step}: {agent.name} started {t.task_id} ({t.priority}) {t.title}")
    started_payload = {"task": t.task_id, "priority": t.priority, "type": t.task_type, "attempt": t.attempts}
    w.events.emit(
        w.step,
        "TASK_STARTED",
        agent_id=agent.name,
        role=agent.role,
        model_key=agent.model_key,
        payload=started_payload,
        state_obj_for_hash={"task": asdict(t)},
    )
def maybe_complete_task(w: World, agent: Agent):
    """Let ``agent`` finish its current task if it stands on a work-capable tile.

    When the agent holds a task and is on a WORK, INCIDENT or RISK tile, one
    simulated LLM call is booked to the ledger and logged as an LLM_CALL
    event. The task then becomes BLOCKED (awaiting review), REWORK or DONE,
    the agent is freed, and the tile is consumed (WORK becomes EMPTY,
    INCIDENT becomes RISK).

    The outcome draw is seeded from the run seed, the tick and a CRC32 of the
    agent name. The built-in ``hash()`` of a str is salted per process, so it
    would make otherwise identical runs diverge.
    """
    if not agent.current_task_id:
        return
    tid = agent.current_task_id
    t = w.tasks.tasks.get(tid)
    if not t:
        # The task vanished from the task system; drop the stale reference.
        agent.current_task_id = None
        return
    # Work happens when agent stands on a WORK/INCIDENT/RISK tile (narrative hook)
    tile = w.grid[agent.y][agent.x]
    if tile not in (WORK, INCIDENT, RISK):
        return
    # Simulate an LLM call + tools (or replace with real calls later)
    llm = simulate_llm_work(w, agent, t)
    w.ledger.mark_llm_call()
    w.ledger.add_latency(llm["latency_ms"])
    w.ledger.add_tokens(llm["tokens"])
    w.ledger.add_cost(llm["cost"]["usd"], agent_id=agent.name, model_key=agent.model_key)
    for _ in range(int(llm["tool_calls"])):
        w.ledger.mark_tool_call()
    w.events.emit(
        w.step,
        "LLM_CALL",
        agent_id=agent.name,
        role=agent.role,
        model_key=agent.model_key,
        payload={"task": t.task_id, "task_type": t.task_type, "attempt": t.attempts, "focus": agent.focus, "outage": w.outage_active},
        latency_ms=llm["latency_ms"],
        tokens=llm["tokens"],
        cost=llm["cost"],
        state_obj_for_hash={"ledger": {"spend": w.ledger.spend_usd, "calls": w.ledger.llm_calls}},
    )
    # Rework probability: base * complexity-ish * outage * focus multiplier
    cfg = w.config
    base = cfg.rework_probability_base
    complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0))
    outage_mult = 1.35 if w.outage_active else 1.0
    p_rework = clamp(base * complexity * outage_mult * llm["rework_mult"], 0.01, 0.65)
    # High-risk or high-priority tasks are handed to the Reviewer instead of
    # being resolved here.
    needs_review = (t.task_type in ("SEC_REVIEW", "INCIDENT")) or (t.priority in ("P0", "P1"))
    # Budget enforcement
    w.ledger.check_budget(cfg.budget_usd_soft, cfg.budget_usd_hard)
    if w.ledger.spend_usd >= cfg.budget_usd_hard:
        # Budget hard stop: complete with degraded quality (rework likely)
        p_rework = min(0.85, p_rework * 1.65)
        w.events.emit(w.step, "BUDGET_HARD_STOP", payload={"spend_usd": w.ledger.spend_usd})
    # Decide status. crc32 is stable across processes, unlike hash(str).
    name_salt = zlib.crc32(agent.name.encode("utf-8"))
    r = rng(w.seed ^ (w.step * 1315423911) ^ name_salt)
    rework = r.random() < p_rework
    if needs_review:
        # Mark as awaiting review (Reviewer can "confirm" next)
        t.status = "BLOCKED"
        t.notes["awaiting_review"] = True
        w.ui_events.append(f"t={w.step}: {t.task_id} awaiting review")
        w.events.emit(w.step, "TASK_BLOCKED", agent_id=agent.name, payload={"task": t.task_id, "reason": "awaiting_review"})
    else:
        if rework:
            t.status = "REWORK"
            w.ui_events.append(f"t={w.step}: {t.task_id} needs REWORK")
            w.events.emit(w.step, "TASK_REWORK", agent_id=agent.name, payload={"task": t.task_id, "p_rework": p_rework})
        else:
            t.status = "DONE"
            t.completed_t = w.step
            w.ui_events.append(f"t={w.step}: {t.task_id} DONE ✅")
            w.events.emit(w.step, "TASK_COMPLETED", agent_id=agent.name, payload={"task": t.task_id, "value_usd": t.value_usd})
    agent.current_task_id = None
    # Clear tile to show "work node consumed"
    if tile == WORK:
        w.grid[agent.y][agent.x] = EMPTY
    elif tile == INCIDENT:
        w.grid[agent.y][agent.x] = RISK
def reviewer_pass(w: World, reviewer: Agent):
    """Review at most one task that is BLOCKED awaiting review.

    P0 tasks are taken first, then the earliest SLA due time. The review is
    booked as a smaller LLM call: prompt and completion tokens and latency
    are scaled down, and the cost is re-priced from the scaled tokens so the
    ledger and the REVIEW_PASS event stay consistent. The task then ends up
    REWORK or DONE.
    """
    # Reviewer looks for blocked tasks awaiting review and finalizes them with a smaller LLM call
    blocked = [t for t in w.tasks.tasks.values() if t.status == "BLOCKED" and t.notes.get("awaiting_review")]
    if not blocked:
        return
    blocked.sort(key=lambda t: (t.priority != "P0", t.sla_due_t))
    t = blocked[0]
    # Reviewer "reviews" without moving dependence (simple)
    llm = simulate_llm_work(w, reviewer, t)
    # A review pass is cheaper than the original work: shrink tokens and latency.
    tokens = llm["tokens"]
    tokens["prompt_tokens"] = int(tokens["prompt_tokens"] * 0.65)
    tokens["completion_tokens"] = int(tokens["completion_tokens"] * 0.55)
    llm["latency_ms"] = int(llm["latency_ms"] * 0.9)
    # simulate_llm_work priced the call before the reduction above, so
    # re-price with the same rate lookup it uses.
    rate = DEFAULT_MODEL_RATES.get(reviewer.model_key, DEFAULT_MODEL_RATES["balanced"])
    llm["cost"] = cost_from_tokens(rate, tokens)
    w.ledger.mark_llm_call()
    w.ledger.add_latency(llm["latency_ms"])
    w.ledger.add_tokens(llm["tokens"])
    w.ledger.add_cost(llm["cost"]["usd"], agent_id=reviewer.name, model_key=reviewer.model_key)
    w.events.emit(
        w.step,
        "REVIEW_PASS",
        agent_id=reviewer.name,
        role=reviewer.role,
        model_key=reviewer.model_key,
        payload={"task": t.task_id, "task_type": t.task_type},
        latency_ms=llm["latency_ms"],
        tokens=llm["tokens"],
        cost=llm["cost"],
    )
    # Determine whether task goes to rework or done (review reduces rework heavily)
    cfg = w.config
    base = cfg.rework_probability_base * 0.55
    complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0))
    outage_mult = 1.25 if w.outage_active else 1.0
    p_rework = clamp(base * complexity * outage_mult, 0.01, 0.45)
    r = rng(w.seed ^ (w.step * 2654435761) ^ 0xA5A5A5A5)
    rework = r.random() < p_rework
    if rework:
        t.status = "REWORK"
        t.notes["awaiting_review"] = False
        w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} REWORK")
        w.events.emit(w.step, "TASK_REWORK", agent_id=reviewer.name, payload={"task": t.task_id, "p_rework": p_rework})
    else:
        t.status = "DONE"
        t.completed_t = w.step
        t.notes["awaiting_review"] = False
        w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} DONE ✅")
        w.events.emit(w.step, "TASK_COMPLETED", agent_id=reviewer.name, payload={"task": t.task_id, "value_usd": t.value_usd})
# -----------------------------
# Tick
# -----------------------------
def tick(w: World):
    """Advance the simulation by one tick.

    Order of operations: daily injections, outage countdown, every
    non-reviewer agent moves and works, the reviewer pass, the automatic
    camera choice, budget alert events, the spend guardrail, and finally the
    step counter increment and UI log pruning. Does nothing once ``w.done``
    is set.
    """
    if w.done:
        return

    # Daily injections
    daily_injections(w)

    # Outage timer
    if w.outage_active:
        w.outage_timer_ticks -= 1
        if w.outage_timer_ticks <= 0:
            w.outage_active = False
            w.ui_events.append(f"t={w.step}: OUTAGE ended")
            w.events.emit(w.step, "OUTAGE_ENDED", payload={})

    # Agents move + start/complete work (the Reviewer is handled separately)
    for key, member in w.agents.items():
        if key != "Reviewer":
            goal_x, goal_y = agent_pick_target_tile(w, member)
            move_agent_step(w, member, goal_x, goal_y)
            maybe_start_task(w, member)
            maybe_complete_task(w, member)

    # Reviewer pass
    reviewer_pass(w, w.agents["Reviewer"])

    # Camera cuts
    if w.auto_camera:
        # Focus the POV on whoever is currently holding a task or nearest incident
        hotspots = [
            (x, y)
            for y in range(GRID_H)
            for x in range(GRID_W)
            if w.grid[y][x] == INCIDENT
        ]

        def camera_score(key: str) -> float:
            member = w.agents[key]
            score = 3.0 if member.current_task_id else 0.0
            if hotspots:
                nearest = min(manhattan((member.x, member.y), spot) for spot in hotspots)
                score += max(0, 12 - nearest) * 0.25
            if key == "Planner":
                score += 1.2
            return score

        # max() keeps the first of equally scored agents, like a strict ">" scan.
        w.pov = max(w.agents, key=camera_score, default="Planner")

    # Budget alert events: re-emit the latest ledger alerts that report an exceeded budget
    recent_alerts = w.ledger.alerts[-2:]
    for msg in recent_alerts:
        if "EXCEEDED" not in msg:
            continue
        w.events.emit(w.step, "BUDGET_ALERT", payload={"message": msg, "spend_usd": w.ledger.spend_usd})

    # Stop conditions: the run keeps going until spend reaches 1.5x the hard budget.
    guardrail = w.config.budget_usd_hard * 1.5
    if w.ledger.spend_usd >= guardrail:
        w.done = True
        w.outcome = "stopped_budget"
        w.events.emit(w.step, "RUN_STOPPED", payload={"reason": "spend_guardrail", "spend_usd": w.ledger.spend_usd})
        w.ui_events.append(f"t={w.step}: RUN STOPPED (spend guardrail)")

    w.step += 1

    # prune UI log to its newest 220 entries
    if len(w.ui_events) > 220:
        w.ui_events = w.ui_events[-220:]
# -----------------------------
# POV renderer (lightweight raycast, adapted)
# -----------------------------
# RGB colours (uint8) used by the first-person renderer.
# SKY fills the whole frame before anything else is drawn.
SKY = np.array([12, 14, 26], dtype=np.uint8)
# The floor is a vertical gradient from FLOOR1 (at the horizon) to FLOOR2 (bottom).
FLOOR1 = np.array([24, 28, 54], dtype=np.uint8)
FLOOR2 = np.array([10, 12, 22], dtype=np.uint8)
# Two wall shades; the renderer picks one per ray from the ray's direction.
WALL1 = np.array([205, 210, 232], dtype=np.uint8)
WALL2 = np.array([160, 168, 195], dtype=np.uint8)
# Colours for the special tile types when a ray hits them.
GATEC = np.array([120, 220, 255], dtype=np.uint8)
WORKC = np.array([154, 176, 255], dtype=np.uint8)
INCC = np.array([255, 209, 122], dtype=np.uint8)
RISKC = np.array([255, 59, 59], dtype=np.uint8)
def within_fov(ax: int, ay: int, ori: int, tx: int, ty: int, fov_deg: float = FOV_DEG) -> bool:
    """Return True when ``(tx, ty)`` lies inside the view cone of an observer.

    The observer stands at ``(ax, ay)`` facing ``ORI_DEG[ori]``; the cone
    spans ``fov_deg`` degrees in total. The observer's own cell always counts
    as visible.
    """
    off_x = tx - ax
    off_y = ty - ay
    if not (off_x or off_y):
        return True
    bearing = math.degrees(math.atan2(off_y, off_x)) % 360
    heading = ORI_DEG[ori]
    # Signed angular difference normalised into [-180, 180).
    delta = (bearing - heading + 540) % 360 - 180
    half_cone = fov_deg / 2
    return abs(delta) <= half_cone
def bresenham_los(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool:
    """Return True when no WALL lies strictly between the two cells.

    Cells are visited along a Bresenham line from ``(x0, y0)`` to
    ``(x1, y1)``. The two endpoints themselves are never tested.
    """
    span_x = abs(x1 - x0)
    span_y = abs(y1 - y0)
    step_x = 1 if x0 < x1 else -1
    step_y = 1 if y0 < y1 else -1
    err = span_x - span_y
    origin = (x0, y0)
    target = (x1, y1)
    cx, cy = x0, y0
    while (cx, cy) != target:
        if (cx, cy) != origin and grid[cy][cx] == WALL:
            return False
        doubled = 2 * err
        if doubled > -span_y:
            err -= span_y
            cx += step_x
        if doubled < span_x:
            err += span_x
            cy += step_y
    return True
def raycast_pov(w: World, who: str) -> np.ndarray:
    """Render the first-person view of agent ``who`` as an RGB image.

    Returns a ``(VIEW_H, VIEW_W, 3)`` uint8 array. It holds a sky fill, a
    floor gradient, one ray-marched column per pixel column, flat rectangular
    sprites for the other agents that are in view, and a crosshair when
    ``w.overlay`` is set.
    """
    a = w.agents[who]
    img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8)
    img[:, :] = SKY
    # Lower half: blend from FLOOR1 at the horizon to FLOOR2 at the bottom row.
    for y in range(VIEW_H // 2, VIEW_H):
        t = (y - VIEW_H // 2) / max(1, (VIEW_H // 2))
        col = (1 - t) * FLOOR1 + t * FLOOR2
        img[y, :] = col.astype(np.uint8)
    ray_cols = VIEW_W
    half = math.radians(FOV_DEG / 2)
    base = math.radians(ORI_DEG[a.ori])
    for rx in range(ray_cols):
        # cam runs from -1 (left edge) to +1 (right edge) across the screen.
        cam = (2 * rx / (ray_cols - 1)) - 1
        ang = base + cam * half
        sin_a = math.sin(ang)
        cos_a = math.cos(ang)
        # Rays start at the centre of the agent's cell.
        ox, oy = a.x + 0.5, a.y + 0.5
        depth = 0.0
        hit = None
        side = 0
        hit_tile = None
        # March the ray in fixed 0.06 steps until it hits a drawable tile,
        # leaves the grid, or exceeds MAX_DEPTH.
        while depth < MAX_DEPTH:
            depth += 0.06
            tx = int(ox + cos_a * depth)
            ty = int(oy + sin_a * depth)
            if not in_bounds(tx, ty):
                break
            tile = w.grid[ty][tx]
            if tile in (WALL, GATE, WORK, INCIDENT, RISK):
                hit = "tile"
                hit_tile = tile
                # Shade is chosen from the ray direction, not from which
                # face of the cell was hit.
                side = 1 if abs(cos_a) > abs(sin_a) else 0
                break
        if hit is None:
            continue
        # Scale by the cosine of the offset from the heading (the usual
        # fisheye correction), then derive the column height from the depth.
        depth *= math.cos(ang - base)
        depth = max(depth, 0.001)
        h = int((VIEW_H * 0.92) / depth)
        y0 = max(0, VIEW_H // 2 - h // 2)
        y1 = min(VIEW_H - 1, VIEW_H // 2 + h // 2)
        if hit_tile == GATE:
            col = GATEC.copy()
        elif hit_tile == WORK:
            col = WORKC.copy()
        elif hit_tile == INCIDENT:
            col = INCC.copy()
        elif hit_tile == RISK:
            col = RISKC.copy()
        else:
            col = (WALL1.copy() if side == 0 else WALL2.copy())
        # Darken with distance, but never below 28% brightness.
        dim = max(0.28, 1.0 - depth / MAX_DEPTH)
        col = (col * dim).astype(np.uint8)
        img[y0:y1, rx:rx + 1] = col
    # agent sprites
    for nm, other in w.agents.items():
        if nm == who:
            continue
        if not within_fov(a.x, a.y, a.ori, other.x, other.y):
            continue
        if not bresenham_los(w.grid, a.x, a.y, other.x, other.y):
            continue
        dx = other.x - a.x
        dy = other.y - a.y
        ang = math.degrees(math.atan2(dy, dx)) % 360
        facing = ORI_DEG[a.ori]
        # Signed angle between the heading and the other agent, mapped
        # linearly onto the screen's x axis.
        diff = (ang - facing + 540) % 360 - 180
        sx = int((diff / (FOV_DEG / 2)) * (VIEW_W / 2) + (VIEW_W / 2))
        dist = math.sqrt(dx * dx + dy * dy)
        # Sprite size shrinks with distance and is kept within 10..110 px.
        size = int((VIEW_H * 0.55) / max(dist, 1.0))
        size = clamp(size, 10, 110)
        ymid = VIEW_H // 2
        x0 = clamp(sx - size // 4, 0, VIEW_W - 1)
        x1 = clamp(sx + size // 4, 0, VIEW_W - 1)
        y0 = clamp(ymid - size // 2, 0, VIEW_H - 1)
        y1 = clamp(ymid + size // 2, 0, VIEW_H - 1)
        # Convert the agent's "#rrggbb" colour into an RGB triple.
        hexcol = AGENT_COLORS.get(nm, "#ffd17a").lstrip("#")
        rgb = np.array([int(hexcol[i:i+2], 16) for i in (0, 2, 4)], dtype=np.uint8)
        img[y0:y1, x0:x1] = rgb
    if w.overlay:
        # Crosshair: one horizontal and one vertical bar through the centre.
        cx, cy = VIEW_W // 2, VIEW_H // 2
        img[cy - 1:cy + 2, cx - 16:cx + 16] = np.array([110, 210, 255], dtype=np.uint8)
        img[cy - 16:cy + 16, cx - 1:cx + 2] = np.array([110, 210, 255], dtype=np.uint8)
    return img
# -----------------------------
# SVG renderer
# -----------------------------
def tile_color(tile: int) -> str:
    """Return the SVG fill colour for a tile code (unknown codes use the EMPTY colour)."""
    palette = {
        EMPTY: COL_EMPTY,
        WALL: COL_WALL,
        WORK: COL_WORK,
        RISK: COL_RISK,
        INCIDENT: COL_INC,
        GATE: COL_GATE,
    }
    if tile in palette:
        return palette[tile]
    return COL_EMPTY
def svg_render(w: World) -> str:
    """Render the top-down map of the world as an HTML/SVG string.

    The output is a ``<div class="root">`` holding an inline stylesheet and
    one ``<svg>`` with a two-line HUD, one rect per tile, grid lines, and one
    ``<g>`` per agent.
    """
    # HUD headline: business clock
    cfg = w.config
    minutes = w.step * cfg.minutes_per_tick
    days = minutes / (24 * 60)
    headline = f"ZEN Orchestrator Sandbox • day={days:.2f} • tick={w.step} • outage={w.outage_active}"
    detail = f"spend=${w.ledger.spend_usd:.4f} • llm_calls={w.ledger.llm_calls} • tool_calls={w.ledger.tool_calls} • overdue={len(w.tasks.overdue_tasks(w.step))}"
    # Inline stylesheet; doubled braces are literal braces inside the f-string.
    css = f"""
<style>
.root {{
background: {COL_BG};
border-radius: 18px;
overflow: hidden;
box-shadow: 0 18px 40px rgba(0,0,0,0.45);
}}
.hud {{
font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial;
fill: rgba(235,240,255,0.92);
}}
.hudSmall {{ fill: rgba(235,240,255,0.72); }}
.tile {{ shape-rendering: crispEdges; }}
.gridline {{ stroke: {COL_GRIDLINE}; stroke-width: 1; opacity: 0.45; }}
.agent {{
transition: transform 220ms cubic-bezier(.2,.8,.2,1);
filter: drop-shadow(0px 8px 10px rgba(0,0,0,0.45));
}}
.pulse {{
animation: pulse 1.2s ease-in-out infinite;
opacity: 0.22;
}}
@keyframes pulse {{
0% {{ transform: scale(1.0); opacity: 0.14; }}
50% {{ transform: scale(1.15); opacity: 0.26; }}
100% {{ transform: scale(1.0); opacity: 0.14; }}
}}
.badge {{
fill: rgba(15,23,51,0.72);
stroke: rgba(170,195,255,0.16);
stroke-width: 1;
}}
.dead {{ opacity: 0.22; filter: none; }}
.banner {{ fill: rgba(255,255,255,0.08); }}
</style>
"""
    # Fragments are collected in a list and joined once at the end.
    svg = [f"""
<div class="root">
{css}
<svg width="{SVG_W}" height="{SVG_H}" viewBox="0 0 {SVG_W} {SVG_H}">
<rect x="0" y="0" width="{SVG_W}" height="{SVG_H}" fill="{COL_BG}"/>
<rect class="banner" x="0" y="0" width="{SVG_W}" height="{HUD_H}" rx="0" ry="0"/>
<text class="hud" x="18" y="28" font-size="16" font-weight="700">{headline}</text>
<text class="hud hudSmall" x="18" y="50" font-size="12">{detail}</text>
"""]
    # Tiles: the grid is drawn below the HUD strip, hence the HUD_H offset.
    for y in range(GRID_H):
        for x in range(GRID_W):
            t = w.grid[y][x]
            c = tile_color(t)
            px = x * TILE
            py = HUD_H + y * TILE
            svg.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{c}"/>')
    # Grid lines: vertical first, then horizontal.
    for x in range(GRID_W + 1):
        px = x * TILE
        svg.append(f'<line class="gridline" x1="{px}" y1="{HUD_H}" x2="{px}" y2="{SVG_H}"/>')
    for y in range(GRID_H + 1):
        py = HUD_H + y * TILE
        svg.append(f'<line class="gridline" x1="0" y1="{py}" x2="{SVG_W}" y2="{py}"/>')
    # Agents: each group is translated to its tile, and its children use
    # tile-local coordinates.
    for nm, a in w.agents.items():
        px = a.x * TILE
        py = HUD_H + a.y * TILE
        col = AGENT_COLORS.get(nm, "#ffd17a")
        dead_cls = " dead" if a.hp <= 0 else ""
        svg.append(f"""
<g class="agent{dead_cls}" style="transform: translate({px}px, {py}px);">
<circle class="pulse" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.46}" fill="{col}"></circle>
<circle cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.34}" fill="{col}" opacity="0.98"></circle>
""")
        # Short line from the centre showing which way the agent faces.
        dx, dy = DIRS[a.ori]
        x2 = TILE/2 + dx*(TILE*0.32)
        y2 = TILE/2 + dy*(TILE*0.32)
        svg.append(f'<line x1="{TILE/2}" y1="{TILE/2}" x2="{x2}" y2="{y2}" stroke="rgba(10,10,14,0.85)" stroke-width="4" stroke-linecap="round"/>')
        # Name/task badge centred on the tile, at least 64 px wide.
        badge_w = max(64, 10 * len(nm) * 0.62)
        svg.append(f'<rect class="badge" x="{TILE/2 - badge_w/2}" y="{TILE*0.05}" rx="10" width="{badge_w}" height="16"/>')
        task = a.current_task_id or "-"
        svg.append(f'<text x="{TILE/2}" y="{TILE*0.05 + 12}" text-anchor="middle" font-size="10" fill="rgba(235,240,255,0.92)" font-family="ui-sans-serif, system-ui">{nm}:{task}</text>')
        # Extra dot marking the agent named by w.controlled.
        if nm == w.controlled:
            svg.append(f'<circle cx="{TILE*0.88}" cy="{TILE*0.18}" r="6" fill="rgba(110,180,255,0.95)"/>')
        svg.append("</g>")
    svg.append("</svg></div>")
    return "".join(svg)