import json import math import os import time import random import tempfile import urllib.request import urllib.error from dataclasses import dataclass, field, asdict from typing import Dict, List, Optional, Tuple, Any import numpy as np import pandas as pd from PIL import Image, ImageDraw import gradio as gr # ============================================================ # ZEN Orchestrator Sandbox — Business-grade Agent Simulation # ============================================================ # Fixes in this regen: # - Removes unsupported gr.Dataframe(height=...) for Gradio 5.49.1 # - Uses a scroll container via HTML/CSS around the dataframe # ============================================================ GRID_W, GRID_H = 32, 20 TILE = 22 HUD_H = 70 SVG_W = GRID_W * TILE SVG_H = GRID_H * TILE + HUD_H COL_BG = "#0a1020" COL_PANEL = "rgba(255,255,255,0.06)" COL_GRID = "rgba(255,255,255,0.06)" COL_TEXT = "rgba(235,240,255,0.92)" COL_TEXT_DIM = "rgba(235,240,255,0.72)" EMPTY = 0 WALL = 1 DESK = 2 MEETING = 3 SERVER = 4 INCIDENT = 5 TASK_NODE = 6 TILE_COL = { EMPTY: "#162044", WALL: "#cdd2e6", DESK: "#7ad9ff", MEETING: "#ffd17a", SERVER: "#ff7ad9", INCIDENT: "#ff3b3b", TASK_NODE: "#7affc8", } AGENT_COLORS = [ "#7ad9ff", "#ff7ad9", "#ffd17a", "#7affc8", "#ff9b6b", "#c7d2fe", "#a0ffd9", "#ffb0b0", ] DEFAULT_MODEL_PRICING = { "Simulated-Local": {"in": 0.00, "out": 0.00}, "gpt-4o-mini": {"in": 0.15, "out": 0.60}, "gpt-4o": {"in": 5.00, "out": 15.00}, "gpt-5": {"in": 5.00, "out": 15.00}, } DEFAULT_OAI_BASE = "https://api.openai.com/v1" def clamp(v, lo, hi): return lo if v < lo else hi if v > hi else v def make_rng(seed: int) -> random.Random: r = random.Random() r.seed(seed & 0xFFFFFFFF) return r def now_iso(): return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) def est_tokens(text: str) -> int: if not text: return 0 return max(1, int(len(text) / 4)) def safe_json(s: str, fallback=None): try: return json.loads(s) except Exception: return fallback def to_csv_download(df: pd.DataFrame) -> str: tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") df.to_csv(tmp.name, index=False) return tmp.name @dataclass class Task: id: str title: str description: str priority: int = 3 difficulty: int = 3 est_hours: float = 8.0 created_step: int = 0 status: str = "backlog" assigned_to: Optional[str] = None progress: float = 0.0 blockers: List[str] = field(default_factory=list) @dataclass class Agent: name: str model: str key_group: str x: int y: int energy: float = 100.0 role: str = "Generalist" state: str = "idle" current_task_id: Optional[str] = None thoughts: str = "" last_action: str = "" tokens_in: int = 0 tokens_out: int = 0 cost_usd: float = 0.0 compute_s: float = 0.0 @dataclass class World: seed: int = 1337 step: int = 0 sim_time_hours: float = 0.0 tick_hours: float = 4.0 difficulty: int = 3 incident_rate: float = 0.07 grid: List[List[int]] = field(default_factory=list) agents: Dict[str, Agent] = field(default_factory=dict) tasks: Dict[str, Task] = field(default_factory=dict) events: List[str] = field(default_factory=list) runlog: List[Dict[str, Any]] = field(default_factory=list) incidents_open: int = 0 incidents_resolved: int = 0 tasks_done: int = 0 done: bool = False def build_office(seed: int) -> List[List[int]]: r = make_rng(seed) g = [[EMPTY for _ in range(GRID_W)] for _ in range(GRID_H)] for x in range(GRID_W): g[0][x] = WALL g[GRID_H - 1][x] = WALL for y in range(GRID_H): g[y][0] = WALL g[y][GRID_W - 1] = WALL def rect(x0, y0, x1, y1, tile): for y in range(y0, y1 + 1): for x in range(x0, x1 + 1): if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: g[y][x] = tile rect(2, 2, GRID_W - 3, GRID_H - 3, EMPTY) rect(3, 3, 10, 7, MEETING) rect(GRID_W - 11, 3, GRID_W - 4, 7, MEETING) rect(GRID_W - 10, GRID_H - 8, GRID_W - 4, GRID_H - 3, SERVER) for y in range(9, GRID_H - 10): for x in range(4, GRID_W - 12): if (x % 3 == 1) and (y % 2 == 0): g[y][x] = DESK nodes = [(6, GRID_H - 5), (GRID_W // 2, GRID_H // 2), (GRID_W - 14, 10)] for (x, y) in nodes: if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: g[y][x] = TASK_NODE for _ in range(22): x = r.randint(3, GRID_W - 4) y = r.randint(8, GRID_H - 9) if g[y][x] == EMPTY: g[y][x] = WALL return g def random_walkable_cell(g: List[List[int]], r: random.Random) -> Tuple[int, int]: opts = [] for y in range(1, GRID_H - 1): for x in range(1, GRID_W - 1): if g[y][x] in (EMPTY, DESK, MEETING, SERVER, TASK_NODE): opts.append((x, y)) return r.choice(opts) if opts else (2, 2) def init_world(seed: int) -> World: seed = int(seed) g = build_office(seed) w = World(seed=seed, grid=g) w.events.append(f"[{now_iso()}] Initialized office world | seed={seed}") return w def add_agent(w: World, name: str, model: str, key_group: str, role: str, seed_bump: int = 0): r = make_rng(w.seed + w.step * 17 + seed_bump) x, y = random_walkable_cell(w.grid, r) w.agents[name] = Agent(name=name, model=model, key_group=key_group, x=x, y=y, role=role) w.events.append(f"[t={w.step}] Agent added: {name} | model={model} | key_group={key_group} | role={role}") def add_task(w: World, title: str, description: str, priority: int, difficulty: int, est_hours: float): tid = f"T{len(w.tasks)+1:04d}" w.tasks[tid] = Task( id=tid, title=title.strip()[:80] if title else "Untitled Task", description=description.strip()[:400] if description else "", priority=int(clamp(priority, 1, 5)), difficulty=int(clamp(difficulty, 1, 5)), est_hours=float(max(0.25, est_hours)), created_step=w.step, ) w.events.append(f"[t={w.step}] Task added: {tid} | p={priority} d={difficulty} est={est_hours}h | {w.tasks[tid].title}") return tid DIRS4 = [(1,0), (0,1), (-1,0), (0,-1)] def in_bounds(x, y): return 0 <= x < GRID_W and 0 <= y < GRID_H def is_blocking(tile: int) -> bool: return tile == WALL def bfs_next_step(grid: List[List[int]], start: Tuple[int,int], goal: Tuple[int,int]) -> Optional[Tuple[int,int]]: if start == goal: return None sx, sy = start gx, gy = goal q = [(sx, sy)] prev = {(sx, sy): None} while q: x, y = q.pop(0) if (x, y) == (gx, gy): break for dx, dy in DIRS4: nx, ny = x + dx, y + dy if not in_bounds(nx, ny): continue if is_blocking(grid[ny][nx]): continue if (nx, ny) not in prev: prev[(nx, ny)] = (x, y) q.append((nx, ny)) if (gx, gy) not in prev: return None cur = (gx, gy) while prev[cur] is not None and prev[cur] != start: cur = prev[cur] return cur def oai_chat_completion(base_url: str, api_key: str, model: str, messages: List[Dict[str,str]], timeout_s: int = 25) -> Dict[str, Any]: url = base_url.rstrip("/") + "/chat/completions" payload = json.dumps({ "model": model, "messages": messages, "temperature": 0.4, }).encode("utf-8") req = urllib.request.Request( url, data=payload, headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: data = resp.read().decode("utf-8", errors="replace") return safe_json(data, fallback={"error": {"message": "Invalid JSON from provider"}}) except urllib.error.HTTPError as e: try: body = e.read().decode("utf-8", errors="replace") except Exception: body = "" return {"error": {"message": f"HTTPError {e.code}: {body[:600]}"}} except Exception as e: return {"error": {"message": str(e)}} def price_for(model_pricing: Dict[str, Dict[str,float]], model: str, tokens_in: int, tokens_out: int) -> float: p = model_pricing.get(model) or model_pricing.get("Simulated-Local") or {"in":0.0, "out":0.0} return (tokens_in / 1_000_000.0) * float(p.get("in", 0.0)) + (tokens_out / 1_000_000.0) * float(p.get("out", 0.0)) def choose_task_for_agent(w: World, agent: Agent) -> Optional[str]: backlog = [t for t in w.tasks.values() if t.status in ("backlog", "blocked")] if not backlog: return None backlog.sort(key=lambda t: (-t.priority, t.created_step, t.difficulty)) return backlog[0].id def maybe_generate_incident(w: World, r: random.Random): rate = w.incident_rate * (0.6 + 0.25 * w.difficulty) if r.random() < rate: x, y = random_walkable_cell(w.grid, r) if w.grid[y][x] != WALL: w.grid[y][x] = INCIDENT w.incidents_open += 1 w.events.append(f"[t={w.step}] INCIDENT spawned at ({x},{y})") add_task( w, title="Handle incident", description=f"Investigate and resolve incident at grid ({x},{y}).", priority=5, difficulty=min(5, w.difficulty + 1), est_hours=4.0 + 2.0 * w.difficulty ) def incident_positions(w: World) -> List[Tuple[int,int]]: out = [] for y in range(GRID_H): for x in range(GRID_W): if w.grid[y][x] == INCIDENT: out.append((x, y)) return out def nearest_task_node(w: World, ax: int, ay: int) -> Tuple[int,int]: nodes = [] for y in range(GRID_H): for x in range(GRID_W): if w.grid[y][x] == TASK_NODE: nodes.append((x, y)) if not nodes: return (ax, ay) nodes.sort(key=lambda p: abs(p[0]-ax)+abs(p[1]-ay)) return nodes[0] def simulated_reasoning(agent: Agent, task: Task, w: World) -> Tuple[str, str, int, int, float]: base = 0.08 + 0.04 * w.difficulty + 0.03 * task.difficulty compute_s = clamp(base, 0.05, 0.6) thoughts = ( f"Assess '{task.title}'. Priority={task.priority} difficulty={task.difficulty}. " f"Plan: decompose, execute, verify, document." ) action = f"Progressed {task.id}. Updated notes, checked blockers, validated output." tin = est_tokens(task.title + " " + task.description) + 30 tout = est_tokens(thoughts + " " + action) + 40 return thoughts, action, tin, tout, compute_s def api_reasoning(agent: Agent, task: Task, w: World, base_url: str, api_key: str, model: str, context_prompt: str): t0 = time.time() sys = ( "You are an autonomous business operations agent in a multi-agent simulation. " "Return a JSON object with keys: thoughts, action, blockers (list), progress_delta (0..1). " "Keep thoughts short and action concrete." ) user = { "simulation": {"step": w.step, "sim_time_hours": w.sim_time_hours, "difficulty": w.difficulty, "incidents_open": w.incidents_open}, "agent": {"name": agent.name, "role": agent.role, "energy": agent.energy}, "task": asdict(task), "context": (context_prompt or "")[:1400], } resp = oai_chat_completion(base_url, api_key, model, [{"role":"system","content":sys},{"role":"user","content":json.dumps(user)}]) compute_s = float(time.time() - t0) if "error" in resp: return "", "", 0, 0, compute_s, resp["error"].get("message", "Unknown error") content = "" try: content = resp["choices"][0]["message"]["content"] except Exception: content = "" usage_in = None usage_out = None if isinstance(resp.get("usage"), dict): usage_in = resp["usage"].get("prompt_tokens") usage_out = resp["usage"].get("completion_tokens") obj = safe_json(content, fallback=None) if not isinstance(obj, dict): thoughts = "Provider returned non-JSON; using fallback." action = content[:400] tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) tout = usage_out if isinstance(usage_out, int) else est_tokens(content) return thoughts, action, tin, tout, compute_s, None thoughts = str(obj.get("thoughts", ""))[:380] action = str(obj.get("action", ""))[:420] blockers = obj.get("blockers", []) progress_delta = obj.get("progress_delta", 0.0) if isinstance(blockers, list) and blockers: task.blockers = [str(b)[:80] for b in blockers][:5] task.status = "blocked" else: task.blockers = [] if task.status == "blocked": task.status = "in_progress" try: task.progress = clamp(task.progress + float(progress_delta), 0.0, 1.0) except Exception: pass tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) tout = usage_out if isinstance(usage_out, int) else est_tokens(json.dumps(obj)) return thoughts, action, tin, tout, compute_s, None def step_agent(w: World, agent: Agent, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str): if agent.energy <= 0: agent.state = "blocked" agent.last_action = "Out of energy" return if agent.current_task_id is None or agent.current_task_id not in w.tasks: tid = choose_task_for_agent(w, agent) if tid is None: agent.state = "idle" agent.thoughts = "No tasks available." agent.last_action = "Standing by." return w.tasks[tid].assigned_to = agent.name w.tasks[tid].status = "in_progress" agent.current_task_id = tid w.events.append(f"[t={w.step}] {agent.name} picked task {tid}") task = w.tasks[agent.current_task_id] incs = incident_positions(w) if incs and task.priority >= 5: incs.sort(key=lambda p: abs(p[0]-agent.x)+abs(p[1]-agent.y)) target = incs[0] else: target = nearest_task_node(w, agent.x, agent.y) nxt = bfs_next_step(w.grid, (agent.x, agent.y), target) if nxt is not None and r.random() < 0.65: agent.x, agent.y = nxt agent.state = "moving" agent.last_action = f"Moved toward {target}" agent.energy = max(0.0, agent.energy - 0.8) return agent.state = "working" if agent.model == "Simulated-Local" or agent.key_group == "none": thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) err = None speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) task.progress = clamp(task.progress + speed, 0.0, 1.0) if task.progress < 1.0: task.status = "in_progress" else: key = (keyrings or {}).get(agent.key_group, "") if not key: thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) err = f"No key for group '{agent.key_group}', used local simulation." speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) task.progress = clamp(task.progress + speed, 0.0, 1.0) else: thoughts, action, tin, tout, compute_s, err = api_reasoning(agent, task, w, base_url, key, agent.model, context_prompt) if task.progress >= 1.0 and task.status != "done": task.status = "done" w.tasks_done += 1 w.events.append(f"[t={w.step}] DONE {task.id}: {task.title}") agent.current_task_id = None if "incident" in task.title.lower(): incs = incident_positions(w) if incs: x, y = incs[0] w.grid[y][x] = EMPTY w.incidents_open = max(0, w.incidents_open - 1) w.incidents_resolved += 1 w.events.append(f"[t={w.step}] Incident resolved at ({x},{y})") agent.thoughts = thoughts agent.last_action = action if action else agent.last_action agent.tokens_in += int(tin) agent.tokens_out += int(tout) agent.compute_s += float(compute_s) agent.cost_usd += price_for(model_pricing, agent.model, int(tin), int(tout)) agent.energy = max(0.0, agent.energy - (0.8 + 0.15 * w.difficulty + 0.12 * task.difficulty)) w.runlog.append({ "step": w.step, "sim_time_hours": round(w.sim_time_hours, 2), "agent": agent.name, "role": agent.role, "model": agent.model, "key_group": agent.key_group, "task_id": task.id, "task_status": task.status, "task_progress": round(task.progress, 3), "action": (agent.last_action or "")[:220], "thoughts": (agent.thoughts or "")[:220], "tokens_in": int(tin), "tokens_out": int(tout), "cost_usd": round(price_for(model_pricing, agent.model, int(tin), int(tout)), 6), "compute_s": round(float(compute_s), 3), "error": (err or "")[:220], }) def tick(w: World, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str, max_log: int = 4000): if w.done: return maybe_generate_incident(w, r) agents = list(w.agents.values()) agents.sort(key=lambda a: (a.energy, a.name)) for ag in agents: step_agent(w, ag, r, model_pricing, keyrings, base_url, context_prompt) w.step += 1 w.sim_time_hours += float(w.tick_hours) if len(w.events) > 250: w.events = w.events[-250:] if len(w.runlog) > max_log: w.runlog = w.runlog[-max_log:] def compute_kpis(w: World) -> Dict[str, Any]: backlog = sum(1 for t in w.tasks.values() if t.status == "backlog") inprog = sum(1 for t in w.tasks.values() if t.status == "in_progress") blocked = sum(1 for t in w.tasks.values() if t.status == "blocked") done = sum(1 for t in w.tasks.values() if t.status == "done") total_cost = sum(a.cost_usd for a in w.agents.values()) total_tokens_in = sum(a.tokens_in for a in w.agents.values()) total_tokens_out = sum(a.tokens_out for a in w.agents.values()) total_compute = sum(a.compute_s for a in w.agents.values()) days = max(1e-6, w.sim_time_hours / 24.0) tpd = done / days return { "sim_time_days": round(w.sim_time_hours / 24.0, 2), "agents": len(w.agents), "tasks_total": len(w.tasks), "tasks_backlog": backlog, "tasks_in_progress": inprog, "tasks_blocked": blocked, "tasks_done": done, "incidents_open": w.incidents_open, "incidents_resolved": w.incidents_resolved, "throughput_tasks_per_day": round(tpd, 3), "tokens_in_total": int(total_tokens_in), "tokens_out_total": int(total_tokens_out), "compute_s_total": round(total_compute, 2), "cost_usd_total": round(total_cost, 4), } def svg_render(w: World) -> str: k = compute_kpis(w) headline = ( f"ZEN Orchestrator Sandbox • step={w.step} • sim_days={k['sim_time_days']} • " f"agents={k['agents']} • done={k['tasks_done']} • backlog={k['tasks_backlog']} • " f"incidents_open={k['incidents_open']} • cost=${k['cost_usd_total']}" ) detail = f"tick_hours={w.tick_hours} • difficulty={w.difficulty} • incident_rate={round(w.incident_rate,3)}" css = f""" """ parts = [f"""
{css} {headline} {detail} """] for y in range(GRID_H): for x in range(GRID_W): t = w.grid[y][x] col = TILE_COL.get(t, TILE_COL[EMPTY]) px = x * TILE py = HUD_H + y * TILE parts.append(f'') if t == INCIDENT: parts.append(f'') parts.append(f'') for x in range(GRID_W + 1): px = x * TILE parts.append(f'') for y in range(GRID_H + 1): py = HUD_H + y * TILE parts.append(f'') for i, ag in enumerate(w.agents.values()): col = AGENT_COLORS[i % len(AGENT_COLORS)] px = ag.x * TILE py = HUD_H + ag.y * TILE parts.append(f""" {ag.name} """) bar_w = TILE * 0.80 bx = TILE/2 - bar_w/2 by = TILE * 0.82 parts.append(f'') parts.append(f'') parts.append("") parts.append("
") return "".join(parts) def agents_text(w: World) -> str: lines = [] for ag in w.agents.values(): lines.append( f"{ag.name} | role={ag.role} | model={ag.model} | key_group={ag.key_group} | " f"state={ag.state} | energy={round(ag.energy,1)} | " f"task={ag.current_task_id or '-'} | cost=${round(ag.cost_usd,4)} | " f"tin={ag.tokens_in} tout={ag.tokens_out}" ) return "\n".join(lines) if lines else "(no agents yet)" def tasks_text(w: World) -> str: tasks = list(w.tasks.values()) tasks.sort(key=lambda t: (t.status != "done", -t.priority, t.created_step)) out = [] for t in tasks[:18]: bl = f" blockers={t.blockers}" if t.blockers else "" out.append( f"{t.id} [{t.status}] p={t.priority} d={t.difficulty} prog={round(t.progress,2)} " f"est={t.est_hours}h assigned={t.assigned_to or '-'} :: {t.title}{bl}" ) return "\n".join(out) if out else "(no tasks yet)" def events_text(w: World) -> str: return "\n".join(w.events[-20:]) if w.events else "" def kpis_text(w: World) -> str: return json.dumps(compute_kpis(w), indent=2) def run_data_df(w: World, rows: int) -> pd.DataFrame: rows = int(max(10, rows)) if not w.runlog: return pd.DataFrame(columns=[ "step","sim_time_hours","agent","role","model","key_group","task_id","task_status","task_progress", "action","thoughts","tokens_in","tokens_out","cost_usd","compute_s","error" ]) return pd.DataFrame(w.runlog[-rows:]) def ui_refresh(w: World, run_rows: int): return ( svg_render(w), agents_text(w), tasks_text(w), events_text(w), kpis_text(w), run_data_df(w, run_rows), ) TITLE = "ZEN Orchestrator Sandbox — Business-grade Agent Orchestra Simulator" with gr.Blocks(title=TITLE) as demo: gr.Markdown( f"## {TITLE}\n" "Business-oriented multi-agent simulation with game-like visuals, time controls, run logging, and optional model keys." ) # CSS scroll wrapper for the dataframe (Gradio 5.49.1-safe) gr.HTML(""" """) w_state = gr.State(init_world(1337)) autoplay_on = gr.State(False) keyrings_state = gr.State({"none": ""}) pricing_state = gr.State(DEFAULT_MODEL_PRICING) with gr.Row(): arena = gr.HTML(label="Office World (Animated SVG)") with gr.Column(scale=1): agents_box = gr.Textbox(label="Agents", lines=10) tasks_box = gr.Textbox(label="Task Backlog", lines=10) with gr.Row(): events_box = gr.Textbox(label="Event Log", lines=8) kpi_box = gr.Textbox(label="KPIs (JSON)", lines=8) gr.Markdown("### Run Data (scroll + download)") with gr.Row(): runlog_rows = gr.Slider(50, 1500, value=250, step=50, label="Run Data rows to display") download_btn = gr.Button("Download Run Data CSV") # Scroll wrapper around dataframe gr.HTML('
') run_data = gr.Dataframe(label="Run Data", interactive=False, wrap=True) # ✅ no height kwarg gr.HTML('
') download_file = gr.File(label="CSV Download", interactive=False) gr.Markdown("### Scenario + Orchestration Controls") with gr.Row(): seed_in = gr.Number(value=1337, precision=0, label="Seed") tick_hours = gr.Slider(0.5, 168.0, value=4.0, step=0.5, label="Simulated hours per tick") difficulty = gr.Slider(1, 5, value=3, step=1, label="Global difficulty") incident_rate = gr.Slider(0.0, 0.35, value=0.07, step=0.01, label="Incident rate per tick") with gr.Row(): btn_reset = gr.Button("Reset World") run_n = gr.Number(value=10, precision=0, label="Run N ticks") btn_run = gr.Button("Run") gr.Markdown("### Add Tasks") with gr.Row(): task_title = gr.Textbox(label="Task title", value="Build onboarding automation") task_desc = gr.Textbox(label="Task description", value="Design an end-to-end flow: intake, validation, audit log, export.") with gr.Row(): task_p = gr.Slider(1, 5, value=4, step=1, label="Priority") task_d = gr.Slider(1, 5, value=3, step=1, label="Difficulty") task_est = gr.Slider(0.25, 200.0, value=16.0, step=0.25, label="Estimated hours") btn_add_task = gr.Button("Add Task") gr.Markdown("### Add Agents") with gr.Row(): agent_name = gr.Textbox(label="Agent name", value="Agent-01") agent_role = gr.Dropdown( choices=["Generalist", "Ops", "HR Automation", "Engineer", "Analyst", "Incident Response", "PM"], value="Engineer", label="Role", ) with gr.Row(): model_choice = gr.Dropdown( choices=["Simulated-Local", "gpt-4o-mini", "gpt-4o", "gpt-5"], value="Simulated-Local", label="Model", ) key_group = gr.Dropdown( choices=["none", "key1", "key2", "key3"], value="none", label="Key group", ) btn_add_agent = gr.Button("Add Agent") gr.Markdown("### Model Keys + Pricing (Optional)") with gr.Row(): oai_base = gr.Textbox(label="OpenAI-compatible base URL", value=DEFAULT_OAI_BASE) context_prompt = gr.Textbox(label="Global Context Prompt", value="Simulate a business ops team with auditability and cost tracking.", lines=3) with gr.Row(): key1 = gr.Textbox(label="API Key (key1)", type="password") key2 = gr.Textbox(label="API Key (key2)", type="password") key3 = gr.Textbox(label="API Key (key3)", type="password") pricing_json = gr.Textbox(label="Model pricing JSON (USD per 1M tokens)", value=json.dumps(DEFAULT_MODEL_PRICING, indent=2), lines=8) btn_apply_keys = gr.Button("Apply Keys + Pricing") gr.Markdown("### Autoplay") autoplay_speed = gr.Slider(0.05, 1.2, value=0.18, step=0.01, label="Autoplay interval (seconds)") with gr.Row(): btn_play = gr.Button("▶ Start Autoplay") btn_pause = gr.Button("⏸ Stop Autoplay") timer = gr.Timer(value=0.18, active=False) def do_reset(seed: int, rows: int): w = init_world(int(seed)) return (*ui_refresh(w, rows), w) btn_reset.click( do_reset, inputs=[seed_in, runlog_rows], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], queue=True, ) def add_task_clicked(w: World, rows: int, title: str, desc: str, p: int, d: int, est: float): add_task(w, title, desc, p, d, est) return (*ui_refresh(w, rows), w) btn_add_task.click( add_task_clicked, inputs=[w_state, runlog_rows, task_title, task_desc, task_p, task_d, task_est], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], queue=True, ) def add_agent_clicked(w: World, rows: int, name: str, role: str, model: str, kg: str): name = (name or "").strip() or f"Agent-{len(w.agents)+1:02d}" if name in w.agents: name = f"{name}-{len(w.agents)+1}" add_agent(w, name=name, model=model, key_group=kg, role=role, seed_bump=len(w.agents) * 19) return (*ui_refresh(w, rows), w) btn_add_agent.click( add_agent_clicked, inputs=[w_state, runlog_rows, agent_name, agent_role, model_choice, key_group], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], queue=True, ) def apply_keys_pricing(keys_state: Dict[str, str], pricing_state_obj: Dict[str, Any], k1: str, k2: str, k3: str, pricing_txt: str): keys_state = dict(keys_state) if isinstance(keys_state, dict) else {"none": ""} keys_state["none"] = "" if k1: keys_state["key1"] = k1.strip() if k2: keys_state["key2"] = k2.strip() if k3: keys_state["key3"] = k3.strip() pj = safe_json(pricing_txt, fallback=None) if isinstance(pj, dict): pricing_state_obj = pj return keys_state, pricing_state_obj btn_apply_keys.click( apply_keys_pricing, inputs=[keyrings_state, pricing_state, key1, key2, key3, pricing_json], outputs=[keyrings_state, pricing_state], queue=True, ) def run_clicked(w: World, rows: int, n: int, th: float, diff: int, ir: float, keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str): w.tick_hours = float(th) w.difficulty = int(diff) w.incident_rate = float(ir) w.events.append(f"[t={w.step}] Scenario updated: tick_hours={w.tick_hours}, difficulty={w.difficulty}, incident_rate={w.incident_rate}") n = int(max(1, n)) r = make_rng(w.seed + w.step * 101) for _ in range(n): tick(w, r, pricing_obj, keys_state, base_url, ctx) return (*ui_refresh(w, rows), w) btn_run.click( run_clicked, inputs=[w_state, runlog_rows, run_n, tick_hours, difficulty, incident_rate, keyrings_state, pricing_state, oai_base, context_prompt], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], queue=True, ) def download_run_data(w: World, rows: int): df = run_data_df(w, rows=50000) path = to_csv_download(df) return path download_btn.click( download_run_data, inputs=[w_state, runlog_rows], outputs=[download_file], queue=True, ) def autoplay_start(interval: float): return gr.update(value=float(interval), active=True), True def autoplay_stop(): return gr.update(active=False), False btn_play.click(autoplay_start, inputs=[autoplay_speed], outputs=[timer, autoplay_on], queue=True) btn_pause.click(autoplay_stop, inputs=[], outputs=[timer, autoplay_on], queue=True) def autoplay_tick(w: World, is_on: bool, rows: int, th: float, diff: int, ir: float, keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str): if not is_on: return (*ui_refresh(w, rows), w, is_on, gr.update()) w.tick_hours = float(th) w.difficulty = int(diff) w.incident_rate = float(ir) r = make_rng(w.seed + w.step * 101) tick(w, r, pricing_obj, keys_state, base_url, ctx) return (*ui_refresh(w, rows), w, True, gr.update()) timer.tick( autoplay_tick, inputs=[w_state, autoplay_on, runlog_rows, tick_hours, difficulty, incident_rate, keyrings_state, pricing_state, oai_base, context_prompt], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state, autoplay_on, timer], queue=True, ) # initial render def on_load(w: World, rows: int): return (*ui_refresh(w, rows), w) demo.load( on_load, inputs=[w_state, runlog_rows], outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], queue=True, ) demo.queue().launch(ssr_mode=False)