Spaces:
Sleeping
Sleeping
| import math | |
| import random | |
| from dataclasses import dataclass, field, asdict | |
| from typing import Dict, List, Tuple, Optional, Any | |
| import numpy as np | |
| from PIL import Image | |
| from .config import SimConfig, DEFAULT_MODEL_RATES | |
| from .events import EventStore | |
| from .pricing import cost_from_tokens | |
| from .ledger import Ledger | |
| from .tasks import TaskSystem, Task | |
| # ----------------------------- | |
| # Arena settings (same vibe) | |
| # ----------------------------- | |
| GRID_W, GRID_H = 29, 19 | |
| TILE = 24 | |
| HUD_H = 64 | |
| SVG_W = GRID_W * TILE | |
| SVG_H = GRID_H * TILE + HUD_H | |
| VIEW_W, VIEW_H = 560, 315 | |
| FOV_DEG = 74 | |
| MAX_DEPTH = 22 | |
| DIRS = [(1, 0), (0, 1), (-1, 0), (0, -1)] | |
| ORI_DEG = [0, 90, 180, 270] | |
| EMPTY = 0 | |
| WALL = 1 | |
| WORK = 2 # "work nodes" (tickets/customers) | |
| RISK = 3 # hazards/risks | |
| INCIDENT = 4 # urgent incident hotspots | |
| GATE = 5 | |
| COL_BG = "#0b1020" | |
| COL_GRIDLINE = "#121a3b" | |
| COL_WALL = "#cdd2e6" | |
| COL_EMPTY = "#19214a" | |
| COL_WORK = "#9ab0ff" | |
| COL_RISK = "#ff3b3b" | |
| COL_INC = "#ffd17a" | |
| COL_GATE = "#7ad9ff" | |
| AGENT_COLORS = { | |
| "Planner": "#7ad9ff", | |
| "Worker1": "#6dffb0", | |
| "Worker2": "#ffd17a", | |
| "Reviewer": "#ff7ad9", | |
| "Ops": "#ff6d6d", | |
| } | |
| def clamp(v, lo, hi): | |
| return lo if v < lo else hi if v > hi else v | |
| def in_bounds(x: int, y: int) -> bool: | |
| return 0 <= x < GRID_W and 0 <= y < GRID_H | |
| def manhattan(a: Tuple[int, int], b: Tuple[int, int]) -> int: | |
| return abs(a[0] - b[0]) + abs(a[1] - b[1]) | |
| def rng(seed: int) -> random.Random: | |
| r = random.Random() | |
| r.seed(seed & 0xFFFFFFFF) | |
| return r | |
| def base_border_grid() -> List[List[int]]: | |
| g = [[EMPTY for _ in range(GRID_W)] for _ in range(GRID_H)] | |
| for x in range(GRID_W): | |
| g[0][x] = WALL | |
| g[GRID_H - 1][x] = WALL | |
| for y in range(GRID_H): | |
| g[y][0] = WALL | |
| g[y][GRID_W - 1] = WALL | |
| return g | |
| def carve_office(seed: int) -> List[List[int]]: | |
| r = rng(seed) | |
| g = base_border_grid() | |
| # Add some inner walls (rooms) | |
| for _ in range(70): | |
| x = r.randint(2, GRID_W - 3) | |
| y = r.randint(2, GRID_H - 3) | |
| if r.random() < 0.70: | |
| g[y][x] = WALL | |
| # Carve some corridors | |
| for _ in range(120): | |
| x = r.randint(1, GRID_W - 2) | |
| y = r.randint(1, GRID_H - 2) | |
| g[y][x] = EMPTY | |
| # Gates (secure doors) | |
| for _ in range(6): | |
| x = r.randint(3, GRID_W - 4) | |
| y = r.randint(3, GRID_H - 4) | |
| if g[y][x] == WALL: | |
| g[y][x] = GATE | |
| # Work nodes | |
| for _ in range(16): | |
| x = r.randint(2, GRID_W - 3) | |
| y = r.randint(2, GRID_H - 3) | |
| if g[y][x] == EMPTY: | |
| g[y][x] = WORK | |
| # Risk nodes | |
| for _ in range(10): | |
| x = r.randint(2, GRID_W - 3) | |
| y = r.randint(2, GRID_H - 3) | |
| if g[y][x] == EMPTY: | |
| g[y][x] = RISK | |
| return g | |
| def is_blocking(tile: int) -> bool: | |
| return tile == WALL | |
| def neighbors4(x: int, y: int) -> List[Tuple[int, int]]: | |
| return [(x + 1, y), (x, y + 1), (x - 1, y), (x, y - 1)] | |
| def bfs_next_step(grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[Tuple[int, int]]: | |
| if start == goal: | |
| return None | |
| sx, sy = start | |
| gx, gy = goal | |
| q = [(sx, sy)] | |
| prev = {start: None} | |
| while q: | |
| x, y = q.pop(0) | |
| if (x, y) == (gx, gy): | |
| break | |
| for nx, ny in neighbors4(x, y): | |
| if not in_bounds(nx, ny): | |
| continue | |
| if is_blocking(grid[ny][nx]): | |
| continue | |
| if (nx, ny) not in prev: | |
| prev[(nx, ny)] = (x, y) | |
| q.append((nx, ny)) | |
| if (gx, gy) not in prev: | |
| return None | |
| cur = (gx, gy) | |
| while prev[cur] != start and prev[cur] is not None: | |
| cur = prev[cur] | |
| return cur | |
| def face_towards(ori: int, ax: int, ay: int, tx: int, ty: int) -> int: | |
| dx = tx - ax | |
| dy = ty - ay | |
| if abs(dx) > abs(dy): | |
| return 0 if dx > 0 else 2 | |
| return 1 if dy > 0 else 3 | |
| # ----------------------------- | |
| # Agents + World | |
| # ----------------------------- | |
| class Agent: | |
| name: str | |
| role: str | |
| team: str | |
| x: int | |
| y: int | |
| ori: int = 0 | |
| hp: int = 7 | |
| model_key: str = "balanced" # economy|balanced|premium | |
| mode: str = "auto" # auto|manual (manual hooks kept) | |
| brain: str = "biz" # biz|random | |
| current_task_id: Optional[str] = None | |
| focus: str = "throughput" # throughput|quality|cost | |
| class World: | |
| seed: int | |
| step: int | |
| grid: List[List[int]] | |
| run_id: str | |
| config: SimConfig | |
| agents: Dict[str, Agent] | |
| tasks: TaskSystem | |
| ledger: Ledger | |
| events: EventStore | |
| done: bool = False | |
| outcome: str = "running" | |
| outage_active: bool = False | |
| outage_timer_ticks: int = 0 | |
| overlay: bool = True | |
| auto_camera: bool = True | |
| pov: str = "Planner" | |
| controlled: str = "Planner" | |
| # UI-friendly rolling log (not authoritative) | |
| ui_events: List[str] = field(default_factory=list) | |
| def init_world(seed: int, run_id: str, config: SimConfig) -> World: | |
| r = rng(seed) | |
| grid = carve_office(seed) | |
| # Spawn agents | |
| agents = { | |
| "Planner": Agent("Planner", role="Planner", team="Ops", x=2, y=2, ori=0, model_key="premium", focus="quality"), | |
| "Worker1": Agent("Worker1", role="Worker", team="Ops", x=GRID_W - 3, y=2, ori=2, model_key="balanced", focus="throughput"), | |
| "Worker2": Agent("Worker2", role="Worker", team="Ops", x=2, y=GRID_H - 3, ori=0, model_key="balanced", focus="throughput"), | |
| "Reviewer": Agent("Reviewer", role="Reviewer", team="Ops", x=GRID_W - 3, y=GRID_H - 3, ori=2, model_key="premium", focus="quality"), | |
| "Ops": Agent("Ops", role="Ops", team="Ops", x=GRID_W // 2, y=GRID_H // 2, ori=1, model_key="economy", focus="cost"), | |
| } | |
| tasks = TaskSystem(seed=seed) | |
| ledger = Ledger() | |
| store = EventStore(run_id=run_id) | |
| w = World( | |
| seed=seed, | |
| step=0, | |
| grid=grid, | |
| run_id=run_id, | |
| config=config, | |
| agents=agents, | |
| tasks=tasks, | |
| ledger=ledger, | |
| events=store, | |
| pov="Planner", | |
| controlled="Planner", | |
| overlay=True, | |
| auto_camera=True, | |
| ui_events=[f"Initialized run_id={run_id} seed={seed}"], | |
| ) | |
| # Seed initial tasks | |
| for i in range(config.initial_tasks): | |
| t = tasks.create_task( | |
| t_sim=w.step, | |
| title=f"Backlog item #{i+1}", | |
| task_type=r.choice(["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"]), | |
| priority=r.choice(["P1", "P2", "P3"]), | |
| sla_ticks=r.randint(72, 360), # a few days to a couple weeks depending on minutes_per_tick | |
| est_effort_min=r.randint(45, 220), | |
| value_usd=float(r.randint(50, 500)), | |
| ) | |
| store.emit(w.step, "TASK_CREATED", payload={"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}, state_obj_for_hash={"tasks": tasks.as_dict()}) | |
| store.emit(w.step, "RUN_STARTED", payload={"minutes_per_tick": config.minutes_per_tick, "budget_soft": config.budget_usd_soft, "budget_hard": config.budget_usd_hard}) | |
| return w | |
| # ----------------------------- | |
| # Business logic: simulated LLM call + tool calls | |
| # ----------------------------- | |
| def simulate_llm_work(w: World, agent: Agent, task: Task) -> Dict[str, Any]: | |
| """ | |
| This is your precision spine. Today it's simulated tokens/latency. | |
| Tomorrow you can swap in real model calls and record actual tokens/latency. | |
| """ | |
| cfg = w.config | |
| # Outage makes everything slower and more failure-prone | |
| outage_mult = 1.8 if w.outage_active else 1.0 | |
| # Focus affects tokens & rework | |
| focus = agent.focus | |
| if focus == "quality": | |
| prompt = int(cfg.est_prompt_tokens_per_task * 1.15) | |
| completion = int(cfg.est_completion_tokens_per_task * 1.25) | |
| rework_mult = 0.55 | |
| elif focus == "cost": | |
| prompt = int(cfg.est_prompt_tokens_per_task * 0.75) | |
| completion = int(cfg.est_completion_tokens_per_task * 0.70) | |
| rework_mult = 1.25 | |
| else: # throughput | |
| prompt = int(cfg.est_prompt_tokens_per_task * 0.95) | |
| completion = int(cfg.est_completion_tokens_per_task * 0.95) | |
| rework_mult = 1.00 | |
| # Model tier influences "effective latency" and (optionally) rework | |
| if agent.model_key == "economy": | |
| latency_ms = int(1400 * outage_mult) | |
| rework_mult *= 1.15 | |
| elif agent.model_key == "balanced": | |
| latency_ms = int(1900 * outage_mult) | |
| rework_mult *= 0.95 | |
| else: # premium | |
| latency_ms = int(2600 * outage_mult) | |
| rework_mult *= 0.80 | |
| # Cache behavior: repeated tasks / retries more likely cached | |
| cached = 0 | |
| if task.attempts >= 1: | |
| cached = int(prompt * 0.60) | |
| tokens = { | |
| "prompt_tokens": prompt, | |
| "completion_tokens": completion, | |
| "cached_prompt_tokens": cached, | |
| "reasoning_tokens": int(cfg.est_reasoning_tokens_per_task), | |
| } | |
| # Pricing | |
| rate = DEFAULT_MODEL_RATES.get(agent.model_key, DEFAULT_MODEL_RATES["balanced"]) | |
| cost = cost_from_tokens(rate, tokens) | |
| # Tool calls (light simulation) | |
| tool_calls = 0 | |
| if task.task_type in ("SEC_REVIEW", "SUPPORT_TICKET"): | |
| tool_calls = 2 | |
| elif task.task_type == "SALES_OPS": | |
| tool_calls = 1 | |
| else: | |
| tool_calls = 1 | |
| return { | |
| "latency_ms": latency_ms, | |
| "tokens": tokens, | |
| "cost": cost, | |
| "tool_calls": tool_calls, | |
| "rework_mult": rework_mult, | |
| } | |
| def place_incident_tile(w: World, r: random.Random): | |
| # Mark an incident hotspot in the arena (purely UX, but helps narrative). | |
| for _ in range(60): | |
| x = r.randint(2, GRID_W - 3) | |
| y = r.randint(2, GRID_H - 3) | |
| if w.grid[y][x] == EMPTY: | |
| w.grid[y][x] = INCIDENT | |
| w.ui_events.append(f"t={w.step}: INCIDENT tile spawned at ({x},{y})") | |
| w.events.emit(w.step, "INCIDENT_SPAWNED", payload={"x": x, "y": y}, state_obj_for_hash={"grid": "incident"}) | |
| return | |
| def sprinkle_new_work_nodes(w: World, r: random.Random, count: int = 2): | |
| for _ in range(count): | |
| x = r.randint(2, GRID_W - 3) | |
| y = r.randint(2, GRID_H - 3) | |
| if w.grid[y][x] == EMPTY: | |
| w.grid[y][x] = WORK | |
| def daily_injections(w: World): | |
| """ | |
| Once per simulated day, inject new tasks and stochastic events. | |
| """ | |
| cfg = w.config | |
| r = rng(w.seed + w.step * 997) | |
| ticks_per_day = max(1, int((24 * 60) / cfg.minutes_per_tick)) | |
| if w.step % ticks_per_day != 0: | |
| return | |
| # New tasks | |
| for i in range(cfg.new_tasks_per_day): | |
| urgent = r.random() < cfg.incident_probability_per_day | |
| t = w.tasks.create_task( | |
| t_sim=w.step, | |
| title=("URGENT Incident: Service Degradation" if urgent else f"New inbound work #{w.step}-{i+1}"), | |
| task_type=("INCIDENT" if urgent else r.choice(["SUPPORT_TICKET", "SALES_OPS", "HR_ONBOARD", "SEC_REVIEW"])), | |
| priority=("P0" if urgent else r.choice(["P1", "P2", "P3"])), | |
| sla_ticks=(r.randint(6, 24) if urgent else r.randint(72, 360)), | |
| est_effort_min=(r.randint(90, 300) if urgent else r.randint(45, 220)), | |
| value_usd=float(r.randint(200, 2500) if urgent else r.randint(50, 500)), | |
| urgent=urgent, | |
| ) | |
| w.events.emit(w.step, "TASK_CREATED", payload={"task": t.task_id, "title": t.title, "type": t.task_type, "priority": t.priority}, state_obj_for_hash={"task": asdict(t)}) | |
| if urgent: | |
| place_incident_tile(w, r) | |
| # Outage event | |
| if (not w.outage_active) and (r.random() < cfg.outage_probability_per_day): | |
| w.outage_active = True | |
| w.outage_timer_ticks = r.randint(ticks_per_day // 2, ticks_per_day * 2) | |
| w.ui_events.append(f"t={w.step}: OUTAGE started ({w.outage_timer_ticks} ticks)") | |
| w.events.emit(w.step, "OUTAGE_STARTED", payload={"duration_ticks": w.outage_timer_ticks}) | |
| # Add some new work nodes for visuals | |
| sprinkle_new_work_nodes(w, r, count=2) | |
| # ----------------------------- | |
| # Agent policy: Planner -> Workers -> Reviewer | |
| # ----------------------------- | |
| def agent_pick_target_tile(w: World, agent: Agent) -> Tuple[int, int]: | |
| """ | |
| Planner goes to WORK/INCIDENT; Workers go to WORK; Reviewer hovers. | |
| """ | |
| candidates = [] | |
| for y in range(1, GRID_H - 1): | |
| for x in range(1, GRID_W - 1): | |
| if agent.name == "Planner" and w.grid[y][x] in (WORK, INCIDENT): | |
| candidates.append((x, y)) | |
| elif agent.role == "Worker" and w.grid[y][x] == WORK: | |
| candidates.append((x, y)) | |
| elif agent.name == "Ops" and w.grid[y][x] in (RISK, INCIDENT): | |
| candidates.append((x, y)) | |
| if not candidates: | |
| return (agent.x, agent.y) | |
| candidates.sort(key=lambda p: manhattan((agent.x, agent.y), p)) | |
| return candidates[0] | |
| def move_agent_step(w: World, agent: Agent, tx: int, ty: int): | |
| nxt = bfs_next_step(w.grid, (agent.x, agent.y), (tx, ty)) | |
| if nxt is None: | |
| return | |
| nx, ny = nxt | |
| agent.ori = face_towards(agent.ori, agent.x, agent.y, nx, ny) | |
| agent.x, agent.y = nx, ny | |
| def maybe_start_task(w: World, agent: Agent): | |
| if agent.current_task_id: | |
| return | |
| # Only Planner and Workers start tasks; Reviewer reviews after work is done | |
| if agent.name in ("Reviewer",): | |
| return | |
| t = w.tasks.pick_next_task(w.step) | |
| if not t: | |
| return | |
| t.status = "IN_PROGRESS" | |
| t.owner = agent.name | |
| t.started_t = w.step | |
| t.attempts += 1 | |
| agent.current_task_id = t.task_id | |
| w.ui_events.append(f"t={w.step}: {agent.name} started {t.task_id} ({t.priority}) {t.title}") | |
| w.events.emit(w.step, "TASK_STARTED", agent_id=agent.name, role=agent.role, model_key=agent.model_key, payload={"task": t.task_id, "priority": t.priority, "type": t.task_type, "attempt": t.attempts}, state_obj_for_hash={"task": asdict(t)}) | |
| def maybe_complete_task(w: World, agent: Agent): | |
| if not agent.current_task_id: | |
| return | |
| tid = agent.current_task_id | |
| t = w.tasks.tasks.get(tid) | |
| if not t: | |
| agent.current_task_id = None | |
| return | |
| # Work happens when agent reaches a WORK/INCIDENT tile (narrative hook) | |
| tile = w.grid[agent.y][agent.x] | |
| if tile not in (WORK, INCIDENT, RISK): | |
| return | |
| # Simulate an LLM call + tools (or replace with real calls later) | |
| llm = simulate_llm_work(w, agent, t) | |
| w.ledger.mark_llm_call() | |
| w.ledger.add_latency(llm["latency_ms"]) | |
| w.ledger.add_tokens(llm["tokens"]) | |
| w.ledger.add_cost(llm["cost"]["usd"], agent_id=agent.name, model_key=agent.model_key) | |
| for _ in range(int(llm["tool_calls"])): | |
| w.ledger.mark_tool_call() | |
| w.events.emit( | |
| w.step, | |
| "LLM_CALL", | |
| agent_id=agent.name, | |
| role=agent.role, | |
| model_key=agent.model_key, | |
| payload={"task": t.task_id, "task_type": t.task_type, "attempt": t.attempts, "focus": agent.focus, "outage": w.outage_active}, | |
| latency_ms=llm["latency_ms"], | |
| tokens=llm["tokens"], | |
| cost=llm["cost"], | |
| state_obj_for_hash={"ledger": {"spend": w.ledger.spend_usd, "calls": w.ledger.llm_calls}}, | |
| ) | |
| # Rework probability: base * complexity-ish * outage * focus multiplier | |
| cfg = w.config | |
| base = cfg.rework_probability_base | |
| complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0)) | |
| outage_mult = 1.35 if w.outage_active else 1.0 | |
| p_rework = clamp(base * complexity * outage_mult * llm["rework_mult"], 0.01, 0.65) | |
| # Reviewer reduces rework on higher risk tasks by adding an extra pass | |
| needs_review = (t.task_type in ("SEC_REVIEW", "INCIDENT")) or (t.priority in ("P0", "P1")) | |
| # Budget enforcement | |
| w.ledger.check_budget(cfg.budget_usd_soft, cfg.budget_usd_hard) | |
| if w.ledger.spend_usd >= cfg.budget_usd_hard: | |
| # Budget hard stop: complete with degraded quality (rework likely) | |
| p_rework = min(0.85, p_rework * 1.65) | |
| w.events.emit(w.step, "BUDGET_HARD_STOP", payload={"spend_usd": w.ledger.spend_usd}) | |
| # Decide status | |
| r = rng(w.seed ^ (w.step * 1315423911) ^ (hash(agent.name) & 0xFFFFFFFF)) | |
| rework = r.random() < p_rework | |
| if needs_review: | |
| # Mark as awaiting review (Reviewer can "confirm" next) | |
| t.status = "BLOCKED" | |
| t.notes["awaiting_review"] = True | |
| w.ui_events.append(f"t={w.step}: {t.task_id} awaiting review") | |
| w.events.emit(w.step, "TASK_BLOCKED", agent_id=agent.name, payload={"task": t.task_id, "reason": "awaiting_review"}) | |
| else: | |
| if rework: | |
| t.status = "REWORK" | |
| w.ui_events.append(f"t={w.step}: {t.task_id} needs REWORK") | |
| w.events.emit(w.step, "TASK_REWORK", agent_id=agent.name, payload={"task": t.task_id, "p_rework": p_rework}) | |
| else: | |
| t.status = "DONE" | |
| t.completed_t = w.step | |
| w.ui_events.append(f"t={w.step}: {t.task_id} DONE ✅") | |
| w.events.emit(w.step, "TASK_COMPLETED", agent_id=agent.name, payload={"task": t.task_id, "value_usd": t.value_usd}) | |
| agent.current_task_id = None | |
| # Clear tile to show "work node consumed" | |
| if tile == WORK: | |
| w.grid[agent.y][agent.x] = EMPTY | |
| elif tile == INCIDENT: | |
| w.grid[agent.y][agent.x] = RISK | |
| def reviewer_pass(w: World, reviewer: Agent): | |
| # Reviewer looks for blocked tasks awaiting review and finalizes them with a smaller LLM call | |
| blocked = [t for t in w.tasks.tasks.values() if t.status == "BLOCKED" and t.notes.get("awaiting_review")] | |
| if not blocked: | |
| return | |
| blocked.sort(key=lambda t: (t.priority != "P0", t.sla_due_t)) | |
| t = blocked[0] | |
| # Reviewer "reviews" without moving dependence (simple) | |
| llm = simulate_llm_work(w, reviewer, t) | |
| # Reviewer tends to be higher quality; reduce p_rework | |
| llm["tokens"]["prompt_tokens"] = int(llm["tokens"]["prompt_tokens"] * 0.65) | |
| llm["tokens"]["completion_tokens"] = int(llm["tokens"]["completion_tokens"] * 0.55) | |
| llm["latency_ms"] = int(llm["latency_ms"] * 0.9) | |
| w.ledger.mark_llm_call() | |
| w.ledger.add_latency(llm["latency_ms"]) | |
| w.ledger.add_tokens(llm["tokens"]) | |
| w.ledger.add_cost(llm["cost"]["usd"], agent_id=reviewer.name, model_key=reviewer.model_key) | |
| w.events.emit( | |
| w.step, | |
| "REVIEW_PASS", | |
| agent_id=reviewer.name, | |
| role=reviewer.role, | |
| model_key=reviewer.model_key, | |
| payload={"task": t.task_id, "task_type": t.task_type}, | |
| latency_ms=llm["latency_ms"], | |
| tokens=llm["tokens"], | |
| cost=llm["cost"], | |
| ) | |
| # Determine whether task goes to rework or done (review reduces rework heavily) | |
| cfg = w.config | |
| base = cfg.rework_probability_base * 0.55 | |
| complexity = 1.0 + min(2.0, (t.est_effort_min / 180.0)) | |
| outage_mult = 1.25 if w.outage_active else 1.0 | |
| p_rework = clamp(base * complexity * outage_mult, 0.01, 0.45) | |
| r = rng(w.seed ^ (w.step * 2654435761) ^ 0xA5A5A5A5) | |
| rework = r.random() < p_rework | |
| if rework: | |
| t.status = "REWORK" | |
| t.notes["awaiting_review"] = False | |
| w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} REWORK") | |
| w.events.emit(w.step, "TASK_REWORK", agent_id=reviewer.name, payload={"task": t.task_id, "p_rework": p_rework}) | |
| else: | |
| t.status = "DONE" | |
| t.completed_t = w.step | |
| t.notes["awaiting_review"] = False | |
| w.ui_events.append(f"t={w.step}: REVIEW → {t.task_id} DONE ✅") | |
| w.events.emit(w.step, "TASK_COMPLETED", agent_id=reviewer.name, payload={"task": t.task_id, "value_usd": t.value_usd}) | |
| # ----------------------------- | |
| # Tick | |
| # ----------------------------- | |
| def tick(w: World): | |
| if w.done: | |
| return | |
| # Daily injections | |
| daily_injections(w) | |
| # Outage timer | |
| if w.outage_active: | |
| w.outage_timer_ticks -= 1 | |
| if w.outage_timer_ticks <= 0: | |
| w.outage_active = False | |
| w.ui_events.append(f"t={w.step}: OUTAGE ended") | |
| w.events.emit(w.step, "OUTAGE_ENDED", payload={}) | |
| # Agents move + start/complete work | |
| for nm, a in w.agents.items(): | |
| if nm == "Reviewer": | |
| continue | |
| target = agent_pick_target_tile(w, a) | |
| move_agent_step(w, a, target[0], target[1]) | |
| maybe_start_task(w, a) | |
| maybe_complete_task(w, a) | |
| # Reviewer pass | |
| reviewer_pass(w, w.agents["Reviewer"]) | |
| # Camera cuts | |
| if w.auto_camera: | |
| # Focus the POV on whoever is currently holding a task or nearest incident | |
| best = "Planner" | |
| best_score = -1e9 | |
| inc_locs = [(x,y) for y in range(GRID_H) for x in range(GRID_W) if w.grid[y][x] == INCIDENT] | |
| for nm, a in w.agents.items(): | |
| score = 0.0 | |
| if a.current_task_id: | |
| score += 3.0 | |
| if inc_locs: | |
| d = min(manhattan((a.x,a.y), p) for p in inc_locs) | |
| score += max(0, 12 - d) * 0.25 | |
| score += (1.2 if nm == "Planner" else 0.0) | |
| if score > best_score: | |
| best_score = score | |
| best = nm | |
| w.pov = best | |
| # Budget alert events | |
| for msg in w.ledger.alerts[-2:]: | |
| if "EXCEEDED" in msg: | |
| w.events.emit(w.step, "BUDGET_ALERT", payload={"message": msg, "spend_usd": w.ledger.spend_usd}) | |
| # Stop conditions (user can run "years"—don’t auto-stop unless you want) | |
| # Here we keep it running, but you can stop on hard budget if desired: | |
| if w.ledger.spend_usd >= w.config.budget_usd_hard * 1.5: | |
| w.done = True | |
| w.outcome = "stopped_budget" | |
| w.events.emit(w.step, "RUN_STOPPED", payload={"reason": "spend_guardrail", "spend_usd": w.ledger.spend_usd}) | |
| w.ui_events.append(f"t={w.step}: RUN STOPPED (spend guardrail)") | |
| w.step += 1 | |
| # prune UI log | |
| if len(w.ui_events) > 220: | |
| w.ui_events = w.ui_events[-220:] | |
| # ----------------------------- | |
| # POV renderer (lightweight raycast, adapted) | |
| # ----------------------------- | |
| SKY = np.array([12, 14, 26], dtype=np.uint8) | |
| FLOOR1 = np.array([24, 28, 54], dtype=np.uint8) | |
| FLOOR2 = np.array([10, 12, 22], dtype=np.uint8) | |
| WALL1 = np.array([205, 210, 232], dtype=np.uint8) | |
| WALL2 = np.array([160, 168, 195], dtype=np.uint8) | |
| GATEC = np.array([120, 220, 255], dtype=np.uint8) | |
| WORKC = np.array([154, 176, 255], dtype=np.uint8) | |
| INCC = np.array([255, 209, 122], dtype=np.uint8) | |
| RISKC = np.array([255, 59, 59], dtype=np.uint8) | |
| def within_fov(ax: int, ay: int, ori: int, tx: int, ty: int, fov_deg: float = FOV_DEG) -> bool: | |
| dx = tx - ax | |
| dy = ty - ay | |
| if dx == 0 and dy == 0: | |
| return True | |
| ang = (math.degrees(math.atan2(dy, dx)) % 360) | |
| facing = ORI_DEG[ori] | |
| diff = (ang - facing + 540) % 360 - 180 | |
| return abs(diff) <= (fov_deg / 2) | |
| def bresenham_los(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool: | |
| dx = abs(x1 - x0) | |
| dy = abs(y1 - y0) | |
| sx = 1 if x0 < x1 else -1 | |
| sy = 1 if y0 < y1 else -1 | |
| err = dx - dy | |
| x, y = x0, y0 | |
| while True: | |
| if (x, y) != (x0, y0) and (x, y) != (x1, y1): | |
| if grid[y][x] == WALL: | |
| return False | |
| if x == x1 and y == y1: | |
| return True | |
| e2 = 2 * err | |
| if e2 > -dy: | |
| err -= dy | |
| x += sx | |
| if e2 < dx: | |
| err += dx | |
| y += sy | |
| def raycast_pov(w: World, who: str) -> np.ndarray: | |
| a = w.agents[who] | |
| img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8) | |
| img[:, :] = SKY | |
| for y in range(VIEW_H // 2, VIEW_H): | |
| t = (y - VIEW_H // 2) / max(1, (VIEW_H // 2)) | |
| col = (1 - t) * FLOOR1 + t * FLOOR2 | |
| img[y, :] = col.astype(np.uint8) | |
| ray_cols = VIEW_W | |
| half = math.radians(FOV_DEG / 2) | |
| base = math.radians(ORI_DEG[a.ori]) | |
| for rx in range(ray_cols): | |
| cam = (2 * rx / (ray_cols - 1)) - 1 | |
| ang = base + cam * half | |
| sin_a = math.sin(ang) | |
| cos_a = math.cos(ang) | |
| ox, oy = a.x + 0.5, a.y + 0.5 | |
| depth = 0.0 | |
| hit = None | |
| side = 0 | |
| hit_tile = None | |
| while depth < MAX_DEPTH: | |
| depth += 0.06 | |
| tx = int(ox + cos_a * depth) | |
| ty = int(oy + sin_a * depth) | |
| if not in_bounds(tx, ty): | |
| break | |
| tile = w.grid[ty][tx] | |
| if tile in (WALL, GATE, WORK, INCIDENT, RISK): | |
| hit = "tile" | |
| hit_tile = tile | |
| side = 1 if abs(cos_a) > abs(sin_a) else 0 | |
| break | |
| if hit is None: | |
| continue | |
| depth *= math.cos(ang - base) | |
| depth = max(depth, 0.001) | |
| h = int((VIEW_H * 0.92) / depth) | |
| y0 = max(0, VIEW_H // 2 - h // 2) | |
| y1 = min(VIEW_H - 1, VIEW_H // 2 + h // 2) | |
| if hit_tile == GATE: | |
| col = GATEC.copy() | |
| elif hit_tile == WORK: | |
| col = WORKC.copy() | |
| elif hit_tile == INCIDENT: | |
| col = INCC.copy() | |
| elif hit_tile == RISK: | |
| col = RISKC.copy() | |
| else: | |
| col = (WALL1.copy() if side == 0 else WALL2.copy()) | |
| dim = max(0.28, 1.0 - depth / MAX_DEPTH) | |
| col = (col * dim).astype(np.uint8) | |
| img[y0:y1, rx:rx + 1] = col | |
| # agent sprites | |
| for nm, other in w.agents.items(): | |
| if nm == who: | |
| continue | |
| if not within_fov(a.x, a.y, a.ori, other.x, other.y): | |
| continue | |
| if not bresenham_los(w.grid, a.x, a.y, other.x, other.y): | |
| continue | |
| dx = other.x - a.x | |
| dy = other.y - a.y | |
| ang = math.degrees(math.atan2(dy, dx)) % 360 | |
| facing = ORI_DEG[a.ori] | |
| diff = (ang - facing + 540) % 360 - 180 | |
| sx = int((diff / (FOV_DEG / 2)) * (VIEW_W / 2) + (VIEW_W / 2)) | |
| dist = math.sqrt(dx * dx + dy * dy) | |
| size = int((VIEW_H * 0.55) / max(dist, 1.0)) | |
| size = clamp(size, 10, 110) | |
| ymid = VIEW_H // 2 | |
| x0 = clamp(sx - size // 4, 0, VIEW_W - 1) | |
| x1 = clamp(sx + size // 4, 0, VIEW_W - 1) | |
| y0 = clamp(ymid - size // 2, 0, VIEW_H - 1) | |
| y1 = clamp(ymid + size // 2, 0, VIEW_H - 1) | |
| hexcol = AGENT_COLORS.get(nm, "#ffd17a").lstrip("#") | |
| rgb = np.array([int(hexcol[i:i+2], 16) for i in (0, 2, 4)], dtype=np.uint8) | |
| img[y0:y1, x0:x1] = rgb | |
| if w.overlay: | |
| cx, cy = VIEW_W // 2, VIEW_H // 2 | |
| img[cy - 1:cy + 2, cx - 16:cx + 16] = np.array([110, 210, 255], dtype=np.uint8) | |
| img[cy - 16:cy + 16, cx - 1:cx + 2] = np.array([110, 210, 255], dtype=np.uint8) | |
| return img | |
| # ----------------------------- | |
| # SVG renderer | |
| # ----------------------------- | |
| def tile_color(tile: int) -> str: | |
| return { | |
| EMPTY: COL_EMPTY, | |
| WALL: COL_WALL, | |
| WORK: COL_WORK, | |
| RISK: COL_RISK, | |
| INCIDENT: COL_INC, | |
| GATE: COL_GATE, | |
| }.get(tile, COL_EMPTY) | |
| def svg_render(w: World) -> str: | |
| # HUD headline: business clock | |
| cfg = w.config | |
| minutes = w.step * cfg.minutes_per_tick | |
| days = minutes / (24 * 60) | |
| headline = f"ZEN Orchestrator Sandbox • day={days:.2f} • tick={w.step} • outage={w.outage_active}" | |
| detail = f"spend=${w.ledger.spend_usd:.4f} • llm_calls={w.ledger.llm_calls} • tool_calls={w.ledger.tool_calls} • overdue={len(w.tasks.overdue_tasks(w.step))}" | |
| css = f""" | |
| <style> | |
| .root {{ | |
| background: {COL_BG}; | |
| border-radius: 18px; | |
| overflow: hidden; | |
| box-shadow: 0 18px 40px rgba(0,0,0,0.45); | |
| }} | |
| .hud {{ | |
| font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial; | |
| fill: rgba(235,240,255,0.92); | |
| }} | |
| .hudSmall {{ fill: rgba(235,240,255,0.72); }} | |
| .tile {{ shape-rendering: crispEdges; }} | |
| .gridline {{ stroke: {COL_GRIDLINE}; stroke-width: 1; opacity: 0.45; }} | |
| .agent {{ | |
| transition: transform 220ms cubic-bezier(.2,.8,.2,1); | |
| filter: drop-shadow(0px 8px 10px rgba(0,0,0,0.45)); | |
| }} | |
| .pulse {{ | |
| animation: pulse 1.2s ease-in-out infinite; | |
| opacity: 0.22; | |
| }} | |
| @keyframes pulse {{ | |
| 0% {{ transform: scale(1.0); opacity: 0.14; }} | |
| 50% {{ transform: scale(1.15); opacity: 0.26; }} | |
| 100% {{ transform: scale(1.0); opacity: 0.14; }} | |
| }} | |
| .badge {{ | |
| fill: rgba(15,23,51,0.72); | |
| stroke: rgba(170,195,255,0.16); | |
| stroke-width: 1; | |
| }} | |
| .dead {{ opacity: 0.22; filter: none; }} | |
| .banner {{ fill: rgba(255,255,255,0.08); }} | |
| </style> | |
| """ | |
| svg = [f""" | |
| <div class="root"> | |
| {css} | |
| <svg width="{SVG_W}" height="{SVG_H}" viewBox="0 0 {SVG_W} {SVG_H}"> | |
| <rect x="0" y="0" width="{SVG_W}" height="{SVG_H}" fill="{COL_BG}"/> | |
| <rect class="banner" x="0" y="0" width="{SVG_W}" height="{HUD_H}" rx="0" ry="0"/> | |
| <text class="hud" x="18" y="28" font-size="16" font-weight="700">{headline}</text> | |
| <text class="hud hudSmall" x="18" y="50" font-size="12">{detail}</text> | |
| """] | |
| for y in range(GRID_H): | |
| for x in range(GRID_W): | |
| t = w.grid[y][x] | |
| c = tile_color(t) | |
| px = x * TILE | |
| py = HUD_H + y * TILE | |
| svg.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{c}"/>') | |
| for x in range(GRID_W + 1): | |
| px = x * TILE | |
| svg.append(f'<line class="gridline" x1="{px}" y1="{HUD_H}" x2="{px}" y2="{SVG_H}"/>') | |
| for y in range(GRID_H + 1): | |
| py = HUD_H + y * TILE | |
| svg.append(f'<line class="gridline" x1="0" y1="{py}" x2="{SVG_W}" y2="{py}"/>') | |
| for nm, a in w.agents.items(): | |
| px = a.x * TILE | |
| py = HUD_H + a.y * TILE | |
| col = AGENT_COLORS.get(nm, "#ffd17a") | |
| dead_cls = " dead" if a.hp <= 0 else "" | |
| svg.append(f""" | |
| <g class="agent{dead_cls}" style="transform: translate({px}px, {py}px);"> | |
| <circle class="pulse" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.46}" fill="{col}"></circle> | |
| <circle cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.34}" fill="{col}" opacity="0.98"></circle> | |
| """) | |
| dx, dy = DIRS[a.ori] | |
| x2 = TILE/2 + dx*(TILE*0.32) | |
| y2 = TILE/2 + dy*(TILE*0.32) | |
| svg.append(f'<line x1="{TILE/2}" y1="{TILE/2}" x2="{x2}" y2="{y2}" stroke="rgba(10,10,14,0.85)" stroke-width="4" stroke-linecap="round"/>') | |
| badge_w = max(64, 10 * len(nm) * 0.62) | |
| svg.append(f'<rect class="badge" x="{TILE/2 - badge_w/2}" y="{TILE*0.05}" rx="10" width="{badge_w}" height="16"/>') | |
| task = a.current_task_id or "-" | |
| svg.append(f'<text x="{TILE/2}" y="{TILE*0.05 + 12}" text-anchor="middle" font-size="10" fill="rgba(235,240,255,0.92)" font-family="ui-sans-serif, system-ui">{nm}:{task}</text>') | |
| if nm == w.controlled: | |
| svg.append(f'<circle cx="{TILE*0.88}" cy="{TILE*0.18}" r="6" fill="rgba(110,180,255,0.95)"/>') | |
| svg.append("</g>") | |
| svg.append("</svg></div>") | |
| return "".join(svg) | |