# Orchestrator / app.py
# ZENLLC's picture
# Update app.py
# 88de539 verified
import html
import json
import math
import os
import random
import tempfile
import time
import urllib.error
import urllib.request
from collections import deque
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Tuple, Any

import gradio as gr
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw
# ============================================================
# ZEN Orchestrator Sandbox — Business-grade Agent Simulation
# ============================================================
# Fixes in this regen:
# - Removes unsupported gr.Dataframe(height=...) for Gradio 5.49.1
# - Uses a scroll container via HTML/CSS around the dataframe
# ============================================================
# Grid size in tiles (width, height) and the pixel edge length of one tile.
GRID_W, GRID_H = 32, 20
TILE = 22
# Pixel height of the HUD strip drawn above the grid.
HUD_H = 70
# Overall SVG canvas size in pixels: the grid plus the HUD strip.
SVG_W = GRID_W * TILE
SVG_H = GRID_H * TILE + HUD_H
# CSS colours used by the SVG renderer.
COL_BG = "#0a1020"
COL_PANEL = "rgba(255,255,255,0.06)"
COL_GRID = "rgba(255,255,255,0.06)"
COL_TEXT = "rgba(235,240,255,0.92)"
COL_TEXT_DIM = "rgba(235,240,255,0.72)"
# Tile type codes stored in the world grid.
EMPTY = 0
WALL = 1
DESK = 2
MEETING = 3
SERVER = 4
INCIDENT = 5
TASK_NODE = 6
# Fill colour for each tile type code.
TILE_COL = {
    EMPTY: "#162044",
    WALL: "#cdd2e6",
    DESK: "#7ad9ff",
    MEETING: "#ffd17a",
    SERVER: "#ff7ad9",
    INCIDENT: "#ff3b3b",
    TASK_NODE: "#7affc8",
}
# Agent marker colours; the renderer cycles through them by agent index.
AGENT_COLORS = [
    "#7ad9ff", "#ff7ad9", "#ffd17a", "#7affc8",
    "#ff9b6b", "#c7d2fe", "#a0ffd9", "#ffb0b0",
]
# Default per-model token rates ("in" = prompt, "out" = completion).
# The UI labels these as USD per 1M tokens, which matches how price_for scales them.
DEFAULT_MODEL_PRICING = {
    "Simulated-Local": {"in": 0.00, "out": 0.00},
    "gpt-4o-mini": {"in": 0.15, "out": 0.60},
    "gpt-4o": {"in": 5.00, "out": 15.00},
    "gpt-5": {"in": 5.00, "out": 15.00},
}
# Default base URL for the OpenAI-compatible chat completions endpoint.
DEFAULT_OAI_BASE = "https://api.openai.com/v1"
def clamp(v, lo, hi):
    """Limit ``v`` to the closed interval ``[lo, hi]``."""
    if v < lo:
        return lo
    if v > hi:
        return hi
    return v
def make_rng(seed: int) -> random.Random:
    """Return an independent ``random.Random`` seeded with the low 32 bits of ``seed``."""
    return random.Random(seed & 0xFFFFFFFF)
def now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", utc_now)
def est_tokens(text: str) -> int:
    """Roughly estimate a token count as one token per four characters.

    Empty or missing text counts as 0; any non-empty text counts as at least 1.
    """
    if text:
        return max(1, int(len(text) / 4))
    return 0
def safe_json(s: str, fallback=None):
    """Parse ``s`` as JSON, returning ``fallback`` if parsing raises.

    A successfully parsed value is returned as-is, even when it is ``None``
    (JSON ``null``); ``fallback`` is used only when parsing fails.
    """
    try:
        parsed = json.loads(s)
    except Exception:
        return fallback
    else:
        return parsed
def to_csv_download(df: pd.DataFrame) -> str:
    """Write ``df`` to a new temporary ``.csv`` file and return its path.

    The file is created with ``delete=False`` so it outlives this call; the
    caller (or the OS temp cleanup) is responsible for removing it.
    """
    # Close our handle before pandas opens the path: this avoids leaking a
    # file descriptor and lets the write succeed on platforms that refuse to
    # open a file a second time while it is still open.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        path = tmp.name
    df.to_csv(path, index=False)
    return path
@dataclass
class Task:
    """A unit of work tracked in the world's task table."""
    id: str  # task identifier, e.g. "T0001" as generated by add_task
    title: str
    description: str
    priority: int = 3  # add_task clamps this to 1..5; higher is picked first
    difficulty: int = 3  # add_task clamps this to 1..5
    est_hours: float = 8.0
    created_step: int = 0  # world step at which the task was created
    status: str = "backlog"  # values used in this file: backlog, in_progress, blocked, done
    assigned_to: Optional[str] = None  # name of the agent that last picked the task
    progress: float = 0.0  # completion fraction, kept within 0.0..1.0 by the step logic
    blockers: List[str] = field(default_factory=list)  # blocker notes reported by the model
@dataclass
class Agent:
    """A simulated worker placed on the office grid."""
    name: str
    model: str  # model name; "Simulated-Local" means no API call is made
    key_group: str  # key into the keyring mapping; "none" forces local simulation
    x: int  # grid column
    y: int  # grid row
    energy: float = 100.0  # drained by moving and working; at <= 0 the agent stops acting
    role: str = "Generalist"
    state: str = "idle"  # values used in this file: idle, moving, working, blocked
    current_task_id: Optional[str] = None
    thoughts: str = ""  # most recent reasoning text
    last_action: str = ""  # most recent action description
    tokens_in: int = 0  # running total of prompt tokens
    tokens_out: int = 0  # running total of completion tokens
    cost_usd: float = 0.0  # running total cost computed via price_for
    compute_s: float = 0.0  # running total of compute/latency seconds
@dataclass
class World:
    """Complete mutable simulation state held in a Gradio session."""
    seed: int = 1337
    step: int = 0  # number of ticks executed so far
    sim_time_hours: float = 0.0  # simulated time elapsed
    tick_hours: float = 4.0  # simulated hours added per tick
    difficulty: int = 3  # global difficulty; scales incident rate, work speed and energy drain
    incident_rate: float = 0.07  # base per-tick incident probability before difficulty scaling
    grid: List[List[int]] = field(default_factory=list)  # tile codes indexed as grid[y][x]
    agents: Dict[str, Agent] = field(default_factory=dict)  # keyed by agent name
    tasks: Dict[str, Task] = field(default_factory=dict)  # keyed by task id
    events: List[str] = field(default_factory=list)  # human-readable log; tick trims it to 250 entries
    runlog: List[Dict[str, Any]] = field(default_factory=list)  # one row per agent work step
    incidents_open: int = 0
    incidents_resolved: int = 0
    tasks_done: int = 0
    done: bool = False  # tick() is a no-op while True; NOTE(review): nothing in this file sets it
def build_office(seed: int) -> List[List[int]]:
    """Build the office tile grid (indexed ``grid[y][x]``) for ``seed``.

    The layout is a walled rectangle containing two meeting rooms, a server
    area, desks, three task nodes and up to 22 randomly placed wall tiles.
    Only the random wall placement depends on ``seed``.
    """
    rng = make_rng(seed)
    grid = [[EMPTY] * GRID_W for _ in range(GRID_H)]

    # Outer ring of walls.
    bottom, right = GRID_H - 1, GRID_W - 1
    for col in range(GRID_W):
        grid[0][col] = WALL
        grid[bottom][col] = WALL
    for row in grid:
        row[0] = WALL
        row[right] = WALL

    def fill(x0, y0, x1, y1, tile):
        """Paint an inclusive rectangle, clipped so the outer wall ring is untouched."""
        for yy in range(max(y0, 1), min(y1, GRID_H - 2) + 1):
            for xx in range(max(x0, 1), min(x1, GRID_W - 2) + 1):
                grid[yy][xx] = tile

    fill(2, 2, GRID_W - 3, GRID_H - 3, EMPTY)
    fill(3, 3, 10, 7, MEETING)
    fill(GRID_W - 11, 3, GRID_W - 4, 7, MEETING)
    fill(GRID_W - 10, GRID_H - 8, GRID_W - 4, GRID_H - 3, SERVER)

    # Desks go on even rows, every third column.
    # NOTE(review): with GRID_H = 20 this row range is just row 9, which is
    # odd, so no desk is ever placed — confirm the intended row range.
    for row_idx in range(9, GRID_H - 10):
        if row_idx % 2:
            continue
        for col_idx in range(4, GRID_W - 12):
            if col_idx % 3 == 1:
                grid[row_idx][col_idx] = DESK

    # Fixed task nodes that agents walk toward.
    for nx, ny in ((6, GRID_H - 5), (GRID_W // 2, GRID_H // 2), (GRID_W - 14, 10)):
        if 1 <= nx < GRID_W - 1 and 1 <= ny < GRID_H - 1:
            grid[ny][nx] = TASK_NODE

    # Scatter obstacles; x is drawn before y on every attempt so the RNG
    # sequence (and therefore the layout per seed) stays stable.
    for _ in range(22):
        wx = rng.randint(3, GRID_W - 4)
        wy = rng.randint(8, GRID_H - 9)
        if grid[wy][wx] == EMPTY:
            grid[wy][wx] = WALL
    return grid
def random_walkable_cell(g: List[List[int]], r: random.Random) -> Tuple[int, int]:
    """Pick a random interior ``(x, y)`` cell that agents may stand on.

    Candidates are collected row by row from the cells inside the outer
    ring; incident and wall tiles are excluded. Falls back to ``(2, 2)``
    when no candidate exists.
    """
    walkable = (EMPTY, DESK, MEETING, SERVER, TASK_NODE)
    candidates = [
        (x, y)
        for y in range(1, GRID_H - 1)
        for x in range(1, GRID_W - 1)
        if g[y][x] in walkable
    ]
    if not candidates:
        return (2, 2)
    return r.choice(candidates)
def init_world(seed: int) -> World:
    """Create a fresh world for ``seed`` with a generated office grid and one log entry."""
    seed = int(seed)
    world = World(seed=seed, grid=build_office(seed))
    world.events.append(f"[{now_iso()}] Initialized office world | seed={seed}")
    return world
def add_agent(w: World, name: str, model: str, key_group: str, role: str, seed_bump: int = 0):
    """Register a new agent under ``name`` at a random walkable cell and log it.

    The spawn position comes from an RNG derived from the world seed, the
    current step and ``seed_bump``. An existing agent with the same name is
    replaced.
    """
    spawn_rng = make_rng(w.seed + w.step * 17 + seed_bump)
    col, row = random_walkable_cell(w.grid, spawn_rng)
    newcomer = Agent(name=name, model=model, key_group=key_group, x=col, y=row, role=role)
    w.agents[name] = newcomer
    w.events.append(f"[t={w.step}] Agent added: {name} | model={model} | key_group={key_group} | role={role}")
def add_task(w: World, title: str, description: str, priority: int, difficulty: int, est_hours: float):
    """Create a task, store it in ``w.tasks`` and return its id.

    Inputs are normalised before storing:
      * ``title`` is stripped and cut to 80 chars; empty or whitespace-only
        titles become "Untitled Task".
      * ``description`` is stripped and cut to 400 chars.
      * ``priority`` and ``difficulty`` are clamped to 1..5.
      * ``est_hours`` is at least 0.25 (a missing value is treated as 0.25).

    The event log entry reports the stored (normalised) values.
    """
    tid = f"T{len(w.tasks)+1:04d}"
    # `or` after stripping so "   " does not become an empty title.
    clean_title = (title or "").strip()[:80] or "Untitled Task"
    clean_desc = (description or "").strip()[:400]
    prio = int(clamp(priority, 1, 5))
    diff = int(clamp(difficulty, 1, 5))
    hours = float(max(0.25, est_hours if est_hours is not None else 0.25))
    w.tasks[tid] = Task(
        id=tid,
        title=clean_title,
        description=clean_desc,
        priority=prio,
        difficulty=diff,
        est_hours=hours,
        created_step=w.step,
    )
    w.events.append(f"[t={w.step}] Task added: {tid} | p={prio} d={diff} est={hours}h | {clean_title}")
    return tid
# The four axis-aligned (dx, dy) neighbour offsets used by the path search.
DIRS4 = [(1,0), (0,1), (-1,0), (0,-1)]
def in_bounds(x, y):
    """Return True when ``(x, y)`` lies inside the ``GRID_W`` x ``GRID_H`` grid."""
    if x < 0 or x >= GRID_W:
        return False
    return 0 <= y < GRID_H
def is_blocking(tile: int) -> bool:
    """Return True for tiles that cannot be walked through (only walls)."""
    blocks_movement = (tile == WALL)
    return blocks_movement
def bfs_next_step(grid: List[List[int]], start: Tuple[int,int], goal: Tuple[int,int]) -> Optional[Tuple[int,int]]:
    """Return the first cell on a shortest 4-connected path from ``start`` to ``goal``.

    Cells are ``(x, y)`` pairs and ``grid`` is indexed ``grid[y][x]``.
    Blocking tiles (see ``is_blocking``) and out-of-bounds cells are never
    entered.

    Returns:
        The neighbouring cell to step onto next, or ``None`` when ``start``
        already equals ``goal`` or the goal cannot be reached.
    """
    if start == goal:
        return None
    origin = (start[0], start[1])
    target = (goal[0], goal[1])

    # deque gives O(1) pops from the front; list.pop(0) shifts the whole queue.
    frontier = deque([origin])
    prev = {origin: None}
    while frontier:
        cell = frontier.popleft()
        if cell == target:
            break
        x, y = cell
        for dx, dy in DIRS4:
            nxt = (x + dx, y + dy)
            if nxt in prev or not in_bounds(nxt[0], nxt[1]):
                continue
            if is_blocking(grid[nxt[1]][nxt[0]]):
                continue
            prev[nxt] = cell
            frontier.append(nxt)

    if target not in prev:
        return None

    # Walk the predecessor chain back until we reach the cell adjacent to start.
    cur = target
    while prev[cur] is not None and prev[cur] != origin:
        cur = prev[cur]
    return cur
def oai_chat_completion(base_url: str, api_key: str, model: str, messages: List[Dict[str,str]], timeout_s: int = 25) -> Dict[str, Any]:
    """POST a chat completion request to an OpenAI-compatible endpoint.

    Args:
        base_url: API base URL; ``/chat/completions`` is appended.
        api_key: Bearer token sent in the ``Authorization`` header.
        model: Model name placed in the request body.
        messages: Chat messages (``role`` / ``content`` dicts).
        timeout_s: Socket timeout in seconds.

    Returns:
        The decoded JSON object on success. On any failure — malformed URL,
        unserialisable payload, network/HTTP error, or a body that is not a
        JSON object — a dict of the form ``{"error": {"message": ...}}``.
        This function never raises for those cases.
    """
    try:
        # Building the request is inside the try because urllib rejects a
        # malformed URL (e.g. a missing scheme) right in the constructor.
        url = (base_url or "").rstrip("/") + "/chat/completions"
        payload = json.dumps({
            "model": model,
            "messages": messages,
            "temperature": 0.4,
        }).encode("utf-8")
        req = urllib.request.Request(
            url,
            data=payload,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            body = ""
        return {"error": {"message": f"HTTPError {e.code}: {body[:600]}"}}
    except Exception as e:
        # Best-effort boundary: report every other failure as an error dict.
        return {"error": {"message": str(e)}}

    try:
        parsed = json.loads(raw)
    except ValueError:
        parsed = None
    # Valid JSON that is not an object (null, list, string, ...) would break
    # callers that index the result as a dict.
    if not isinstance(parsed, dict):
        return {"error": {"message": "Invalid JSON from provider"}}
    return parsed
def price_for(model_pricing: Dict[str, Dict[str,float]], model: str, tokens_in: int, tokens_out: int) -> float:
    """Compute the cost of one call from per-million-token rates.

    Rates are looked up under ``model``; a missing or empty entry falls back
    to the "Simulated-Local" entry and then to zero rates.
    """
    rates = model_pricing.get(model)
    if not rates:
        rates = model_pricing.get("Simulated-Local")
    if not rates:
        rates = {"in":0.0, "out":0.0}
    rate_in = float(rates.get("in", 0.0))
    rate_out = float(rates.get("out", 0.0))
    cost_in = (tokens_in / 1_000_000.0) * rate_in
    cost_out = (tokens_out / 1_000_000.0) * rate_out
    return cost_in + cost_out
def choose_task_for_agent(w: World, agent: Agent) -> Optional[str]:
    """Return the id of the next task to pick up, or ``None`` if none is open.

    Open tasks are those with status "backlog" or "blocked". The winner has
    the highest priority, then the earliest creation step, then the lowest
    difficulty; remaining ties go to the task encountered first.

    NOTE(review): ``agent`` is not consulted, and a "blocked" task is
    eligible regardless of ``assigned_to`` — confirm that is intended.
    """
    candidates = [
        t for t in w.tasks.values()
        if t.status == "backlog" or t.status == "blocked"
    ]
    if not candidates:
        return None
    best = min(candidates, key=lambda t: (-t.priority, t.created_step, t.difficulty))
    return best.id
def maybe_generate_incident(w: World, r: random.Random):
    """With difficulty-scaled probability, spawn an incident tile and its task.

    The per-tick probability is ``incident_rate * (0.6 + 0.25 * difficulty)``.
    Incidents are placed on EMPTY tiles only: resolving an incident resets
    its tile to EMPTY, so spawning on a meeting/server/task-node tile would
    erase that tile for the rest of the run.
    """
    rate = w.incident_rate * (0.6 + 0.25 * w.difficulty)
    if not (r.random() < rate):
        return

    # A few bounded attempts to land on a plain floor tile; give up quietly
    # if none is found so special tiles are never overwritten.
    for _ in range(8):
        x, y = random_walkable_cell(w.grid, r)
        if w.grid[y][x] == EMPTY:
            break
    else:
        return

    w.grid[y][x] = INCIDENT
    w.incidents_open += 1
    w.events.append(f"[t={w.step}] INCIDENT spawned at ({x},{y})")
    add_task(
        w,
        title="Handle incident",
        description=f"Investigate and resolve incident at grid ({x},{y}).",
        priority=5,
        difficulty=min(5, w.difficulty + 1),
        est_hours=4.0 + 2.0 * w.difficulty,
    )
def incident_positions(w: World) -> List[Tuple[int,int]]:
    """List the ``(x, y)`` coordinates of all INCIDENT tiles in row-major order."""
    return [
        (x, y)
        for y in range(GRID_H)
        for x in range(GRID_W)
        if w.grid[y][x] == INCIDENT
    ]
def nearest_task_node(w: World, ax: int, ay: int) -> Tuple[int,int]:
    """Return the TASK_NODE tile closest to ``(ax, ay)`` by Manhattan distance.

    Ties go to the node found first in row-major scan order. When the grid
    has no task node, ``(ax, ay)`` itself is returned.
    """
    found = [
        (x, y)
        for y in range(GRID_H)
        for x in range(GRID_W)
        if w.grid[y][x] == TASK_NODE
    ]
    if not found:
        return (ax, ay)
    return min(found, key=lambda p: abs(p[0]-ax)+abs(p[1]-ay))
def simulated_reasoning(agent: Agent, task: Task, w: World) -> Tuple[str, str, int, int, float]:
    """Produce canned reasoning output for a work step without calling any API.

    Returns:
        ``(thoughts, action, tokens_in, tokens_out, compute_s)`` where the
        token counts are character-based estimates plus a fixed overhead and
        ``compute_s`` grows with world and task difficulty, limited to
        0.05..0.6.
    """
    raw_cost = 0.08 + 0.04 * w.difficulty + 0.03 * task.difficulty
    compute_s = clamp(raw_cost, 0.05, 0.6)

    assessment = f"Assess '{task.title}'. Priority={task.priority} difficulty={task.difficulty}. "
    thoughts = assessment + "Plan: decompose, execute, verify, document."
    action = f"Progressed {task.id}. Updated notes, checked blockers, validated output."

    prompt_text = " ".join((task.title, task.description))
    reply_text = " ".join((thoughts, action))
    tin = est_tokens(prompt_text) + 30
    tout = est_tokens(reply_text) + 40
    return thoughts, action, tin, tout, compute_s
def api_reasoning(agent: Agent, task: Task, w: World, base_url: str, api_key: str, model: str, context_prompt: str):
    """Ask the remote model how ``agent`` should advance ``task`` and apply the answer.

    The model is asked for a JSON object with ``thoughts``, ``action``,
    ``blockers`` and ``progress_delta``. When the reply parses, ``task`` is
    updated in place: blockers and status are set, and progress is advanced.

    Returns:
        ``(thoughts, action, tokens_in, tokens_out, compute_s, error)``.
        ``error`` is ``None`` on success, or the provider's error message,
        in which case the task is left untouched and token counts are 0.
        Token counts come from the provider's ``usage`` block when present
        and are estimated from text length otherwise.
    """
    t0 = time.time()
    system_prompt = (
        "You are an autonomous business operations agent in a multi-agent simulation. "
        "Return a JSON object with keys: thoughts, action, blockers (list), progress_delta (0..1). "
        "Keep thoughts short and action concrete."
    )
    user = {
        "simulation": {"step": w.step, "sim_time_hours": w.sim_time_hours, "difficulty": w.difficulty, "incidents_open": w.incidents_open},
        "agent": {"name": agent.name, "role": agent.role, "energy": agent.energy},
        "task": asdict(task),
        "context": (context_prompt or "")[:1400],
    }
    user_json = json.dumps(user)
    resp = oai_chat_completion(
        base_url, api_key, model,
        [{"role":"system","content":system_prompt},{"role":"user","content":user_json}],
    )
    compute_s = float(time.time() - t0)

    # Guard against a transport helper handing back something other than a dict.
    if not isinstance(resp, dict):
        return "", "", 0, 0, compute_s, "Invalid response from provider"
    if "error" in resp:
        err = resp["error"]
        message = err.get("message", "Unknown error") if isinstance(err, dict) else str(err)
        return "", "", 0, 0, compute_s, message

    try:
        content = resp["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        content = ""
    # Providers may send `"content": null`; keep it sliceable.
    if not isinstance(content, str):
        content = "" if content is None else str(content)

    usage = resp.get("usage")
    usage_in = usage.get("prompt_tokens") if isinstance(usage, dict) else None
    usage_out = usage.get("completion_tokens") if isinstance(usage, dict) else None
    tin = usage_in if isinstance(usage_in, int) else est_tokens(system_prompt + user_json)

    obj = safe_json(content, fallback=None)
    if not isinstance(obj, dict):
        thoughts = "Provider returned non-JSON; using fallback."
        action = content[:400]
        tout = usage_out if isinstance(usage_out, int) else est_tokens(content)
        return thoughts, action, tin, tout, compute_s, None

    thoughts = str(obj.get("thoughts", ""))[:380]
    action = str(obj.get("action", ""))[:420]

    blockers = obj.get("blockers", [])
    if isinstance(blockers, list) and blockers:
        task.blockers = [str(b)[:80] for b in blockers][:5]
        task.status = "blocked"
    else:
        task.blockers = []
        if task.status == "blocked":
            task.status = "in_progress"

    # The prompt asks for 0..1. Ignore unusable values and bound the rest so
    # a reply cannot roll progress back or poison it with NaN/inf.
    try:
        delta = float(obj.get("progress_delta", 0.0))
    except (TypeError, ValueError):
        delta = 0.0
    if not math.isfinite(delta):
        delta = 0.0
    delta = clamp(delta, 0.0, 1.0)
    task.progress = clamp(task.progress + delta, 0.0, 1.0)

    tout = usage_out if isinstance(usage_out, int) else est_tokens(json.dumps(obj))
    return thoughts, action, tin, tout, compute_s, None
def step_agent(w: World, agent: Agent, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str):
    """Advance one agent by one tick: pick a task, then either move or work.

    Order of operations:
      1. An agent with no energy becomes "blocked" and does nothing.
      2. Without a live task the agent picks one via ``choose_task_for_agent``
         (or idles when none is open). A task that is already "done" — for
         example finished by another agent — is released first.
      3. The agent heads for the nearest incident (priority-5 tasks while an
         incident exists) or the nearest task node; with probability 0.65 it
         takes one step and ends its turn.
      4. Otherwise it works: locally simulated, or via the API when the
         agent has a non-local model and a key for its key group.
      5. Completion, incident resolution, accounting and a run-log row.

    Mutates ``w``, ``agent`` and the task; returns ``None``.
    """
    if agent.energy <= 0:
        agent.state = "blocked"
        agent.last_action = "Out of energy"
        return

    held = w.tasks.get(agent.current_task_id) if agent.current_task_id is not None else None
    if held is not None and held.status == "done":
        # Someone else completed it; do not keep burning energy on it.
        agent.current_task_id = None
        held = None
    if held is None:
        tid = choose_task_for_agent(w, agent)
        if tid is None:
            agent.state = "idle"
            agent.thoughts = "No tasks available."
            agent.last_action = "Standing by."
            return
        w.tasks[tid].assigned_to = agent.name
        w.tasks[tid].status = "in_progress"
        agent.current_task_id = tid
        w.events.append(f"[t={w.step}] {agent.name} picked task {tid}")
    task = w.tasks[agent.current_task_id]

    # --- movement ---------------------------------------------------------
    incs = incident_positions(w)
    if incs and task.priority >= 5:
        target = min(incs, key=lambda p: abs(p[0]-agent.x)+abs(p[1]-agent.y))
    else:
        target = nearest_task_node(w, agent.x, agent.y)
    nxt = bfs_next_step(w.grid, (agent.x, agent.y), target)
    # The RNG is consulted only when a step is actually possible.
    if nxt is not None and r.random() < 0.65:
        agent.x, agent.y = nxt
        agent.state = "moving"
        agent.last_action = f"Moved toward {target}"
        agent.energy = max(0.0, agent.energy - 0.8)
        return

    # --- work -------------------------------------------------------------
    agent.state = "working"
    wants_api = agent.model != "Simulated-Local" and agent.key_group != "none"
    api_key = (keyrings or {}).get(agent.key_group, "") if wants_api else ""
    if wants_api and api_key:
        thoughts, action, tin, tout, compute_s, err = api_reasoning(
            agent, task, w, base_url, api_key, agent.model, context_prompt
        )
    else:
        thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w)
        err = f"No key for group '{agent.key_group}', used local simulation." if wants_api else None
        speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty)
        task.progress = clamp(task.progress + speed, 0.0, 1.0)
        if task.progress < 1.0:
            task.status = "in_progress"

    # --- completion -------------------------------------------------------
    if task.progress >= 1.0 and task.status != "done":
        task.status = "done"
        w.tasks_done += 1
        w.events.append(f"[t={w.step}] DONE {task.id}: {task.title}")
        agent.current_task_id = None
        if "incident" in task.title.lower():
            incs = incident_positions(w)
            if incs:
                x, y = incs[0]
                w.grid[y][x] = EMPTY
                w.incidents_open = max(0, w.incidents_open - 1)
                w.incidents_resolved += 1
                w.events.append(f"[t={w.step}] Incident resolved at ({x},{y})")

    # --- accounting ---------------------------------------------------------
    tin, tout, compute_s = int(tin), int(tout), float(compute_s)
    step_cost = price_for(model_pricing, agent.model, tin, tout)
    agent.thoughts = thoughts
    agent.last_action = action if action else agent.last_action
    agent.tokens_in += tin
    agent.tokens_out += tout
    agent.compute_s += compute_s
    agent.cost_usd += step_cost
    agent.energy = max(0.0, agent.energy - (0.8 + 0.15 * w.difficulty + 0.12 * task.difficulty))
    w.runlog.append({
        "step": w.step,
        "sim_time_hours": round(w.sim_time_hours, 2),
        "agent": agent.name,
        "role": agent.role,
        "model": agent.model,
        "key_group": agent.key_group,
        "task_id": task.id,
        "task_status": task.status,
        "task_progress": round(task.progress, 3),
        "action": (agent.last_action or "")[:220],
        "thoughts": (agent.thoughts or "")[:220],
        "tokens_in": tin,
        "tokens_out": tout,
        "cost_usd": round(step_cost, 6),
        "compute_s": round(compute_s, 3),
        "error": (err or "")[:220],
    })
def tick(w: World, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str, max_log: int = 4000):
    """Run one simulation tick.

    Possibly spawns an incident, steps every agent (lowest energy first,
    ties by name), advances the step counter and simulated clock, and trims
    the event log to 250 entries and the run log to ``max_log`` rows.
    Does nothing when ``w.done`` is set.
    """
    if w.done:
        return

    maybe_generate_incident(w, r)

    turn_order = sorted(w.agents.values(), key=lambda a: (a.energy, a.name))
    for member in turn_order:
        step_agent(w, member, r, model_pricing, keyrings, base_url, context_prompt)

    w.step += 1
    w.sim_time_hours += float(w.tick_hours)

    # Keep only the most recent history.
    event_cap = 250
    if len(w.events) > event_cap:
        w.events = w.events[-event_cap:]
    if len(w.runlog) > max_log:
        w.runlog = w.runlog[-max_log:]
def compute_kpis(w: World) -> Dict[str, Any]:
    """Summarise the world as a flat dict of headline metrics.

    Covers task counts per status, incident counters, token / compute / cost
    totals across all agents, and throughput in completed tasks per
    simulated day.
    """
    status_counts = {"backlog": 0, "in_progress": 0, "blocked": 0, "done": 0}
    for t in w.tasks.values():
        if t.status in status_counts:
            status_counts[t.status] += 1

    crew = list(w.agents.values())

    def total(attr):
        """Sum one numeric attribute over every agent."""
        return sum(getattr(a, attr) for a in crew)

    elapsed_days = w.sim_time_hours / 24.0
    # Floor the divisor so a brand-new world does not divide by zero.
    throughput = status_counts["done"] / max(1e-6, elapsed_days)

    return {
        "sim_time_days": round(elapsed_days, 2),
        "agents": len(w.agents),
        "tasks_total": len(w.tasks),
        "tasks_backlog": status_counts["backlog"],
        "tasks_in_progress": status_counts["in_progress"],
        "tasks_blocked": status_counts["blocked"],
        "tasks_done": status_counts["done"],
        "incidents_open": w.incidents_open,
        "incidents_resolved": w.incidents_resolved,
        "throughput_tasks_per_day": round(throughput, 3),
        "tokens_in_total": int(total("tokens_in")),
        "tokens_out_total": int(total("tokens_out")),
        "compute_s_total": round(total("compute_s"), 2),
        "cost_usd_total": round(total("cost_usd"), 4),
    }
def svg_render(w: World) -> str:
    """Render the world as an HTML fragment: a styled ``<div>`` holding one SVG.

    The SVG shows a HUD strip with headline KPIs, the tile grid (incident
    tiles get a marker), grid lines, and one labelled marker with an energy
    bar per agent. Agent names are user-supplied, so they are HTML-escaped
    before being embedded in the markup.
    """
    k = compute_kpis(w)
    headline = (
        f"ZEN Orchestrator Sandbox • step={w.step} • sim_days={k['sim_time_days']} • "
        f"agents={k['agents']} • done={k['tasks_done']} • backlog={k['tasks_backlog']} • "
        f"incidents_open={k['incidents_open']} • cost=${k['cost_usd_total']}"
    )
    detail = f"tick_hours={w.tick_hours} • difficulty={w.difficulty} • incident_rate={round(w.incident_rate,3)}"
    css = f"""
<style>
.root {{
background: {COL_BG};
border-radius: 18px;
overflow: hidden;
box-shadow: 0 18px 40px rgba(0,0,0,0.45);
}}
.hud {{
font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial;
fill: {COL_TEXT};
}}
.hudSmall {{
fill: {COL_TEXT_DIM};
}}
.gridline {{
stroke: {COL_GRID};
stroke-width: 1;
}}
.agent {{
transition: transform 220ms cubic-bezier(.2,.8,.2,1);
filter: drop-shadow(0px 10px 12px rgba(0,0,0,0.45));
}}
.pulse {{
animation: pulse 1.2s ease-in-out infinite;
opacity: 0.22;
}}
@keyframes pulse {{
0% {{ transform: scale(1.0); opacity: 0.14; }}
50% {{ transform: scale(1.15); opacity: 0.26; }}
100% {{ transform: scale(1.0); opacity: 0.14; }}
}}
.tag {{
fill: rgba(0,0,0,0.38);
}}
.tile {{
shape-rendering: crispEdges;
}}
</style>
"""
    parts = [f"""
<div class="root">
{css}
<svg width="{SVG_W}" height="{SVG_H}" viewBox="0 0 {SVG_W} {SVG_H}">
<rect x="0" y="0" width="{SVG_W}" height="{SVG_H}" fill="{COL_BG}"/>
<rect x="0" y="0" width="{SVG_W}" height="{HUD_H}" fill="{COL_PANEL}"/>
<text class="hud" x="16" y="28" font-size="16" font-weight="800">{headline}</text>
<text class="hud hudSmall" x="16" y="52" font-size="12">{detail}</text>
"""]

    # Tiles, drawn below the HUD strip.
    for y in range(GRID_H):
        for x in range(GRID_W):
            t = w.grid[y][x]
            col = TILE_COL.get(t, TILE_COL[EMPTY])
            px = x * TILE
            py = HUD_H + y * TILE
            parts.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{col}"/>')
            if t == INCIDENT:
                parts.append(f'<circle cx="{px+TILE/2}" cy="{py+TILE/2}" r="7" fill="rgba(0,0,0,0.25)"/>')
                parts.append(f'<circle cx="{px+TILE/2}" cy="{py+TILE/2}" r="4" fill="rgba(255,255,255,0.65)"/>')

    # Grid lines.
    for x in range(GRID_W + 1):
        px = x * TILE
        parts.append(f'<line class="gridline" x1="{px}" y1="{HUD_H}" x2="{px}" y2="{SVG_H}"/>')
    for y in range(GRID_H + 1):
        py = HUD_H + y * TILE
        parts.append(f'<line class="gridline" x1="0" y1="{py}" x2="{SVG_W}" y2="{py}"/>')

    # Agents: pulse halo, body, name tag and energy bar.
    bar_w = TILE * 0.80
    bx = TILE/2 - bar_w/2
    by = TILE * 0.82
    for i, ag in enumerate(w.agents.values()):
        col = AGENT_COLORS[i % len(AGENT_COLORS)]
        px = ag.x * TILE
        py = HUD_H + ag.y * TILE
        # Escape the free-text name so characters such as < or & cannot
        # break the SVG or inject markup into the page.
        label = html.escape(str(ag.name))
        parts.append(f"""
<g class="agent" style="transform: translate({px}px, {py}px);">
<circle class="pulse" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.48}" fill="{col}"></circle>
<circle cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.34}" fill="{col}" opacity="0.98"></circle>
<rect class="tag" x="{TILE*0.10}" y="{TILE*0.08}" width="{TILE*0.80}" height="14" rx="7"/>
<text x="{TILE/2}" y="{TILE*0.08 + 11}" text-anchor="middle" font-size="9"
fill="rgba(235,240,255,0.90)" font-family="ui-sans-serif, system-ui">{label}</text>
""")
        parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w}" height="6" rx="4" fill="rgba(255,255,255,0.12)"/>')
        parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w*(clamp(ag.energy,0,100)/100.0)}" height="6" rx="4" fill="rgba(122,255,200,0.85)"/>')
        parts.append("</g>")

    parts.append("</svg></div>")
    return "".join(parts)
def agents_text(w: World) -> str:
    """Format one status line per agent, or a placeholder when there are none."""
    if not w.agents:
        return "(no agents yet)"
    rows = [
        (
            f"{ag.name} | role={ag.role} | model={ag.model} | key_group={ag.key_group} | "
            f"state={ag.state} | energy={round(ag.energy,1)} | "
            f"task={ag.current_task_id or '-'} | cost=${round(ag.cost_usd,4)} | "
            f"tin={ag.tokens_in} tout={ag.tokens_out}"
        )
        for ag in w.agents.values()
    ]
    return "\n".join(rows)
def tasks_text(w: World) -> str:
    """Format up to 18 tasks for the "Task Backlog" panel.

    Unfinished tasks are listed first (highest priority, then oldest), with
    completed tasks after them, so the row cap drops finished work before it
    hides anything still open. Returns a placeholder when no task exists.
    """
    # False sorts before True, so `status == "done"` pushes finished tasks last.
    ordered = sorted(
        w.tasks.values(),
        key=lambda t: (t.status == "done", -t.priority, t.created_step),
    )
    out = []
    for t in ordered[:18]:
        bl = f" blockers={t.blockers}" if t.blockers else ""
        out.append(
            f"{t.id} [{t.status}] p={t.priority} d={t.difficulty} prog={round(t.progress,2)} "
            f"est={t.est_hours}h assigned={t.assigned_to or '-'} :: {t.title}{bl}"
        )
    return "\n".join(out) if out else "(no tasks yet)"
def events_text(w: World) -> str:
    """Return the 20 most recent event log lines joined by newlines."""
    if not w.events:
        return ""
    recent = w.events[-20:]
    return "\n".join(recent)
def kpis_text(w: World) -> str:
    """Return the KPI summary as pretty-printed JSON (2-space indent)."""
    metrics = compute_kpis(w)
    return json.dumps(metrics, indent=2)
def run_data_df(w: World, rows: int) -> pd.DataFrame:
    """Return the last ``rows`` run-log entries (at least 10) as a DataFrame.

    An empty run log yields an empty frame that still carries the run-log
    column headers.
    """
    limit = int(max(10, rows))
    if w.runlog:
        return pd.DataFrame(w.runlog[-limit:])
    empty_columns = [
        "step","sim_time_hours","agent","role","model","key_group","task_id","task_status","task_progress",
        "action","thoughts","tokens_in","tokens_out","cost_usd","compute_s","error"
    ]
    return pd.DataFrame(columns=empty_columns)
def ui_refresh(w: World, run_rows: int):
    """Build every display value for the UI from the current world.

    Returns a 6-tuple in the order the event handlers list their outputs:
    arena HTML, agents text, tasks text, events text, KPI JSON, run-data frame.
    """
    arena_html = svg_render(w)
    agents_view = agents_text(w)
    tasks_view = tasks_text(w)
    events_view = events_text(w)
    kpi_view = kpis_text(w)
    run_frame = run_data_df(w, run_rows)
    return (arena_html, agents_view, tasks_view, events_view, kpi_view, run_frame)
TITLE = "ZEN Orchestrator Sandbox — Business-grade Agent Orchestra Simulator"

# Gradio UI: layout, per-session state and event wiring.
with gr.Blocks(title=TITLE) as demo:
    gr.Markdown(
        f"## {TITLE}\n"
        "Business-oriented multi-agent simulation with game-like visuals, time controls, run logging, and optional model keys."
    )
    # CSS for the scrollable run-data container (applied through elem_classes below).
    gr.HTML("""
<style>
.zen-scroll {
max-height: 320px;
overflow: auto;
border-radius: 12px;
border: 1px solid rgba(255,255,255,0.10);
background: rgba(255,255,255,0.03);
padding: 10px;
}
</style>
""")

    # Per-session state.
    w_state = gr.State(init_world(1337))
    autoplay_on = gr.State(False)
    keyrings_state = gr.State({"none": ""})
    pricing_state = gr.State(DEFAULT_MODEL_PRICING)

    with gr.Row():
        arena = gr.HTML(label="Office World (Animated SVG)")
        with gr.Column(scale=1):
            agents_box = gr.Textbox(label="Agents", lines=10)
            tasks_box = gr.Textbox(label="Task Backlog", lines=10)
    with gr.Row():
        events_box = gr.Textbox(label="Event Log", lines=8)
        kpi_box = gr.Textbox(label="KPIs (JSON)", lines=8)

    gr.Markdown("### Run Data (scroll + download)")
    with gr.Row():
        runlog_rows = gr.Slider(50, 1500, value=250, step=50, label="Run Data rows to display")
        download_btn = gr.Button("Download Run Data CSV")
    # Each gr.HTML is rendered as its own isolated fragment, so an opening
    # <div> in one and a closing </div> in another cannot enclose a component
    # placed between them. Attach the CSS class to a real layout container.
    with gr.Column(elem_classes=["zen-scroll"]):
        run_data = gr.Dataframe(label="Run Data", interactive=False, wrap=True)  # no height kwarg
    download_file = gr.File(label="CSV Download", interactive=False)

    gr.Markdown("### Scenario + Orchestration Controls")
    with gr.Row():
        seed_in = gr.Number(value=1337, precision=0, label="Seed")
        tick_hours = gr.Slider(0.5, 168.0, value=4.0, step=0.5, label="Simulated hours per tick")
        difficulty = gr.Slider(1, 5, value=3, step=1, label="Global difficulty")
        incident_rate = gr.Slider(0.0, 0.35, value=0.07, step=0.01, label="Incident rate per tick")
    with gr.Row():
        btn_reset = gr.Button("Reset World")
        run_n = gr.Number(value=10, precision=0, label="Run N ticks")
        btn_run = gr.Button("Run")

    gr.Markdown("### Add Tasks")
    with gr.Row():
        task_title = gr.Textbox(label="Task title", value="Build onboarding automation")
        task_desc = gr.Textbox(label="Task description", value="Design an end-to-end flow: intake, validation, audit log, export.")
    with gr.Row():
        task_p = gr.Slider(1, 5, value=4, step=1, label="Priority")
        task_d = gr.Slider(1, 5, value=3, step=1, label="Difficulty")
        task_est = gr.Slider(0.25, 200.0, value=16.0, step=0.25, label="Estimated hours")
        btn_add_task = gr.Button("Add Task")

    gr.Markdown("### Add Agents")
    with gr.Row():
        agent_name = gr.Textbox(label="Agent name", value="Agent-01")
        agent_role = gr.Dropdown(
            choices=["Generalist", "Ops", "HR Automation", "Engineer", "Analyst", "Incident Response", "PM"],
            value="Engineer",
            label="Role",
        )
    with gr.Row():
        model_choice = gr.Dropdown(
            choices=["Simulated-Local", "gpt-4o-mini", "gpt-4o", "gpt-5"],
            value="Simulated-Local",
            label="Model",
        )
        key_group = gr.Dropdown(
            choices=["none", "key1", "key2", "key3"],
            value="none",
            label="Key group",
        )
        btn_add_agent = gr.Button("Add Agent")

    gr.Markdown("### Model Keys + Pricing (Optional)")
    with gr.Row():
        oai_base = gr.Textbox(label="OpenAI-compatible base URL", value=DEFAULT_OAI_BASE)
        context_prompt = gr.Textbox(label="Global Context Prompt", value="Simulate a business ops team with auditability and cost tracking.", lines=3)
    with gr.Row():
        key1 = gr.Textbox(label="API Key (key1)", type="password")
        key2 = gr.Textbox(label="API Key (key2)", type="password")
        key3 = gr.Textbox(label="API Key (key3)", type="password")
    pricing_json = gr.Textbox(label="Model pricing JSON (USD per 1M tokens)", value=json.dumps(DEFAULT_MODEL_PRICING, indent=2), lines=8)
    btn_apply_keys = gr.Button("Apply Keys + Pricing")

    gr.Markdown("### Autoplay")
    autoplay_speed = gr.Slider(0.05, 1.2, value=0.18, step=0.01, label="Autoplay interval (seconds)")
    with gr.Row():
        btn_play = gr.Button("▶ Start Autoplay")
        btn_pause = gr.Button("⏸ Stop Autoplay")
    timer = gr.Timer(value=0.18, active=False)

    # Components refreshed by every world-changing handler, in ui_refresh
    # order, followed by the world state itself.
    view_outputs = [arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state]
    sim_inputs = [tick_hours, difficulty, incident_rate, keyrings_state, pricing_state, oai_base, context_prompt]

    def do_reset(seed: int, rows: int):
        """Replace the session world with a fresh one built from ``seed``."""
        # A cleared Number field arrives as None; fall back to the default seed.
        w = init_world(1337 if seed is None else int(seed))
        return (*ui_refresh(w, rows), w)

    btn_reset.click(
        do_reset,
        inputs=[seed_in, runlog_rows],
        outputs=view_outputs,
        queue=True,
    )

    def add_task_clicked(w: World, rows: int, title: str, desc: str, p: int, d: int, est: float):
        """Add a task from the form fields and refresh the views."""
        add_task(w, title, desc, p, d, est)
        return (*ui_refresh(w, rows), w)

    btn_add_task.click(
        add_task_clicked,
        inputs=[w_state, runlog_rows, task_title, task_desc, task_p, task_d, task_est],
        outputs=view_outputs,
        queue=True,
    )

    def add_agent_clicked(w: World, rows: int, name: str, role: str, model: str, kg: str):
        """Add an agent; blank names get a generated one and duplicates get a numeric suffix."""
        name = (name or "").strip() or f"Agent-{len(w.agents)+1:02d}"
        if name in w.agents:
            name = f"{name}-{len(w.agents)+1}"
        add_agent(w, name=name, model=model, key_group=kg, role=role, seed_bump=len(w.agents) * 19)
        return (*ui_refresh(w, rows), w)

    btn_add_agent.click(
        add_agent_clicked,
        inputs=[w_state, runlog_rows, agent_name, agent_role, model_choice, key_group],
        outputs=view_outputs,
        queue=True,
    )

    def apply_keys_pricing(keys_state: Dict[str, str], pricing_state_obj: Dict[str, Any], k1: str, k2: str, k3: str, pricing_txt: str):
        """Store any entered API keys and, when valid, the pricing table.

        Blank key fields leave the previously stored key in place. Pricing
        text is accepted only if it parses to an object whose values are all
        objects, which is the shape the cost calculation reads; anything else
        keeps the current table.
        """
        keys_state = dict(keys_state) if isinstance(keys_state, dict) else {"none": ""}
        keys_state["none"] = ""
        for group, value in (("key1", k1), ("key2", k2), ("key3", k3)):
            if value:
                keys_state[group] = value.strip()
        pj = safe_json(pricing_txt, fallback=None)
        if isinstance(pj, dict) and all(isinstance(v, dict) for v in pj.values()):
            pricing_state_obj = pj
        return keys_state, pricing_state_obj

    btn_apply_keys.click(
        apply_keys_pricing,
        inputs=[keyrings_state, pricing_state, key1, key2, key3, pricing_json],
        outputs=[keyrings_state, pricing_state],
        queue=True,
    )

    def run_clicked(w: World, rows: int, n: int, th: float, diff: int, ir: float,
                    keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str):
        """Apply the scenario sliders, then run ``n`` ticks (at least one)."""
        w.tick_hours = float(th)
        w.difficulty = int(diff)
        w.incident_rate = float(ir)
        w.events.append(f"[t={w.step}] Scenario updated: tick_hours={w.tick_hours}, difficulty={w.difficulty}, incident_rate={w.incident_rate}")
        # A cleared Number field arrives as None; treat it as a single tick.
        n = 1 if n is None else int(max(1, n))
        r = make_rng(w.seed + w.step * 101)
        for _ in range(n):
            tick(w, r, pricing_obj, keys_state, base_url, ctx)
        return (*ui_refresh(w, rows), w)

    btn_run.click(
        run_clicked,
        inputs=[w_state, runlog_rows, run_n, *sim_inputs],
        outputs=view_outputs,
        queue=True,
    )

    def download_run_data(w: World, rows: int):
        """Export the run log (up to 50000 rows, regardless of ``rows``) to a CSV file."""
        df = run_data_df(w, rows=50000)
        return to_csv_download(df)

    download_btn.click(
        download_run_data,
        inputs=[w_state, runlog_rows],
        outputs=[download_file],
        queue=True,
    )

    def autoplay_start(interval: float):
        """Activate the timer at the chosen interval and mark autoplay on."""
        return gr.update(value=float(interval), active=True), True

    def autoplay_stop():
        """Deactivate the timer and mark autoplay off."""
        return gr.update(active=False), False

    btn_play.click(autoplay_start, inputs=[autoplay_speed], outputs=[timer, autoplay_on], queue=True)
    btn_pause.click(autoplay_stop, inputs=[], outputs=[timer, autoplay_on], queue=True)

    def autoplay_tick(w: World, is_on: bool, rows: int, th: float, diff: int, ir: float,
                      keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str):
        """Timer callback: run one tick while autoplay is on, otherwise just refresh."""
        if not is_on:
            return (*ui_refresh(w, rows), w, is_on, gr.update())
        w.tick_hours = float(th)
        w.difficulty = int(diff)
        w.incident_rate = float(ir)
        r = make_rng(w.seed + w.step * 101)
        tick(w, r, pricing_obj, keys_state, base_url, ctx)
        return (*ui_refresh(w, rows), w, True, gr.update())

    timer.tick(
        autoplay_tick,
        inputs=[w_state, autoplay_on, runlog_rows, *sim_inputs],
        outputs=[*view_outputs, autoplay_on, timer],
        queue=True,
    )

    # initial render
    def on_load(w: World, rows: int):
        """Populate every view when the page first loads."""
        return (*ui_refresh(w, rows), w)

    demo.load(
        on_load,
        inputs=[w_state, runlog_rows],
        outputs=view_outputs,
        queue=True,
    )

demo.queue().launch(ssr_mode=False)