import math import random from dataclasses import dataclass, field, asdict from typing import Dict, List, Tuple, Optional, Any import numpy as np from PIL import Image from .config import SimConfig, DEFAULT_MODEL_RATES from .events import EventStore from .pricing import cost_from_tokens from .ledger import Ledger from .tasks import TaskSystem, Task # ----------------------------- # Arena settings (same vibe) # ----------------------------- GRID_W, GRID_H = 29, 19 TILE = 24 HUD_H = 64 SVG_W = GRID_W * TILE SVG_H = GRID_H * TILE + HUD_H VIEW_W, VIEW_H = 560, 315 FOV_DEG = 74 MAX_DEPTH = 22 DIRS = [(1, 0), (0, 1), (-1, 0), (0, -1)] ORI_DEG = [0, 90, 180, 270] EMPTY = 0 WALL = 1 WORK = 2 # "work nodes" (tickets/customers) RISK = 3 # hazards/risks INCIDENT = 4 # urgent incident hotspots GATE = 5 COL_BG = "#0b1020" COL_GRIDLINE = "#121a3b" COL_WALL = "#cdd2e6" COL_EMPTY = "#19214a" COL_WORK = "#9ab0ff" COL_RISK = "#ff3b3b" COL_INC = "#ffd17a" COL_GATE = "#7ad9ff" AGENT_COLORS = { "Planner": "#7ad9ff", "Worker1": "#6dffb0", "Worker2": "#ffd17a", "Reviewer": "#ff7ad9", "Ops": "#ff6d6d", } def clamp(v, lo, hi): return lo if v < lo else hi if v > hi else v def in_bounds(x: int, y: int) -> bool: return 0 <= x < GRID_W and 0 <= y < GRID_H def manhattan(a: Tuple[int, int], b: Tuple[int, int]) -> int: return abs(a[0] - b[0]) + abs(a[1] - b[1]) def rng(seed: int) -> random.Random: r = random.Random() r.seed(seed & 0xFFFFFFFF) return r def base_border_grid() -> List[List[int]]: g = [[EMPTY for _ in range(GRID_W)] for _ in range(GRID_H)] for x in range(GRID_W): g[0][x] = WALL g[GRID_H - 1][x] = WALL for y in range(GRID_H): g[y][0] = WALL g[y][GRID_W - 1] = WALL return g def carve_office(seed: int) -> List[List[int]]: r = rng(seed) g = base_border_grid() # Add some inner walls (rooms) for _ in range(70): x = r.randint(2, GRID_W - 3) y = r.randint(2, GRID_H - 3) if r.random() < 0.70: g[y][x] = WALL # Carve some corridors for _ in range(120): x = r.randint(1, GRID_W - 2) y = r.randint(1, GRID_H - 2) g[y][x] = EMPTY # Gates (secure doors) for _ in range(6): x = r.randint(3, GRID_W - 4) y = r.randint(3, GRID_H - 4) if g[y][x] == WALL: g[y][x] = GATE # Work nodes for _ in range(16): x = r.randint(2, GRID_W - 3) y = r.randint(2, GRID_H - 3) if g[y][x] == EMPTY: g[y][x] = WORK # Risk nodes for _ in range(10): x = r.randint(2, GRID_W - 3) y = r.randint(2, GRID_H - 3) if g[y][x] == EMPTY: g[y][x] = RISK return g def is_blocking(tile: int) -> bool: return tile == WALL def neighbors4(x: int, y: int) -> List[Tuple[int, int]]: return [(x + 1, y), (x, y + 1), (x - 1, y), (x, y - 1)] def bfs_next_step(grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[Tuple[int, int]]: if start == goal: return None sx, sy = start gx, gy = goal q = [(sx, sy)] prev = {start: None} while q: x, y = q.pop(0) if (x, y) == (gx, gy): break for nx, ny in neighbors4(x, y): if not in_bounds(nx, ny): continue if is_blocking(grid[ny][nx]): continue if (nx, ny) not in prev: prev[(nx, ny)] = (x, y) q.append((nx, ny)) if (gx, gy) not in prev: return None cur = (gx, gy) while prev[cur] != start and prev[cur] is not None: cur = prev[cur] return cur def face_towards(ori: int, ax: int, ay: int, tx: int, ty: int) -> int: dx = tx - ax dy = ty - ay if abs(dx) > abs(dy): return 0 if dx > 0 else 2 return 1 if dy > 0 else 3 # ----------------------------- # Agents + World # ----------------------------- @dataclass class Agent: name: str role: str team: str x: int y: int ori: int = 0 hp: int = 7 model_key: str = "balanced" # economy|balanced|premium mode: str = "auto" # auto|manual (manual hooks kept) brain: str = "biz" # biz|random current_task_id: Optional[str] = None focus: str = "throughput" # throughput|quality|cost @dataclass class World: seed: int step: int grid: List[List[int]] run_id: str config: SimConfig agents: Dict[str, Agent] tasks: TaskSystem ledger: Ledger events: EventStore done: bool = False outcome: str = "running" outage_active: bool = False outage_timer_ticks: int = 0 overlay: bool = True auto_camera: bool = True pov: str = "Planner" controlled: str = "Planner" # UI-friendly rolling log (not authoritative) ui_events: List[str] = field(default_factory=list) def init_world(seed: int, run_id: str, config: SimConfig) -> World: r = rng(seed) grid = carve_office(seed) # Spawn agents agents = { "Planner": Agent("Planner", role="Planner", team="Ops", x=2, y=2, ori=0, model_key="premium", focus="quality"), "Worker1": Agent("Worker1", role="Worker", team="Ops", x=GRID_W - 3, y=2, ori=2, model_key="balanced", focus="throughput"), "Worker2": Agent("Worker2", role="Worker", team="Ops", x=2, y=GRID_H - 3, ori=0, model_key="balanced", focus="throughput"), "Reviewer": Agent("Reviewer", role="Reviewer", team="Ops", x=GRID_W - 3, y=GRID_H - 3, ori=2, model_key="premium", focus="quality"), "Ops": Agent("Ops", role="Ops", team="Ops", x=GRID_W // 2, y=GRID_H // 2, ori=1, model_key="economy", focus="cost"), } tasks = TaskSystem(seed=seed) ledger = Ledger() store = EventStore(run_id=run_id) w = World( seed=seed, step=0, grid=grid, run_id=run_id, config=config, agents=agents, tasks=tasks, ledger=ledger, events=store, pov="Planner", controlled="Planner", overlay=True, auto_camera=True, ui_events=[f"Initialized run_id={run_id} seed={seed}"], ) # Seed initial tasks for i in range(config.initial_tasks): t = tasks.create_task( t_sim=w.step, title=f"Backlog item #{i+1}", task_type=r.choice(["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"]), priority=r.choice(["P1", "P2", "P3"]), sla_ticks=r.randint(72, 360), # a few days to a couple weeks depending on minutes_per_tick est_effort_min=r.randint(45, 220), value_usd=float(r.randint(50, 500)), ) store.emit(w.step, "TASK_CREATED", payload={"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}, state_obj_for_hash={"tasks": tasks.as_dict()}) store.emit(w.step, "RUN_STARTED", payload={"minutes_per_tick": config.minutes_per_tick, "budget_soft": config.budget_usd_soft, "budget_hard": config.budget_usd_hard}) return w # ----------------------------- # Business logic: simulated LLM call + tool calls # ----------------------------- def simulate_llm_work(w: World, agent: Agent, task: Task) -> Dict[str, Any]: """ This is your precision spine. Today it's simulated tokens/latency. Tomorrow you can swap in real model calls and record actual tokens/latency. """ cfg = w.config # Outage makes everything slower and more failure-prone outage_mult = 1.8 if w.outage_active else 1.0 # Focus affects tokens & rework focus = agent.focus if focus == "quality": prompt = int(cfg.est_prompt_tokens_per_task * 1.15) completion = int(cfg.est_completion_tokens_per_task * 1.25) rework_mult = 0.55 elif focus == "cost": prompt = int(cfg.est_prompt_tokens_per_task * 0.75) completion = int(cfg.est_completion_tokens_per_task * 0.70) rework_mult = 1.25 else: # throughput prompt = int(cfg.est_prompt_tokens_per_task * 0.95) completion = int(cfg.est_completion_tokens_per_task * 0.95) rework_mult = 1.00 # Model tier influences "effective latency" and (optionally) rework if agent.model_key == "economy": latency_ms = int(1400 * outage_mult) rework_mult *= 1.15 elif agent.model_key == "balanced": latency_ms = int(1900 * outage_mult) rework_mult *= 0.95 else: # premium latency_ms = int(2600 * outage_mult) rework_mult *= 0.80 # Cache behavior: repeated tasks / retries more likely cached cached = 0 if task.attempts >= 1: cached = int(prompt * 0.60) tokens = { "prompt_tokens": prompt, "completion_tokens": completion, "cached_prompt_tokens": cached, "reasoning_tokens": int(cfg.est_reasoning_tokens_per_task), } # Pricing rate = DEFAULT_MODEL_RATES.get(agent.model_key, DEFAULT_MODEL_RATES["balanced"]) cost = cost_from_tokens(rate, tokens) # Tool calls (light simulation) tool_calls = 0 if task.task_type in ("SEC_REVIEW", "SUPPORT_TICKET"): tool_calls = 2 elif task.task_type == "SALES_OPS": tool_calls = 1 else: tool_calls = 1 return { "latency_ms": latency_ms, "tokens": tokens, "cost": cost, "tool_calls": tool_calls, "rework_mult": rework_mult, } def place_incident_tile(w: World, r: random.Random): # Mark an incident hotspot in the arena (purely UX, but helps narrative). for _ in range(60): x = r.randint(2, GRID_W - 3) y = r.randint(2, GRID_H - 3) if w.grid[y][x] == EMPTY: w.grid[y][x] = INCIDENT w.ui_events.append(f"t={w.step}: INCIDENT tile spawned at ({x},{y})") w.events.emit(w.step, "INCIDENT_SPAWNED", payload={"x": x, "y": y}, state_obj_for_hash={"grid": "incident"}) return def sprinkle_new_work_nodes(w: World, r: random.Random, count: int = 2): for _ in range(count): x = r.randint(2, GRID_W - 3) y = r.randint(2, GRID_H - 3) if w.grid[y][x] == EMPTY: w.grid[y][x] = WORK def daily_injections(w: World): """ Once per simulated day, inject new tasks and stochastic events. """ cfg = w.config r = rng(w.seed + w.step * 997) ticks_per_day = max(1, int((24 * 60) / cfg.minutes_per_tick)) if w.step % ticks_per_day != 0: return # New tasks for i in range(cfg.new_tasks_per_day): urgent = r.random() < cfg.incident_probability_per_day t = w.tasks.create_task( t_sim=w.step, title=("URGENT Incident: Service Degradation" if urgent else f"New inbound work #{w.step}-{i+1}"), task_type=("INCIDENT" if urgent else r.choice(["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"])), priority=("P0" if urgent else r.choice(["P1", "P2", "P3"])), sla_ticks=(r.randint(6, 24) if urgent else r.randint(72, 360)), est_effort_min=(r.randint(90, 300) if urgent else r.randint(45, 220)), value_usd=float(r.randint(200, 2500) if urgent else r.randint(50, 500)), urgent=urgent, ) w.events.emit(w.step, "TASK_CREATED", payload={"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}, state_obj_for_hash={"task": asdict(t)}) if urgent: place_incident_tile(w, r) # Outage event if (not w.outage_active) and (r.random() < cfg.outage_probability_per_day): w.outage_active = True w.outage_timer_ticks = r.randint(ticks_per_day // 2, ticks_per_day * 2) w.ui_events.append(f"t={w.step}: OUTAGE started ({w.outage_timer_ticks} ticks)") w.events.emit(w.step, "OUTAGE_STARTED", payload={"duration_ticks": w.outage_timer_ticks}) # Add some new work nodes for visuals sprinkle_new_work_nodes(w, r, count=2) # ----------------------------- # Agent policy: Planner -> Workers -> Reviewer # ----------------------------- def agent_pick_target_tile(w: World, agent: Agent) -> Tuple[int, int]: """ Planner goes to WORK/INCIDENT; Workers go to WORK; Reviewer hovers. """ candidates = [] for y in range(1, GRID_H - 1): for x in range(1, GRID_W - 1): if agent.name == "Planner" and w.grid[y][x] in (WORK, INCIDENT): candidates.append((x, y)) elif agent.role == "Worker" and w.grid[y][x] == WORK: candidates.append((x, y)) elif agent.name == "Ops" and w.grid[y][x] in (RISK, INCIDENT): candidates.append((x, y)) if not candidates: return (agent.x, agent.y) candidates.sort(key=lambda p: manhattan((agent.x, agent.y), p)) return candidates[0] def move_agent_step(w: World, agent: Agent, tx: int, ty: int): nxt = bfs_next_step(w.grid, (agent.x, agent.y), (tx, ty)) if nxt is None: return nx, ny = nxt agent.ori = face_towards(agent.ori, agent.x, agent.y, nx, ny) agent.x, agent.y = nx, ny def maybe_start_task(w: World, agent: Agent): if agent.current_task_id: return # Only Planner and Workers start tasks; Reviewer reviews after work is done if agent.name in ("Reviewer",): return t = w.tasks.pick_next_task(w.step) if not t: return t.status = "IN_PROGRESS" t.owner = agent.name t.started_t = w.step t.attempts += 1 agent.current_task_id = t.task_id w.ui_events.append(f"t={w.step}: {agent.name} started {t.task_id} ({t.priority}) {t.title}") w.events.emit(w.step, "TASK_STARTED", agent_id=agent.name, role=agent.role, model_key=agent.model_key, payload={"task": t.task_id, "priority": t.priority, "type": t.task_type, "attempt": t.attempts}, state_obj_for_hash={"task": asdict(t)}) def maybe_complete_task(w: World, agent: Agent): if not agent.current_task_id: return tid = agent.current_task_id t = w.tasks.tasks.get(tid) if not t: agent.current_task_id = None return # Work happens when agent reaches a WORK/INCIDENT tile (narrative hook) tile = w.grid[agent.y][agent.x] if tile not in (WORK, INCIDENT, RISK): return # Simulate an LLM call + tools (or replace with real calls later) llm = simulate_llm_work(w, agent, t) w.ledger.mark_llm_call() w.ledger.add_latency(llm["latency_ms"]) w.ledger.add_tokens(llm["tokens"]) w.ledger.add_cost(llm["cost"]["usd"], agent_id=agent.name, model_key=agent.model_key) for _ in range(int(llm["tool_calls"])): w.ledger.mark_tool_call() w.events.emit( w.step, "LLM_CALL", agent_id=agent.name, role=agent.role, model_key=agent.model_key, payload={"task": t.task_id, "task_type": t.task_type, "attempt": t.attempts, "focus": agent.focus, "outage": w.outage_active}, latency_ms=llm["latency_ms"], tokens=llm["tokens"], cost=llm["cost"], state_obj_for_hash={"ledger": {"spend": w.ledger.spend_usd, "calls": w.ledger.llm_calls}}, ) # Rework probability: base * complexity-ish * outage * focus multiplier cfg = w.config base = cfg.rework_probability_base complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0)) outage_mult = 1.35 if w.outage_active else 1.0 p_rework = clamp(base * complexity * outage_mult * llm["rework_mult"], 0.01, 0.65) # Reviewer reduces rework on higher risk tasks by adding an extra pass needs_review = (t.task_type in ("SEC_REVIEW", "INCIDENT")) or (t.priority in ("P0", "P1")) # Budget enforcement w.ledger.check_budget(cfg.budget_usd_soft, cfg.budget_usd_hard) if w.ledger.spend_usd >= cfg.budget_usd_hard: # Budget hard stop: complete with degraded quality (rework likely) p_rework = min(0.85, p_rework * 1.65) w.events.emit(w.step, "BUDGET_HARD_STOP", payload={"spend_usd": w.ledger.spend_usd}) # Decide status r = rng(w.seed ^ (w.step * 1315423911) ^ (hash(agent.name) & 0xFFFFFFFF)) rework = r.random() < p_rework if needs_review: # Mark as awaiting review (Reviewer can "confirm" next) t.status = "BLOCKED" t.notes["awaiting_review"] = True w.ui_events.append(f"t={w.step}: {t.task_id} awaiting review") w.events.emit(w.step, "TASK_BLOCKED", agent_id=agent.name, payload={"task": t.task_id, "reason": "awaiting_review"}) else: if rework: t.status = "REWORK" w.ui_events.append(f"t={w.step}: {t.task_id} needs REWORK") w.events.emit(w.step, "TASK_REWORK", agent_id=agent.name, payload={"task": t.task_id, "p_rework": p_rework}) else: t.status = "DONE" t.completed_t = w.step w.ui_events.append(f"t={w.step}: {t.task_id} DONE ✅") w.events.emit(w.step, "TASK_COMPLETED", agent_id=agent.name, payload={"task": t.task_id, "value_usd": t.value_usd}) agent.current_task_id = None # Clear tile to show "work node consumed" if tile == WORK: w.grid[agent.y][agent.x] = EMPTY elif tile == INCIDENT: w.grid[agent.y][agent.x] = RISK def reviewer_pass(w: World, reviewer: Agent): # Reviewer looks for blocked tasks awaiting review and finalizes them with a smaller LLM call blocked = [t for t in w.tasks.tasks.values() if t.status == "BLOCKED" and t.notes.get("awaiting_review")] if not blocked: return blocked.sort(key=lambda t: (t.priority != "P0", t.sla_due_t)) t = blocked[0] # Reviewer "reviews" without moving dependence (simple) llm = simulate_llm_work(w, reviewer, t) # Reviewer tends to be higher quality; reduce p_rework llm["tokens"]["prompt_tokens"] = int(llm["tokens"]["prompt_tokens"] * 0.65) llm["tokens"]["completion_tokens"] = int(llm["tokens"]["completion_tokens"] * 0.55) llm["latency_ms"] = int(llm["latency_ms"] * 0.9) w.ledger.mark_llm_call() w.ledger.add_latency(llm["latency_ms"]) w.ledger.add_tokens(llm["tokens"]) w.ledger.add_cost(llm["cost"]["usd"], agent_id=reviewer.name, model_key=reviewer.model_key) w.events.emit( w.step, "REVIEW_PASS", agent_id=reviewer.name, role=reviewer.role, model_key=reviewer.model_key, payload={"task": t.task_id, "task_type": t.task_type}, latency_ms=llm["latency_ms"], tokens=llm["tokens"], cost=llm["cost"], ) # Determine whether task goes to rework or done (review reduces rework heavily) cfg = w.config base = cfg.rework_probability_base * 0.55 complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0)) outage_mult = 1.25 if w.outage_active else 1.0 p_rework = clamp(base * complexity * outage_mult, 0.01, 0.45) r = rng(w.seed ^ (w.step * 2654435761) ^ 0xA5A5A5A5) rework = r.random() < p_rework if rework: t.status = "REWORK" t.notes["awaiting_review"] = False w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} REWORK") w.events.emit(w.step, "TASK_REWORK", agent_id=reviewer.name, payload={"task": t.task_id, "p_rework": p_rework}) else: t.status = "DONE" t.completed_t = w.step t.notes["awaiting_review"] = False w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} DONE ✅") w.events.emit(w.step, "TASK_COMPLETED", agent_id=reviewer.name, payload={"task": t.task_id, "value_usd": t.value_usd}) # ----------------------------- # Tick # ----------------------------- def tick(w: World): if w.done: return # Daily injections daily_injections(w) # Outage timer if w.outage_active: w.outage_timer_ticks -= 1 if w.outage_timer_ticks <= 0: w.outage_active = False w.ui_events.append(f"t={w.step}: OUTAGE ended") w.events.emit(w.step, "OUTAGE_ENDED", payload={}) # Agents move + start/complete work for nm, a in w.agents.items(): if nm == "Reviewer": continue target = agent_pick_target_tile(w, a) move_agent_step(w, a, target[0], target[1]) maybe_start_task(w, a) maybe_complete_task(w, a) # Reviewer pass reviewer_pass(w, w.agents["Reviewer"]) # Camera cuts if w.auto_camera: # Focus the POV on whoever is currently holding a task or nearest incident best = "Planner" best_score = -1e9 inc_locs = [(x,y) for y in range(GRID_H) for x in range(GRID_W) if w.grid[y][x] == INCIDENT] for nm, a in w.agents.items(): score = 0.0 if a.current_task_id: score += 3.0 if inc_locs: d = min(manhattan((a.x,a.y), p) for p in inc_locs) score += max(0, 12 - d) * 0.25 score += (1.2 if nm == "Planner" else 0.0) if score > best_score: best_score = score best = nm w.pov = best # Budget alert events for msg in w.ledger.alerts[-2:]: if "EXCEEDED" in msg: w.events.emit(w.step, "BUDGET_ALERT", payload={"message": msg, "spend_usd": w.ledger.spend_usd}) # Stop conditions (user can run "years"—don’t auto-stop unless you want) # Here we keep it running, but you can stop on hard budget if desired: if w.ledger.spend_usd >= w.config.budget_usd_hard * 1.5: w.done = True w.outcome = "stopped_budget" w.events.emit(w.step, "RUN_STOPPED", payload={"reason": "spend_guardrail", "spend_usd": w.ledger.spend_usd}) w.ui_events.append(f"t={w.step}: RUN STOPPED (spend guardrail)") w.step += 1 # prune UI log if len(w.ui_events) > 220: w.ui_events = w.ui_events[-220:] # ----------------------------- # POV renderer (lightweight raycast, adapted) # ----------------------------- SKY = np.array([12, 14, 26], dtype=np.uint8) FLOOR1 = np.array([24, 28, 54], dtype=np.uint8) FLOOR2 = np.array([10, 12, 22], dtype=np.uint8) WALL1 = np.array([205, 210, 232], dtype=np.uint8) WALL2 = np.array([160, 168, 195], dtype=np.uint8) GATEC = np.array([120, 220, 255], dtype=np.uint8) WORKC = np.array([154, 176, 255], dtype=np.uint8) INCC = np.array([255, 209, 122], dtype=np.uint8) RISKC = np.array([255, 59, 59], dtype=np.uint8) def within_fov(ax: int, ay: int, ori: int, tx: int, ty: int, fov_deg: float = FOV_DEG) -> bool: dx = tx - ax dy = ty - ay if dx == 0 and dy == 0: return True ang = (math.degrees(math.atan2(dy, dx)) % 360) facing = ORI_DEG[ori] diff = (ang - facing + 540) % 360 - 180 return abs(diff) <= (fov_deg / 2) def bresenham_los(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool: dx = abs(x1 - x0) dy = abs(y1 - y0) sx = 1 if x0 < x1 else -1 sy = 1 if y0 < y1 else -1 err = dx - dy x, y = x0, y0 while True: if (x, y) != (x0, y0) and (x, y) != (x1, y1): if grid[y][x] == WALL: return False if x == x1 and y == y1: return True e2 = 2 * err if e2 > -dy: err -= dy x += sx if e2 < dx: err += dx y += sy def raycast_pov(w: World, who: str) -> np.ndarray: a = w.agents[who] img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8) img[:, :] = SKY for y in range(VIEW_H // 2, VIEW_H): t = (y - VIEW_H // 2) / max(1, (VIEW_H // 2)) col = (1 - t) * FLOOR1 + t * FLOOR2 img[y, :] = col.astype(np.uint8) ray_cols = VIEW_W half = math.radians(FOV_DEG / 2) base = math.radians(ORI_DEG[a.ori]) for rx in range(ray_cols): cam = (2 * rx / (ray_cols - 1)) - 1 ang = base + cam * half sin_a = math.sin(ang) cos_a = math.cos(ang) ox, oy = a.x + 0.5, a.y + 0.5 depth = 0.0 hit = None side = 0 hit_tile = None while depth < MAX_DEPTH: depth += 0.06 tx = int(ox + cos_a * depth) ty = int(oy + sin_a * depth) if not in_bounds(tx, ty): break tile = w.grid[ty][tx] if tile in (WALL, GATE, WORK, INCIDENT, RISK): hit = "tile" hit_tile = tile side = 1 if abs(cos_a) > abs(sin_a) else 0 break if hit is None: continue depth *= math.cos(ang - base) depth = max(depth, 0.001) h = int((VIEW_H * 0.92) / depth) y0 = max(0, VIEW_H // 2 - h // 2) y1 = min(VIEW_H - 1, VIEW_H // 2 + h // 2) if hit_tile == GATE: col = GATEC.copy() elif hit_tile == WORK: col = WORKC.copy() elif hit_tile == INCIDENT: col = INCC.copy() elif hit_tile == RISK: col = RISKC.copy() else: col = (WALL1.copy() if side == 0 else WALL2.copy()) dim = max(0.28, 1.0 - depth / MAX_DEPTH) col = (col * dim).astype(np.uint8) img[y0:y1, rx:rx + 1] = col # agent sprites for nm, other in w.agents.items(): if nm == who: continue if not within_fov(a.x, a.y, a.ori, other.x, other.y): continue if not bresenham_los(w.grid, a.x, a.y, other.x, other.y): continue dx = other.x - a.x dy = other.y - a.y ang = math.degrees(math.atan2(dy, dx)) % 360 facing = ORI_DEG[a.ori] diff = (ang - facing + 540) % 360 - 180 sx = int((diff / (FOV_DEG / 2)) * (VIEW_W / 2) + (VIEW_W / 2)) dist = math.sqrt(dx * dx + dy * dy) size = int((VIEW_H * 0.55) / max(dist, 1.0)) size = clamp(size, 10, 110) ymid = VIEW_H // 2 x0 = clamp(sx - size // 4, 0, VIEW_W - 1) x1 = clamp(sx + size // 4, 0, VIEW_W - 1) y0 = clamp(ymid - size // 2, 0, VIEW_H - 1) y1 = clamp(ymid + size // 2, 0, VIEW_H - 1) hexcol = AGENT_COLORS.get(nm, "#ffd17a").lstrip("#") rgb = np.array([int(hexcol[i:i+2], 16) for i in (0, 2, 4)], dtype=np.uint8) img[y0:y1, x0:x1] = rgb if w.overlay: cx, cy = VIEW_W // 2, VIEW_H // 2 img[cy - 1:cy + 2, cx - 16:cx + 16] = np.array([110, 210, 255], dtype=np.uint8) img[cy - 16:cy + 16, cx - 1:cx + 2] = np.array([110, 210, 255], dtype=np.uint8) return img # ----------------------------- # SVG renderer # ----------------------------- def tile_color(tile: int) -> str: return { EMPTY: COL_EMPTY, WALL: COL_WALL, WORK: COL_WORK, RISK: COL_RISK, INCIDENT: COL_INC, GATE: COL_GATE, }.get(tile, COL_EMPTY) def svg_render(w: World) -> str: # HUD headline: business clock cfg = w.config minutes = w.step * cfg.minutes_per_tick days = minutes / (24 * 60) headline = f"ZEN Orchestrator Sandbox • day={days:.2f} • tick={w.step} • outage={w.outage_active}" detail = f"spend=${w.ledger.spend_usd:.4f} • llm_calls={w.ledger.llm_calls} • tool_calls={w.ledger.tool_calls} • overdue={len(w.tasks.overdue_tasks(w.step))}" css = f""" """ svg = [f"""