Spaces:
Sleeping
Sleeping
| import json | |
| import math | |
| import os | |
| import time | |
| import random | |
| import tempfile | |
| import urllib.request | |
| import urllib.error | |
| from dataclasses import dataclass, field, asdict | |
| from typing import Dict, List, Optional, Tuple, Any | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image, ImageDraw | |
| import gradio as gr | |
| # ============================================================ | |
| # ZEN Orchestrator Sandbox — Business-grade Agent Simulation | |
| # ============================================================ | |
| # Fixes in this regen: | |
| # - Removes unsupported gr.Dataframe(height=...) for Gradio 5.49.1 | |
| # - Uses a scroll container via HTML/CSS around the dataframe | |
| # ============================================================ | |
| GRID_W, GRID_H = 32, 20 | |
| TILE = 22 | |
| HUD_H = 70 | |
| SVG_W = GRID_W * TILE | |
| SVG_H = GRID_H * TILE + HUD_H | |
| COL_BG = "#0a1020" | |
| COL_PANEL = "rgba(255,255,255,0.06)" | |
| COL_GRID = "rgba(255,255,255,0.06)" | |
| COL_TEXT = "rgba(235,240,255,0.92)" | |
| COL_TEXT_DIM = "rgba(235,240,255,0.72)" | |
| EMPTY = 0 | |
| WALL = 1 | |
| DESK = 2 | |
| MEETING = 3 | |
| SERVER = 4 | |
| INCIDENT = 5 | |
| TASK_NODE = 6 | |
| TILE_COL = { | |
| EMPTY: "#162044", | |
| WALL: "#cdd2e6", | |
| DESK: "#7ad9ff", | |
| MEETING: "#ffd17a", | |
| SERVER: "#ff7ad9", | |
| INCIDENT: "#ff3b3b", | |
| TASK_NODE: "#7affc8", | |
| } | |
| AGENT_COLORS = [ | |
| "#7ad9ff", "#ff7ad9", "#ffd17a", "#7affc8", | |
| "#ff9b6b", "#c7d2fe", "#a0ffd9", "#ffb0b0", | |
| ] | |
| DEFAULT_MODEL_PRICING = { | |
| "Simulated-Local": {"in": 0.00, "out": 0.00}, | |
| "gpt-4o-mini": {"in": 0.15, "out": 0.60}, | |
| "gpt-4o": {"in": 5.00, "out": 15.00}, | |
| "gpt-5": {"in": 5.00, "out": 15.00}, | |
| } | |
| DEFAULT_OAI_BASE = "https://api.openai.com/v1" | |
| def clamp(v, lo, hi): | |
| return lo if v < lo else hi if v > hi else v | |
| def make_rng(seed: int) -> random.Random: | |
| r = random.Random() | |
| r.seed(seed & 0xFFFFFFFF) | |
| return r | |
| def now_iso(): | |
| return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) | |
| def est_tokens(text: str) -> int: | |
| if not text: | |
| return 0 | |
| return max(1, int(len(text) / 4)) | |
| def safe_json(s: str, fallback=None): | |
| try: | |
| return json.loads(s) | |
| except Exception: | |
| return fallback | |
| def to_csv_download(df: pd.DataFrame) -> str: | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") | |
| df.to_csv(tmp.name, index=False) | |
| return tmp.name | |
| class Task: | |
| id: str | |
| title: str | |
| description: str | |
| priority: int = 3 | |
| difficulty: int = 3 | |
| est_hours: float = 8.0 | |
| created_step: int = 0 | |
| status: str = "backlog" | |
| assigned_to: Optional[str] = None | |
| progress: float = 0.0 | |
| blockers: List[str] = field(default_factory=list) | |
| class Agent: | |
| name: str | |
| model: str | |
| key_group: str | |
| x: int | |
| y: int | |
| energy: float = 100.0 | |
| role: str = "Generalist" | |
| state: str = "idle" | |
| current_task_id: Optional[str] = None | |
| thoughts: str = "" | |
| last_action: str = "" | |
| tokens_in: int = 0 | |
| tokens_out: int = 0 | |
| cost_usd: float = 0.0 | |
| compute_s: float = 0.0 | |
| class World: | |
| seed: int = 1337 | |
| step: int = 0 | |
| sim_time_hours: float = 0.0 | |
| tick_hours: float = 4.0 | |
| difficulty: int = 3 | |
| incident_rate: float = 0.07 | |
| grid: List[List[int]] = field(default_factory=list) | |
| agents: Dict[str, Agent] = field(default_factory=dict) | |
| tasks: Dict[str, Task] = field(default_factory=dict) | |
| events: List[str] = field(default_factory=list) | |
| runlog: List[Dict[str, Any]] = field(default_factory=list) | |
| incidents_open: int = 0 | |
| incidents_resolved: int = 0 | |
| tasks_done: int = 0 | |
| done: bool = False | |
| def build_office(seed: int) -> List[List[int]]: | |
| r = make_rng(seed) | |
| g = [[EMPTY for _ in range(GRID_W)] for _ in range(GRID_H)] | |
| for x in range(GRID_W): | |
| g[0][x] = WALL | |
| g[GRID_H - 1][x] = WALL | |
| for y in range(GRID_H): | |
| g[y][0] = WALL | |
| g[y][GRID_W - 1] = WALL | |
| def rect(x0, y0, x1, y1, tile): | |
| for y in range(y0, y1 + 1): | |
| for x in range(x0, x1 + 1): | |
| if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: | |
| g[y][x] = tile | |
| rect(2, 2, GRID_W - 3, GRID_H - 3, EMPTY) | |
| rect(3, 3, 10, 7, MEETING) | |
| rect(GRID_W - 11, 3, GRID_W - 4, 7, MEETING) | |
| rect(GRID_W - 10, GRID_H - 8, GRID_W - 4, GRID_H - 3, SERVER) | |
| for y in range(9, GRID_H - 10): | |
| for x in range(4, GRID_W - 12): | |
| if (x % 3 == 1) and (y % 2 == 0): | |
| g[y][x] = DESK | |
| nodes = [(6, GRID_H - 5), (GRID_W // 2, GRID_H // 2), (GRID_W - 14, 10)] | |
| for (x, y) in nodes: | |
| if 1 <= x < GRID_W - 1 and 1 <= y < GRID_H - 1: | |
| g[y][x] = TASK_NODE | |
| for _ in range(22): | |
| x = r.randint(3, GRID_W - 4) | |
| y = r.randint(8, GRID_H - 9) | |
| if g[y][x] == EMPTY: | |
| g[y][x] = WALL | |
| return g | |
| def random_walkable_cell(g: List[List[int]], r: random.Random) -> Tuple[int, int]: | |
| opts = [] | |
| for y in range(1, GRID_H - 1): | |
| for x in range(1, GRID_W - 1): | |
| if g[y][x] in (EMPTY, DESK, MEETING, SERVER, TASK_NODE): | |
| opts.append((x, y)) | |
| return r.choice(opts) if opts else (2, 2) | |
| def init_world(seed: int) -> World: | |
| seed = int(seed) | |
| g = build_office(seed) | |
| w = World(seed=seed, grid=g) | |
| w.events.append(f"[{now_iso()}] Initialized office world | seed={seed}") | |
| return w | |
| def add_agent(w: World, name: str, model: str, key_group: str, role: str, seed_bump: int = 0): | |
| r = make_rng(w.seed + w.step * 17 + seed_bump) | |
| x, y = random_walkable_cell(w.grid, r) | |
| w.agents[name] = Agent(name=name, model=model, key_group=key_group, x=x, y=y, role=role) | |
| w.events.append(f"[t={w.step}] Agent added: {name} | model={model} | key_group={key_group} | role={role}") | |
| def add_task(w: World, title: str, description: str, priority: int, difficulty: int, est_hours: float): | |
| tid = f"T{len(w.tasks)+1:04d}" | |
| w.tasks[tid] = Task( | |
| id=tid, | |
| title=title.strip()[:80] if title else "Untitled Task", | |
| description=description.strip()[:400] if description else "", | |
| priority=int(clamp(priority, 1, 5)), | |
| difficulty=int(clamp(difficulty, 1, 5)), | |
| est_hours=float(max(0.25, est_hours)), | |
| created_step=w.step, | |
| ) | |
| w.events.append(f"[t={w.step}] Task added: {tid} | p={priority} d={difficulty} est={est_hours}h | {w.tasks[tid].title}") | |
| return tid | |
| DIRS4 = [(1,0), (0,1), (-1,0), (0,-1)] | |
| def in_bounds(x, y): | |
| return 0 <= x < GRID_W and 0 <= y < GRID_H | |
| def is_blocking(tile: int) -> bool: | |
| return tile == WALL | |
| def bfs_next_step(grid: List[List[int]], start: Tuple[int,int], goal: Tuple[int,int]) -> Optional[Tuple[int,int]]: | |
| if start == goal: | |
| return None | |
| sx, sy = start | |
| gx, gy = goal | |
| q = [(sx, sy)] | |
| prev = {(sx, sy): None} | |
| while q: | |
| x, y = q.pop(0) | |
| if (x, y) == (gx, gy): | |
| break | |
| for dx, dy in DIRS4: | |
| nx, ny = x + dx, y + dy | |
| if not in_bounds(nx, ny): | |
| continue | |
| if is_blocking(grid[ny][nx]): | |
| continue | |
| if (nx, ny) not in prev: | |
| prev[(nx, ny)] = (x, y) | |
| q.append((nx, ny)) | |
| if (gx, gy) not in prev: | |
| return None | |
| cur = (gx, gy) | |
| while prev[cur] is not None and prev[cur] != start: | |
| cur = prev[cur] | |
| return cur | |
| def oai_chat_completion(base_url: str, api_key: str, model: str, messages: List[Dict[str,str]], timeout_s: int = 25) -> Dict[str, Any]: | |
| url = base_url.rstrip("/") + "/chat/completions" | |
| payload = json.dumps({ | |
| "model": model, | |
| "messages": messages, | |
| "temperature": 0.4, | |
| }).encode("utf-8") | |
| req = urllib.request.Request( | |
| url, | |
| data=payload, | |
| headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}, | |
| method="POST", | |
| ) | |
| try: | |
| with urllib.request.urlopen(req, timeout=timeout_s) as resp: | |
| data = resp.read().decode("utf-8", errors="replace") | |
| return safe_json(data, fallback={"error": {"message": "Invalid JSON from provider"}}) | |
| except urllib.error.HTTPError as e: | |
| try: | |
| body = e.read().decode("utf-8", errors="replace") | |
| except Exception: | |
| body = "" | |
| return {"error": {"message": f"HTTPError {e.code}: {body[:600]}"}} | |
| except Exception as e: | |
| return {"error": {"message": str(e)}} | |
| def price_for(model_pricing: Dict[str, Dict[str,float]], model: str, tokens_in: int, tokens_out: int) -> float: | |
| p = model_pricing.get(model) or model_pricing.get("Simulated-Local") or {"in":0.0, "out":0.0} | |
| return (tokens_in / 1_000_000.0) * float(p.get("in", 0.0)) + (tokens_out / 1_000_000.0) * float(p.get("out", 0.0)) | |
| def choose_task_for_agent(w: World, agent: Agent) -> Optional[str]: | |
| backlog = [t for t in w.tasks.values() if t.status in ("backlog", "blocked")] | |
| if not backlog: | |
| return None | |
| backlog.sort(key=lambda t: (-t.priority, t.created_step, t.difficulty)) | |
| return backlog[0].id | |
| def maybe_generate_incident(w: World, r: random.Random): | |
| rate = w.incident_rate * (0.6 + 0.25 * w.difficulty) | |
| if r.random() < rate: | |
| x, y = random_walkable_cell(w.grid, r) | |
| if w.grid[y][x] != WALL: | |
| w.grid[y][x] = INCIDENT | |
| w.incidents_open += 1 | |
| w.events.append(f"[t={w.step}] INCIDENT spawned at ({x},{y})") | |
| add_task( | |
| w, | |
| title="Handle incident", | |
| description=f"Investigate and resolve incident at grid ({x},{y}).", | |
| priority=5, | |
| difficulty=min(5, w.difficulty + 1), | |
| est_hours=4.0 + 2.0 * w.difficulty | |
| ) | |
| def incident_positions(w: World) -> List[Tuple[int,int]]: | |
| out = [] | |
| for y in range(GRID_H): | |
| for x in range(GRID_W): | |
| if w.grid[y][x] == INCIDENT: | |
| out.append((x, y)) | |
| return out | |
| def nearest_task_node(w: World, ax: int, ay: int) -> Tuple[int,int]: | |
| nodes = [] | |
| for y in range(GRID_H): | |
| for x in range(GRID_W): | |
| if w.grid[y][x] == TASK_NODE: | |
| nodes.append((x, y)) | |
| if not nodes: | |
| return (ax, ay) | |
| nodes.sort(key=lambda p: abs(p[0]-ax)+abs(p[1]-ay)) | |
| return nodes[0] | |
| def simulated_reasoning(agent: Agent, task: Task, w: World) -> Tuple[str, str, int, int, float]: | |
| base = 0.08 + 0.04 * w.difficulty + 0.03 * task.difficulty | |
| compute_s = clamp(base, 0.05, 0.6) | |
| thoughts = ( | |
| f"Assess '{task.title}'. Priority={task.priority} difficulty={task.difficulty}. " | |
| f"Plan: decompose, execute, verify, document." | |
| ) | |
| action = f"Progressed {task.id}. Updated notes, checked blockers, validated output." | |
| tin = est_tokens(task.title + " " + task.description) + 30 | |
| tout = est_tokens(thoughts + " " + action) + 40 | |
| return thoughts, action, tin, tout, compute_s | |
| def api_reasoning(agent: Agent, task: Task, w: World, base_url: str, api_key: str, model: str, context_prompt: str): | |
| t0 = time.time() | |
| sys = ( | |
| "You are an autonomous business operations agent in a multi-agent simulation. " | |
| "Return a JSON object with keys: thoughts, action, blockers (list), progress_delta (0..1). " | |
| "Keep thoughts short and action concrete." | |
| ) | |
| user = { | |
| "simulation": {"step": w.step, "sim_time_hours": w.sim_time_hours, "difficulty": w.difficulty, "incidents_open": w.incidents_open}, | |
| "agent": {"name": agent.name, "role": agent.role, "energy": agent.energy}, | |
| "task": asdict(task), | |
| "context": (context_prompt or "")[:1400], | |
| } | |
| resp = oai_chat_completion(base_url, api_key, model, [{"role":"system","content":sys},{"role":"user","content":json.dumps(user)}]) | |
| compute_s = float(time.time() - t0) | |
| if "error" in resp: | |
| return "", "", 0, 0, compute_s, resp["error"].get("message", "Unknown error") | |
| content = "" | |
| try: | |
| content = resp["choices"][0]["message"]["content"] | |
| except Exception: | |
| content = "" | |
| usage_in = None | |
| usage_out = None | |
| if isinstance(resp.get("usage"), dict): | |
| usage_in = resp["usage"].get("prompt_tokens") | |
| usage_out = resp["usage"].get("completion_tokens") | |
| obj = safe_json(content, fallback=None) | |
| if not isinstance(obj, dict): | |
| thoughts = "Provider returned non-JSON; using fallback." | |
| action = content[:400] | |
| tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) | |
| tout = usage_out if isinstance(usage_out, int) else est_tokens(content) | |
| return thoughts, action, tin, tout, compute_s, None | |
| thoughts = str(obj.get("thoughts", ""))[:380] | |
| action = str(obj.get("action", ""))[:420] | |
| blockers = obj.get("blockers", []) | |
| progress_delta = obj.get("progress_delta", 0.0) | |
| if isinstance(blockers, list) and blockers: | |
| task.blockers = [str(b)[:80] for b in blockers][:5] | |
| task.status = "blocked" | |
| else: | |
| task.blockers = [] | |
| if task.status == "blocked": | |
| task.status = "in_progress" | |
| try: | |
| task.progress = clamp(task.progress + float(progress_delta), 0.0, 1.0) | |
| except Exception: | |
| pass | |
| tin = usage_in if isinstance(usage_in, int) else est_tokens(sys + json.dumps(user)) | |
| tout = usage_out if isinstance(usage_out, int) else est_tokens(json.dumps(obj)) | |
| return thoughts, action, tin, tout, compute_s, None | |
| def step_agent(w: World, agent: Agent, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str): | |
| if agent.energy <= 0: | |
| agent.state = "blocked" | |
| agent.last_action = "Out of energy" | |
| return | |
| if agent.current_task_id is None or agent.current_task_id not in w.tasks: | |
| tid = choose_task_for_agent(w, agent) | |
| if tid is None: | |
| agent.state = "idle" | |
| agent.thoughts = "No tasks available." | |
| agent.last_action = "Standing by." | |
| return | |
| w.tasks[tid].assigned_to = agent.name | |
| w.tasks[tid].status = "in_progress" | |
| agent.current_task_id = tid | |
| w.events.append(f"[t={w.step}] {agent.name} picked task {tid}") | |
| task = w.tasks[agent.current_task_id] | |
| incs = incident_positions(w) | |
| if incs and task.priority >= 5: | |
| incs.sort(key=lambda p: abs(p[0]-agent.x)+abs(p[1]-agent.y)) | |
| target = incs[0] | |
| else: | |
| target = nearest_task_node(w, agent.x, agent.y) | |
| nxt = bfs_next_step(w.grid, (agent.x, agent.y), target) | |
| if nxt is not None and r.random() < 0.65: | |
| agent.x, agent.y = nxt | |
| agent.state = "moving" | |
| agent.last_action = f"Moved toward {target}" | |
| agent.energy = max(0.0, agent.energy - 0.8) | |
| return | |
| agent.state = "working" | |
| if agent.model == "Simulated-Local" or agent.key_group == "none": | |
| thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) | |
| err = None | |
| speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) | |
| task.progress = clamp(task.progress + speed, 0.0, 1.0) | |
| if task.progress < 1.0: | |
| task.status = "in_progress" | |
| else: | |
| key = (keyrings or {}).get(agent.key_group, "") | |
| if not key: | |
| thoughts, action, tin, tout, compute_s = simulated_reasoning(agent, task, w) | |
| err = f"No key for group '{agent.key_group}', used local simulation." | |
| speed = 0.08 / (0.6 + 0.25 * w.difficulty + 0.30 * task.difficulty) | |
| task.progress = clamp(task.progress + speed, 0.0, 1.0) | |
| else: | |
| thoughts, action, tin, tout, compute_s, err = api_reasoning(agent, task, w, base_url, key, agent.model, context_prompt) | |
| if task.progress >= 1.0 and task.status != "done": | |
| task.status = "done" | |
| w.tasks_done += 1 | |
| w.events.append(f"[t={w.step}] DONE {task.id}: {task.title}") | |
| agent.current_task_id = None | |
| if "incident" in task.title.lower(): | |
| incs = incident_positions(w) | |
| if incs: | |
| x, y = incs[0] | |
| w.grid[y][x] = EMPTY | |
| w.incidents_open = max(0, w.incidents_open - 1) | |
| w.incidents_resolved += 1 | |
| w.events.append(f"[t={w.step}] Incident resolved at ({x},{y})") | |
| agent.thoughts = thoughts | |
| agent.last_action = action if action else agent.last_action | |
| agent.tokens_in += int(tin) | |
| agent.tokens_out += int(tout) | |
| agent.compute_s += float(compute_s) | |
| agent.cost_usd += price_for(model_pricing, agent.model, int(tin), int(tout)) | |
| agent.energy = max(0.0, agent.energy - (0.8 + 0.15 * w.difficulty + 0.12 * task.difficulty)) | |
| w.runlog.append({ | |
| "step": w.step, | |
| "sim_time_hours": round(w.sim_time_hours, 2), | |
| "agent": agent.name, | |
| "role": agent.role, | |
| "model": agent.model, | |
| "key_group": agent.key_group, | |
| "task_id": task.id, | |
| "task_status": task.status, | |
| "task_progress": round(task.progress, 3), | |
| "action": (agent.last_action or "")[:220], | |
| "thoughts": (agent.thoughts or "")[:220], | |
| "tokens_in": int(tin), | |
| "tokens_out": int(tout), | |
| "cost_usd": round(price_for(model_pricing, agent.model, int(tin), int(tout)), 6), | |
| "compute_s": round(float(compute_s), 3), | |
| "error": (err or "")[:220], | |
| }) | |
| def tick(w: World, r: random.Random, model_pricing: Dict[str, Dict[str, float]], keyrings: Dict[str, str], base_url: str, context_prompt: str, max_log: int = 4000): | |
| if w.done: | |
| return | |
| maybe_generate_incident(w, r) | |
| agents = list(w.agents.values()) | |
| agents.sort(key=lambda a: (a.energy, a.name)) | |
| for ag in agents: | |
| step_agent(w, ag, r, model_pricing, keyrings, base_url, context_prompt) | |
| w.step += 1 | |
| w.sim_time_hours += float(w.tick_hours) | |
| if len(w.events) > 250: | |
| w.events = w.events[-250:] | |
| if len(w.runlog) > max_log: | |
| w.runlog = w.runlog[-max_log:] | |
| def compute_kpis(w: World) -> Dict[str, Any]: | |
| backlog = sum(1 for t in w.tasks.values() if t.status == "backlog") | |
| inprog = sum(1 for t in w.tasks.values() if t.status == "in_progress") | |
| blocked = sum(1 for t in w.tasks.values() if t.status == "blocked") | |
| done = sum(1 for t in w.tasks.values() if t.status == "done") | |
| total_cost = sum(a.cost_usd for a in w.agents.values()) | |
| total_tokens_in = sum(a.tokens_in for a in w.agents.values()) | |
| total_tokens_out = sum(a.tokens_out for a in w.agents.values()) | |
| total_compute = sum(a.compute_s for a in w.agents.values()) | |
| days = max(1e-6, w.sim_time_hours / 24.0) | |
| tpd = done / days | |
| return { | |
| "sim_time_days": round(w.sim_time_hours / 24.0, 2), | |
| "agents": len(w.agents), | |
| "tasks_total": len(w.tasks), | |
| "tasks_backlog": backlog, | |
| "tasks_in_progress": inprog, | |
| "tasks_blocked": blocked, | |
| "tasks_done": done, | |
| "incidents_open": w.incidents_open, | |
| "incidents_resolved": w.incidents_resolved, | |
| "throughput_tasks_per_day": round(tpd, 3), | |
| "tokens_in_total": int(total_tokens_in), | |
| "tokens_out_total": int(total_tokens_out), | |
| "compute_s_total": round(total_compute, 2), | |
| "cost_usd_total": round(total_cost, 4), | |
| } | |
| def svg_render(w: World) -> str: | |
| k = compute_kpis(w) | |
| headline = ( | |
| f"ZEN Orchestrator Sandbox • step={w.step} • sim_days={k['sim_time_days']} • " | |
| f"agents={k['agents']} • done={k['tasks_done']} • backlog={k['tasks_backlog']} • " | |
| f"incidents_open={k['incidents_open']} • cost=${k['cost_usd_total']}" | |
| ) | |
| detail = f"tick_hours={w.tick_hours} • difficulty={w.difficulty} • incident_rate={round(w.incident_rate,3)}" | |
| css = f""" | |
| <style> | |
| .root {{ | |
| background: {COL_BG}; | |
| border-radius: 18px; | |
| overflow: hidden; | |
| box-shadow: 0 18px 40px rgba(0,0,0,0.45); | |
| }} | |
| .hud {{ | |
| font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial; | |
| fill: {COL_TEXT}; | |
| }} | |
| .hudSmall {{ | |
| fill: {COL_TEXT_DIM}; | |
| }} | |
| .gridline {{ | |
| stroke: {COL_GRID}; | |
| stroke-width: 1; | |
| }} | |
| .agent {{ | |
| transition: transform 220ms cubic-bezier(.2,.8,.2,1); | |
| filter: drop-shadow(0px 10px 12px rgba(0,0,0,0.45)); | |
| }} | |
| .pulse {{ | |
| animation: pulse 1.2s ease-in-out infinite; | |
| opacity: 0.22; | |
| }} | |
| @keyframes pulse {{ | |
| 0% {{ transform: scale(1.0); opacity: 0.14; }} | |
| 50% {{ transform: scale(1.15); opacity: 0.26; }} | |
| 100% {{ transform: scale(1.0); opacity: 0.14; }} | |
| }} | |
| .tag {{ | |
| fill: rgba(0,0,0,0.38); | |
| }} | |
| .tile {{ | |
| shape-rendering: crispEdges; | |
| }} | |
| </style> | |
| """ | |
| parts = [f""" | |
| <div class="root"> | |
| {css} | |
| <svg width="{SVG_W}" height="{SVG_H}" viewBox="0 0 {SVG_W} {SVG_H}"> | |
| <rect x="0" y="0" width="{SVG_W}" height="{SVG_H}" fill="{COL_BG}"/> | |
| <rect x="0" y="0" width="{SVG_W}" height="{HUD_H}" fill="{COL_PANEL}"/> | |
| <text class="hud" x="16" y="28" font-size="16" font-weight="800">{headline}</text> | |
| <text class="hud hudSmall" x="16" y="52" font-size="12">{detail}</text> | |
| """] | |
| for y in range(GRID_H): | |
| for x in range(GRID_W): | |
| t = w.grid[y][x] | |
| col = TILE_COL.get(t, TILE_COL[EMPTY]) | |
| px = x * TILE | |
| py = HUD_H + y * TILE | |
| parts.append(f'<rect class="tile" x="{px}" y="{py}" width="{TILE}" height="{TILE}" fill="{col}"/>') | |
| if t == INCIDENT: | |
| parts.append(f'<circle cx="{px+TILE/2}" cy="{py+TILE/2}" r="7" fill="rgba(0,0,0,0.25)"/>') | |
| parts.append(f'<circle cx="{px+TILE/2}" cy="{py+TILE/2}" r="4" fill="rgba(255,255,255,0.65)"/>') | |
| for x in range(GRID_W + 1): | |
| px = x * TILE | |
| parts.append(f'<line class="gridline" x1="{px}" y1="{HUD_H}" x2="{px}" y2="{SVG_H}"/>') | |
| for y in range(GRID_H + 1): | |
| py = HUD_H + y * TILE | |
| parts.append(f'<line class="gridline" x1="0" y1="{py}" x2="{SVG_W}" y2="{py}"/>') | |
| for i, ag in enumerate(w.agents.values()): | |
| col = AGENT_COLORS[i % len(AGENT_COLORS)] | |
| px = ag.x * TILE | |
| py = HUD_H + ag.y * TILE | |
| parts.append(f""" | |
| <g class="agent" style="transform: translate({px}px, {py}px);"> | |
| <circle class="pulse" cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.48}" fill="{col}"></circle> | |
| <circle cx="{TILE/2}" cy="{TILE/2}" r="{TILE*0.34}" fill="{col}" opacity="0.98"></circle> | |
| <rect class="tag" x="{TILE*0.10}" y="{TILE*0.08}" width="{TILE*0.80}" height="14" rx="7"/> | |
| <text x="{TILE/2}" y="{TILE*0.08 + 11}" text-anchor="middle" font-size="9" | |
| fill="rgba(235,240,255,0.90)" font-family="ui-sans-serif, system-ui">{ag.name}</text> | |
| """) | |
| bar_w = TILE * 0.80 | |
| bx = TILE/2 - bar_w/2 | |
| by = TILE * 0.82 | |
| parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w}" height="6" rx="4" fill="rgba(255,255,255,0.12)"/>') | |
| parts.append(f'<rect x="{bx}" y="{by}" width="{bar_w*(clamp(ag.energy,0,100)/100.0)}" height="6" rx="4" fill="rgba(122,255,200,0.85)"/>') | |
| parts.append("</g>") | |
| parts.append("</svg></div>") | |
| return "".join(parts) | |
| def agents_text(w: World) -> str: | |
| lines = [] | |
| for ag in w.agents.values(): | |
| lines.append( | |
| f"{ag.name} | role={ag.role} | model={ag.model} | key_group={ag.key_group} | " | |
| f"state={ag.state} | energy={round(ag.energy,1)} | " | |
| f"task={ag.current_task_id or '-'} | cost=${round(ag.cost_usd,4)} | " | |
| f"tin={ag.tokens_in} tout={ag.tokens_out}" | |
| ) | |
| return "\n".join(lines) if lines else "(no agents yet)" | |
| def tasks_text(w: World) -> str: | |
| tasks = list(w.tasks.values()) | |
| tasks.sort(key=lambda t: (t.status != "done", -t.priority, t.created_step)) | |
| out = [] | |
| for t in tasks[:18]: | |
| bl = f" blockers={t.blockers}" if t.blockers else "" | |
| out.append( | |
| f"{t.id} [{t.status}] p={t.priority} d={t.difficulty} prog={round(t.progress,2)} " | |
| f"est={t.est_hours}h assigned={t.assigned_to or '-'} :: {t.title}{bl}" | |
| ) | |
| return "\n".join(out) if out else "(no tasks yet)" | |
| def events_text(w: World) -> str: | |
| return "\n".join(w.events[-20:]) if w.events else "" | |
| def kpis_text(w: World) -> str: | |
| return json.dumps(compute_kpis(w), indent=2) | |
| def run_data_df(w: World, rows: int) -> pd.DataFrame: | |
| rows = int(max(10, rows)) | |
| if not w.runlog: | |
| return pd.DataFrame(columns=[ | |
| "step","sim_time_hours","agent","role","model","key_group","task_id","task_status","task_progress", | |
| "action","thoughts","tokens_in","tokens_out","cost_usd","compute_s","error" | |
| ]) | |
| return pd.DataFrame(w.runlog[-rows:]) | |
| def ui_refresh(w: World, run_rows: int): | |
| return ( | |
| svg_render(w), | |
| agents_text(w), | |
| tasks_text(w), | |
| events_text(w), | |
| kpis_text(w), | |
| run_data_df(w, run_rows), | |
| ) | |
| TITLE = "ZEN Orchestrator Sandbox — Business-grade Agent Orchestra Simulator" | |
| with gr.Blocks(title=TITLE) as demo: | |
| gr.Markdown( | |
| f"## {TITLE}\n" | |
| "Business-oriented multi-agent simulation with game-like visuals, time controls, run logging, and optional model keys." | |
| ) | |
| # CSS scroll wrapper for the dataframe (Gradio 5.49.1-safe) | |
| gr.HTML(""" | |
| <style> | |
| .zen-scroll { | |
| max-height: 320px; | |
| overflow: auto; | |
| border-radius: 12px; | |
| border: 1px solid rgba(255,255,255,0.10); | |
| background: rgba(255,255,255,0.03); | |
| padding: 10px; | |
| } | |
| </style> | |
| """) | |
| w_state = gr.State(init_world(1337)) | |
| autoplay_on = gr.State(False) | |
| keyrings_state = gr.State({"none": ""}) | |
| pricing_state = gr.State(DEFAULT_MODEL_PRICING) | |
| with gr.Row(): | |
| arena = gr.HTML(label="Office World (Animated SVG)") | |
| with gr.Column(scale=1): | |
| agents_box = gr.Textbox(label="Agents", lines=10) | |
| tasks_box = gr.Textbox(label="Task Backlog", lines=10) | |
| with gr.Row(): | |
| events_box = gr.Textbox(label="Event Log", lines=8) | |
| kpi_box = gr.Textbox(label="KPIs (JSON)", lines=8) | |
| gr.Markdown("### Run Data (scroll + download)") | |
| with gr.Row(): | |
| runlog_rows = gr.Slider(50, 1500, value=250, step=50, label="Run Data rows to display") | |
| download_btn = gr.Button("Download Run Data CSV") | |
| # Scroll wrapper around dataframe | |
| gr.HTML('<div class="zen-scroll">') | |
| run_data = gr.Dataframe(label="Run Data", interactive=False, wrap=True) # ✅ no height kwarg | |
| gr.HTML('</div>') | |
| download_file = gr.File(label="CSV Download", interactive=False) | |
| gr.Markdown("### Scenario + Orchestration Controls") | |
| with gr.Row(): | |
| seed_in = gr.Number(value=1337, precision=0, label="Seed") | |
| tick_hours = gr.Slider(0.5, 168.0, value=4.0, step=0.5, label="Simulated hours per tick") | |
| difficulty = gr.Slider(1, 5, value=3, step=1, label="Global difficulty") | |
| incident_rate = gr.Slider(0.0, 0.35, value=0.07, step=0.01, label="Incident rate per tick") | |
| with gr.Row(): | |
| btn_reset = gr.Button("Reset World") | |
| run_n = gr.Number(value=10, precision=0, label="Run N ticks") | |
| btn_run = gr.Button("Run") | |
| gr.Markdown("### Add Tasks") | |
| with gr.Row(): | |
| task_title = gr.Textbox(label="Task title", value="Build onboarding automation") | |
| task_desc = gr.Textbox(label="Task description", value="Design an end-to-end flow: intake, validation, audit log, export.") | |
| with gr.Row(): | |
| task_p = gr.Slider(1, 5, value=4, step=1, label="Priority") | |
| task_d = gr.Slider(1, 5, value=3, step=1, label="Difficulty") | |
| task_est = gr.Slider(0.25, 200.0, value=16.0, step=0.25, label="Estimated hours") | |
| btn_add_task = gr.Button("Add Task") | |
| gr.Markdown("### Add Agents") | |
| with gr.Row(): | |
| agent_name = gr.Textbox(label="Agent name", value="Agent-01") | |
| agent_role = gr.Dropdown( | |
| choices=["Generalist", "Ops", "HR Automation", "Engineer", "Analyst", "Incident Response", "PM"], | |
| value="Engineer", | |
| label="Role", | |
| ) | |
| with gr.Row(): | |
| model_choice = gr.Dropdown( | |
| choices=["Simulated-Local", "gpt-4o-mini", "gpt-4o", "gpt-5"], | |
| value="Simulated-Local", | |
| label="Model", | |
| ) | |
| key_group = gr.Dropdown( | |
| choices=["none", "key1", "key2", "key3"], | |
| value="none", | |
| label="Key group", | |
| ) | |
| btn_add_agent = gr.Button("Add Agent") | |
| gr.Markdown("### Model Keys + Pricing (Optional)") | |
| with gr.Row(): | |
| oai_base = gr.Textbox(label="OpenAI-compatible base URL", value=DEFAULT_OAI_BASE) | |
| context_prompt = gr.Textbox(label="Global Context Prompt", value="Simulate a business ops team with auditability and cost tracking.", lines=3) | |
| with gr.Row(): | |
| key1 = gr.Textbox(label="API Key (key1)", type="password") | |
| key2 = gr.Textbox(label="API Key (key2)", type="password") | |
| key3 = gr.Textbox(label="API Key (key3)", type="password") | |
| pricing_json = gr.Textbox(label="Model pricing JSON (USD per 1M tokens)", value=json.dumps(DEFAULT_MODEL_PRICING, indent=2), lines=8) | |
| btn_apply_keys = gr.Button("Apply Keys + Pricing") | |
| gr.Markdown("### Autoplay") | |
| autoplay_speed = gr.Slider(0.05, 1.2, value=0.18, step=0.01, label="Autoplay interval (seconds)") | |
| with gr.Row(): | |
| btn_play = gr.Button("▶ Start Autoplay") | |
| btn_pause = gr.Button("⏸ Stop Autoplay") | |
| timer = gr.Timer(value=0.18, active=False) | |
| def do_reset(seed: int, rows: int): | |
| w = init_world(int(seed)) | |
| return (*ui_refresh(w, rows), w) | |
| btn_reset.click( | |
| do_reset, | |
| inputs=[seed_in, runlog_rows], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], | |
| queue=True, | |
| ) | |
| def add_task_clicked(w: World, rows: int, title: str, desc: str, p: int, d: int, est: float): | |
| add_task(w, title, desc, p, d, est) | |
| return (*ui_refresh(w, rows), w) | |
| btn_add_task.click( | |
| add_task_clicked, | |
| inputs=[w_state, runlog_rows, task_title, task_desc, task_p, task_d, task_est], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], | |
| queue=True, | |
| ) | |
| def add_agent_clicked(w: World, rows: int, name: str, role: str, model: str, kg: str): | |
| name = (name or "").strip() or f"Agent-{len(w.agents)+1:02d}" | |
| if name in w.agents: | |
| name = f"{name}-{len(w.agents)+1}" | |
| add_agent(w, name=name, model=model, key_group=kg, role=role, seed_bump=len(w.agents) * 19) | |
| return (*ui_refresh(w, rows), w) | |
| btn_add_agent.click( | |
| add_agent_clicked, | |
| inputs=[w_state, runlog_rows, agent_name, agent_role, model_choice, key_group], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], | |
| queue=True, | |
| ) | |
| def apply_keys_pricing(keys_state: Dict[str, str], pricing_state_obj: Dict[str, Any], k1: str, k2: str, k3: str, pricing_txt: str): | |
| keys_state = dict(keys_state) if isinstance(keys_state, dict) else {"none": ""} | |
| keys_state["none"] = "" | |
| if k1: keys_state["key1"] = k1.strip() | |
| if k2: keys_state["key2"] = k2.strip() | |
| if k3: keys_state["key3"] = k3.strip() | |
| pj = safe_json(pricing_txt, fallback=None) | |
| if isinstance(pj, dict): | |
| pricing_state_obj = pj | |
| return keys_state, pricing_state_obj | |
| btn_apply_keys.click( | |
| apply_keys_pricing, | |
| inputs=[keyrings_state, pricing_state, key1, key2, key3, pricing_json], | |
| outputs=[keyrings_state, pricing_state], | |
| queue=True, | |
| ) | |
| def run_clicked(w: World, rows: int, n: int, th: float, diff: int, ir: float, | |
| keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str): | |
| w.tick_hours = float(th) | |
| w.difficulty = int(diff) | |
| w.incident_rate = float(ir) | |
| w.events.append(f"[t={w.step}] Scenario updated: tick_hours={w.tick_hours}, difficulty={w.difficulty}, incident_rate={w.incident_rate}") | |
| n = int(max(1, n)) | |
| r = make_rng(w.seed + w.step * 101) | |
| for _ in range(n): | |
| tick(w, r, pricing_obj, keys_state, base_url, ctx) | |
| return (*ui_refresh(w, rows), w) | |
| btn_run.click( | |
| run_clicked, | |
| inputs=[w_state, runlog_rows, run_n, tick_hours, difficulty, incident_rate, keyrings_state, pricing_state, oai_base, context_prompt], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], | |
| queue=True, | |
| ) | |
| def download_run_data(w: World, rows: int): | |
| df = run_data_df(w, rows=50000) | |
| path = to_csv_download(df) | |
| return path | |
| download_btn.click( | |
| download_run_data, | |
| inputs=[w_state, runlog_rows], | |
| outputs=[download_file], | |
| queue=True, | |
| ) | |
| def autoplay_start(interval: float): | |
| return gr.update(value=float(interval), active=True), True | |
| def autoplay_stop(): | |
| return gr.update(active=False), False | |
| btn_play.click(autoplay_start, inputs=[autoplay_speed], outputs=[timer, autoplay_on], queue=True) | |
| btn_pause.click(autoplay_stop, inputs=[], outputs=[timer, autoplay_on], queue=True) | |
| def autoplay_tick(w: World, is_on: bool, rows: int, th: float, diff: int, ir: float, | |
| keys_state: Dict[str, str], pricing_obj: Dict[str, Any], base_url: str, ctx: str): | |
| if not is_on: | |
| return (*ui_refresh(w, rows), w, is_on, gr.update()) | |
| w.tick_hours = float(th) | |
| w.difficulty = int(diff) | |
| w.incident_rate = float(ir) | |
| r = make_rng(w.seed + w.step * 101) | |
| tick(w, r, pricing_obj, keys_state, base_url, ctx) | |
| return (*ui_refresh(w, rows), w, True, gr.update()) | |
| timer.tick( | |
| autoplay_tick, | |
| inputs=[w_state, autoplay_on, runlog_rows, tick_hours, difficulty, incident_rate, keyrings_state, pricing_state, oai_base, context_prompt], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state, autoplay_on, timer], | |
| queue=True, | |
| ) | |
| # initial render | |
| def on_load(w: World, rows: int): | |
| return (*ui_refresh(w, rows), w) | |
| demo.load( | |
| on_load, | |
| inputs=[w_state, runlog_rows], | |
| outputs=[arena, agents_box, tasks_box, events_box, kpi_box, run_data, w_state], | |
| queue=True, | |
| ) | |
| demo.queue().launch(ssr_mode=False) | |