import json import math import os import time import random import tempfile import urllib.request import urllib.error from dataclasses import dataclass, field, asdict from typing import Dict, List, Optional, Tuple, Any import numpy as np import pandas as pd from PIL import Image, ImageDraw import gradio as gr # ============================================================ # ZEN Orchestrator Sandbox — Business-grade Agent Simulation # ============================================================ # Fixes in this regen: # - Removes unsupported gr.Dataframe(height=...) for Gradio 5.49.1 # - Uses a scroll container via HTML/CSS around the dataframe # ============================================================ GRID_W, GRID_H = 32, 20 TILE = 22 HUD_H = 70 SVG_W = GRID_W * TILE SVG_H = GRID_H * TILE + HUD_H COL_BG = "#0a1020" COL_PANEL = "rgba(255,255,255,0.06)" COL_GRID = "rgba(255,255,255,0.06)" COL_TEXT = "rgba(235,240,255,0.92)" COL_TEXT_DIM = "rgba(235,240,255,0.72)" EMPTY = 0 WALL = 1 DESK = 2 MEETING = 3 SERVER = 4 INCIDENT = 5 TASK_NODE = 6 TILE_COL = { EMPTY: "#162044", WALL: "#cdd2e6", DESK: "#7ad9ff", MEETING: "#ffd17a", SERVER: "#ff7ad9", INCIDENT: "#ff3b3b", TASK_NODE: "#7affc8", } AGENT_COLORS = [ "#7ad9ff", "#ff7ad9", "#ffd17a", "#7affc8", "#ff9b6b", "#c7d2fe", "#a0ffd9", "#ffb0b0", ] DEFAULT_MODEL_PRICING = { "Simulated-Local": {"in": 0.00, "out": 0.00}, "gpt-4o-mini": {"in": 0.15, "out": 0.60}, "gpt-4o": {"in": 5.00, "out": 15.00}, "gpt-5": {"in": 5.00, "out": 15.00}, } DEFAULT_OAI_BASE = "https://api.openai.com/v1" def clamp(v, lo, hi): return lo if v < lo else hi if v > hi else v def make_rng(seed: int) -> random.Random: r = random.Random() r.seed(seed & 0xFFFFFFFF) return r def now_iso(): return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) def est_tokens(text: str) -> int: if not text: return 0 return max(1, int(len(text) / 4)) def safe_json(s: str, fallback=None): try: return json.loads(s) except Exception: return fallback def to_csv_download(df: pd.DataFrame) -> str: tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") df.to_csv(tmp.name, index=False) return tmp.name @dataclass class Task: id: str title: str description: str priority: int = 3 difficulty: int = 3 est_hours: float = 8.0 created_step: int = 0 status: str = "backlog" assigned_to: Optional[str] = None progress: float = 0.0 blockers: List[str] = field(default_factory=list) @dataclass class Agent: name: str model: str key_group: str x: int y: int energy: float = 100.0 role: str = "Generalist" state: str = "idle" current_task_id: Optional[str] = None thoughts: str = "" last_action: str = "" tokens_in: int = 0 tokens_out: int = 0 cost_usd: float = 0.0 compute_s: float = 0.0 @dataclass class World: seed: int = 1337 step: int = 0 sim_time_hours: float = 0.0 tick_hours: float = 4.0 difficulty: int = 3 incident_rate: float = 0.07 grid: List[List[int]] = field(default_factory=list) agents: Dict[str, Agent] = field(default_factory=dict) tasks: Dict[str, Task] = field(default_factory=dict) events: List[str] = field(default_factory=list) runlog: List[Dict[str, Any]] = field(default_factory=list) incidents_open: int = 0 incidents_resolved: int = 0 tasks_done: int = 0 done: bool = False def build_office(seed: int) -> List[List[int]]: r = make_rng(seed) g = [[EMPTY for _ in range(GRID_W)] for _ in range(GRID_H)] for x in range(GRID_W): g[0][x] = WALL g[GRID_H - 1][x] = WALL for y in range(GRID_H): g[y][0] = WALL g[y][GRID_W - 1] = WALL def rect(x0, y0, x1, y1, tile): for y in range(y0, y1 + 1): for x in range(x0, x1 + 1): if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: g[y][x] = tile rect(2, 2, GRID_W - 3, GRID_H - 3, EMPTY) rect(3, 3, 10, 7, MEETING) rect(GRID_W - 11, 3, GRID_W - 4, 7, MEETING) rect(GRID_W - 10, GRID_H - 8, GRID_W - 4, GRID_H - 3, SERVER) for y in range(9, GRID_H - 10): for x in range(4, GRID_W - 12): if (x % 3 == 1) and (y % 2 == 0): g[y][x] = DESK nodes = [(6, GRID_H - 5), (GRID_W // 2, GRID_H // 2), (GRID_W - 14, 10)] for (x, y) in nodes: if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: g[y][x] = TASK_NODE for _ in range(22): x = r.randint(3, GRID_W - 4) y = r.randint(8, GRID_H - 9) if g[y][x] == EMPTY: g[y][x] = WALL return g def random_walkable_cell(g: List[List[int]], r: random.Random) -> Tuple[int, int]: opts = [] for y in range(1, GRID_H - 1): for x in range(1, GRID_W - 1): if g[y][x] in (EMPTY, DESK, MEETING, SERVER, TASK_NODE): opts.append((x, y)) return r.choice(opts) if opts else (2, 2) def init_world(seed: int) -> World: seed = int(seed) g = build_office(seed) w = World(seed=seed, grid=g) w.events.append(f"[{now_iso()}] Initialized office world | seed={seed}") return w def add_agent(w: World, name: str, model: str, key_group: str, role: str, seed_bump: int = 0): r = make_rng(w.seed + w.step * 17 + seed_bump) x, y = random_walkable_cell(w.grid, r) w.agents[name] = Agent(name=name, model=model, key_group=key_group, x=x, y=y, role=role) w.events.append(f"[t={w.step}] Agent added: {name} | model={model} | key_group={key_group} | role={role}") def add_task(w: World, title: str, description: str, priority: int, difficulty: int, est_hours: float): tid = f"T{len(w.tasks)+1:04d}" w.tasks[tid] = Task( id=tid, title=title.strip()[:80] if title else "Untitled Task", description=description.strip()[:400] if description else "", priority=int(clamp(priority, 1, 5)), difficulty=int(clamp(difficulty, 1, 5)), est_hours=float(max(0.25, est_hours)), created_step=w.step, ) w.events.append(f"[t={w.step}] Task added: {tid} | p={priority} d={difficulty} est={est_hours}h | {w.tasks[tid].title}") return tid DIRS4 = [(1,0), (0,1), (-1,0), (0,-1)] def in_bounds(x, y): return 0 <= x < GRID_W and 0 <= y < GRID_H def is_blocking(tile: int) -> bool: return tile == WALL def bfs_next_step(grid: List[List[int]], start: Tuple[int,int], goal: Tuple[int,int]) -> Optional[Tuple[int,int]]: if start == goal: return None sx, sy = start gx, gy = goal q = [(sx, sy)] prev = {(sx, sy): None} while q: x, y = q.pop(0) if (x, y) == (gx, gy): break for dx, dy in DIRS4: nx, ny = x + dx, y + dy if not in_bounds(nx, ny): continue if is_blocking(grid[ny][nx]): continue if (nx, ny) not in prev: prev[(nx, ny)] = (x, y) q.append((nx, ny)) if (gx, gy) not in prev: return None cur = (gx, gy) while prev[cur] is not None and prev[cur] != start: cur = prev[cur] return cur def oai_chat_completion(base_url: str, api_key: str, model: str, messages: List[Dict[str,str]], timeout_s: int = 25) -> Dict[str, Any]: url = base_url.rstrip("/") + "/chat/completions" payload = json.dumps({ "model": model, "messages": messages, "temperature": 0.4, }).encode("utf-8") req = urllib.request.Request( url, data=payload, headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: data = resp.read().decode("utf-8", errors="replace") return safe_json(data, fallback={"error": {"message": "Invalid JSON from provider"}}) except urllib.error.HTTPError as e: try: body = e.read().decode("utf-8", errors="replace") except Exception: body = "" return {"error": {"message": f"HTTPError {e.code}: {body[:600]}"}} except Exception as e: return {"error": {"message": str(e)}} def price_for(model_pricing: Dict[str, Dict[str,float]], model: str, tokens_in: int, tokens_out: int) -> float: p = model_pricing.get(model) or model_pricing.get("Simulated-Local") or {"in":0.0, "out":0.0} return (tokens_in / 1_000_000.0) * float(p.get("in", 0.0)) + (tokens_out / 1_000_000.0) * float(p.get("out", 0.0)) def choose_task_for_agent(w: World, agent: Agent) -> Optional[str]: backlog = [t for t in w.tasks.values() if t.status in ("backlog", "blocked")] if not backlog: return None backlog.sort(key=lambda t: (-t.priority, t.created_step, t.difficulty)) return backlog[0].id def maybe_generate_incident(w: World, r: random.Random): rate = w.incident_rate * (0.6 + 0.25 * w.difficulty) if r.random() < rate: x, y = random_walkable_cell(w.grid, r) if w.grid[y][x] != WALL: w.grid[y][x] = INCIDENT w.incidents_open += 1 w.events.append(f"[t={w.step}] INCIDENT spawned at ({x},{y})") add_task( w, title="Handle incident", description=f"Investigate and resolve incident at grid ({x},{y}).", priority=5, difficulty=min(5, w.difficulty + 1), est_hours=4.0 + 2.0 * w.difficulty ) def incident_positions(w: World) -> List[Tuple[int,int]]: out = [] for y in range(GRID_H): for x in range(GRID_W): if w.grid[y][x] == INCIDENT: out.append((x, y)) return out def nearest_task_node(w: World, ax: int, ay: int) -> Tuple[int,int]: nodes = [] for y in range(GRID_H): for x in range(GRID_W): if w.grid[y][x] == TASK_NODE: nodes.append((x, y)) if not nodes: return (ax, ay) nodes.sort(key=lambda p: abs(p[0]-ax)+abs(p[1]-ay)) return nodes[0] def simulated_reasoning(agent: Agent, task: Task, w: World) -> Tuple[str, str, int, int, float]: base = 0.08 + 0.04 * w.difficulty + 0.03 * task.difficulty compute_s = clamp(base, 0.05, 0.6) thoughts = ( f"Assess '{task.title}'. Priority={task.priority} difficulty={task.difficulty}. " f"Plan: decompose, execute, verify, document." ) action = f"Progressed {task.id}. Updated notes, checked blockers, validated output." tin = est_tokens(task.title + " " + task.description) + 30 tout = est_tokens(thoughts + " " + action) + 40 return thoughts, action, tin, tout, compute_s def api_reasoning(agent: Agent, task: Task, w: World, base_url: str, api_key: str, model: str, context_prompt: str): t0 = time.time() sys = ( "You are an autonomous business operations agent in a multi-agent simulation. " "Return a JSON object with keys: thoughts, action, blockers (list), progress_delta (0..1). " "Keep thoughts short and action concrete." ) user = { "simulation": {"step": w.step, "sim_time_hours": w.sim_time_hours, "difficulty": w.difficulty, "incidents_open": w.incidents_open}, "agent": {"name": agent.name, "role": agent.role, "energy": agent.energy}, "task": asdict(task), "context": (context_prompt or "")[:1400], } resp = oai_chat_completion(base_url, api_key, model, [{"role":"system","content":sys},{"role":"user","content":json.dumps(user)}]) compute_s = float(time.time() - t0) if "error" in resp: return "", "", 0, 0, compute_s, resp["error"].get("message", "Unknown error") content = "" try: content = resp["choices"][0]["message"]["content"] except Exception: content = "" usage_in = None usage_out = None if isinstance(resp.get("usage"), dict): usage_in = resp["usage"].get("prompt_tokens") usage_out = resp["usage"].get("completion_tokens") obj = safe_json(content, fallback=None) if not isinstance(obj, dict): thoughts = "Provider returned non-JSON; using fallback." action = content[:400] tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) tout = usage_out if isinstance(usage_out, int) else est_tokens(content) return thoughts, action, tin, tout, compute_s, None thoughts = str(obj.get("thoughts", ""))[:380] action = str(obj.get("action", ""))[:420] blockers = obj.get("blockers", []) progress_delta = obj.get("progress_delta", 0.0) if isinstance(blockers, list) and blockers: task.blockers = [str(b)[:80] for b in blockers][:5] task.status = "blocked" else: task.blockers = [] if task.status == "blocked": task.status = "in_progress" try: task.progress = clamp(task.progress + float(progress_delta), 0.0, 1.0) except Exception: pass tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) tout = usage_out if isinstance(usage_out, int) else est_tokens(json.dumps(obj)) return thoughts, action, tin, tout, compute_s, None def step_agent(w: World, agent: Agent, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str): if agent.energy <= 0: agent.state = "blocked" agent.last_action = "Out of energy" return if agent.current_task_id is None or agent.current_task_id not in w.tasks: tid = choose_task_for_agent(w, agent) if tid is None: agent.state = "idle" agent.thoughts = "No tasks available." agent.last_action = "Standing by." return w.tasks[tid].assigned_to = agent.name w.tasks[tid].status = "in_progress" agent.current_task_id = tid w.events.append(f"[t={w.step}] {agent.name} picked task {tid}") task = w.tasks[agent.current_task_id] incs = incident_positions(w) if incs and task.priority >= 5: incs.sort(key=lambda p: abs(p[0]-agent.x)+abs(p[1]-agent.y)) target = incs[0] else: target = nearest_task_node(w, agent.x, agent.y) nxt = bfs_next_step(w.grid, (agent.x, agent.y), target) if nxt is not None and r.random() < 0.65: agent.x, agent.y = nxt agent.state = "moving" agent.last_action = f"Moved toward {target}" agent.energy = max(0.0, agent.energy - 0.8) return agent.state = "working" if agent.model == "Simulated-Local" or agent.key_group == "none": thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) err = None speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) task.progress = clamp(task.progress + speed, 0.0, 1.0) if task.progress < 1.0: task.status = "in_progress" else: key = (keyrings or {}).get(agent.key_group, "") if not key: thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) err = f"No key for group '{agent.key_group}', used local simulation." speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) task.progress = clamp(task.progress + speed, 0.0, 1.0) else: thoughts, action, tin, tout, compute_s, err = api_reasoning(agent, task, w, base_url, key, agent.model, context_prompt) if task.progress >= 1.0 and task.status != "done": task.status = "done" w.tasks_done += 1 w.events.append(f"[t={w.step}] DONE {task.id}: {task.title}") agent.current_task_id = None if "incident" in task.title.lower(): incs = incident_positions(w) if incs: x, y = incs[0] w.grid[y][x] = EMPTY w.incidents_open = max(0, w.incidents_open - 1) w.incidents_resolved += 1 w.events.append(f"[t={w.step}] Incident resolved at ({x},{y})") agent.thoughts = thoughts agent.last_action = action if action else agent.last_action agent.tokens_in += int(tin) agent.tokens_out += int(tout) agent.compute_s += float(compute_s) agent.cost_usd += price_for(model_pricing, agent.model, int(tin), int(tout)) agent.energy = max(0.0, agent.energy - (0.8 + 0.15 * w.difficulty + 0.12 * task.difficulty)) w.runlog.append({ "step": w.step, "sim_time_hours": round(w.sim_time_hours, 2), "agent": agent.name, "role": agent.role, "model": agent.model, "key_group": agent.key_group, "task_id": task.id, "task_status": task.status, "task_progress": round(task.progress, 3), "action": (agent.last_action or "")[:220], "thoughts": (agent.thoughts or "")[:220], "tokens_in": int(tin), "tokens_out": int(tout), "cost_usd": round(price_for(model_pricing, agent.model, int(tin), int(tout)), 6), "compute_s": round(float(compute_s), 3), "error": (err or "")[:220], }) def tick(w: World, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str, max_log: int = 4000): if w.done: return maybe_generate_incident(w, r) agents = list(w.agents.values()) agents.sort(key=lambda a: (a.energy, a.name)) for ag in agents: step_agent(w, ag, r, model_pricing, keyrings, base_url, context_prompt) w.step += 1 w.sim_time_hours += float(w.tick_hours) if len(w.events) > 250: w.events = w.events[-250:] if len(w.runlog) > max_log: w.runlog = w.runlog[-max_log:] def compute_kpis(w: World) -> Dict[str, Any]: backlog = sum(1 for t in w.tasks.values() if t.status == "backlog") inprog = sum(1 for t in w.tasks.values() if t.status == "in_progress") blocked = sum(1 for t in w.tasks.values() if t.status == "blocked") done = sum(1 for t in w.tasks.values() if t.status == "done") total_cost = sum(a.cost_usd for a in w.agents.values()) total_tokens_in = sum(a.tokens_in for a in w.agents.values()) total_tokens_out = sum(a.tokens_out for a in w.agents.values()) total_compute = sum(a.compute_s for a in w.agents.values()) days = max(1e-6, w.sim_time_hours / 24.0) tpd = done / days return { "sim_time_days": round(w.sim_time_hours / 24.0, 2), "agents": len(w.agents), "tasks_total": len(w.tasks), "tasks_backlog": backlog, "tasks_in_progress": inprog, "tasks_blocked": blocked, "tasks_done": done, "incidents_open": w.incidents_open, "incidents_resolved": w.incidents_resolved, "throughput_tasks_per_day": round(tpd, 3), "tokens_in_total": int(total_tokens_in), "tokens_out_total": int(total_tokens_out), "compute_s_total": round(total_compute, 2), "cost_usd_total": round(total_cost, 4), } def svg_render(w: World) -> str: k = compute_kpis(w) headline = ( f"ZEN Orchestrator Sandbox • step={w.step} • sim_days={k['sim_time_days']} • " f"agents={k['agents']} • done={k['tasks_done']} • backlog={k['tasks_backlog']} • " f"incidents_open={k['incidents_open']} • cost=${k['cost_usd_total']}" ) detail = f"tick_hours={w.tick_hours} • difficulty={w.difficulty} • incident_rate={round(w.incident_rate,3)}" css = f""" """ parts = [f"""