"""Animated robot orchestrator widget for the SpindleFlow RL demo.

Exports one public function:
    render_orchestrator(state, height=620)

All HTML/CSS/JS is self-contained — no CDN, no external calls.
Safe for Hugging Face Spaces iframe sandbox.
"""

from __future__ import annotations

import html
import json
import math

# ── Agent color and icon maps ─────────────────────────────────────────────────

SPEC_COLORS = {
    "frontend_react": "#00d4ff",
    "backend_api": "#7c3aed",
    "database_architect": "#f59e0b",
    "devops_engineer": "#10b981",
    "security_analyst": "#ef4444",
    "product_strategist": "#8b5cf6",
    "ux_designer": "#ec4899",
    "tech_writer": "#94a3b8",
}

SPEC_ICONS = {
    "frontend_react": "FE",
    "backend_api": "API",
    "database_architect": "DB",
    "devops_engineer": "OPS",
    "security_analyst": "SEC",
    "product_strategist": "PM",
    "ux_designer": "UX",
    "tech_writer": "DOC",
}


# ── Layout ────────────────────────────────────────────────────────────────────

def _agent_positions(agent_ids: list, canvas_w: int = 780, canvas_h: int = 560) -> dict:
    """Return ``{agent_id: (x, y)}`` with agents spread evenly along an arc.

    Angles run from -70 to +70 degrees in equal steps (a single agent gets
    angle 0). ``x`` follows ``sin(angle)`` around ``canvas_w - 155`` and ``y``
    follows ``-cos(angle)``, shifted so that the first and last agents sit at
    ``canvas_h / 2`` and the middle of the arc rises above that line.

    Args:
        agent_ids: Agent identifiers, in the order they should be placed.
        canvas_w: Canvas width used to anchor the arc horizontally.
        canvas_h: Canvas height used to anchor the arc vertically.

    Returns:
        Mapping of agent id to integer ``(x, y)`` coordinates. Empty input
        yields an empty dict.
    """
    arc_cx = canvas_w - 155
    arc_cy = canvas_h / 2
    arc_r = 185
    angle_start, angle_end = -70, 70
    # Loop-invariant shift that brings the arc's end points back to arc_cy.
    baseline = arc_r * math.cos(math.radians(angle_start))
    count = len(agent_ids)

    # NOTE(review): with the default canvas the last agent lands at x == 799,
    # which is wider than canvas_w (780) — confirm this overflow is intended.
    positions = {}
    for i, aid in enumerate(agent_ids):
        if count == 1:
            angle = 0
        else:
            angle = angle_start + (angle_end - angle_start) * i / (count - 1)
        rad = math.radians(angle)
        positions[aid] = (
            round(arc_cx + arc_r * math.sin(rad)),
            round(arc_cy - arc_r * math.cos(rad) + baseline),
        )
    return positions


# ── SVG builders ──────────────────────────────────────────────────────────────
# NOTE(review): the template strings in this file contain text and placeholders
# only. Several inputs (coordinates, status classes, colors, the JSON state) are
# computed but never interpolated — presumably they belong to markup that is not
# present here. They are kept so that markup can be restored without
# re-deriving them; confirm before deleting.

def _robot_svg() -> str:
    """Return the robot fragment (currently a whitespace-only placeholder)."""
    return " "


def _agent_card_svg(agent_id: str, x: int, y: int, status: str, color: str) -> str:
    """Return the card fragment for one agent. status: idle | active | done.

    The icon comes from ``SPEC_ICONS`` (falling back to the first three
    characters of the id, upper-cased). The label is the id with underscores
    turned into spaces, title-cased and cut to 16 characters plus an ellipsis.
    Both are HTML-escaped because they are derived from ``agent_id``.

    Args:
        agent_id: Specialist identifier.
        x: Card x coordinate (not referenced by the current template text).
        y: Card y coordinate (not referenced by the current template text).
        status: ``"idle"``, ``"active"`` or ``"done"``; anything else is
            treated as idle when choosing the CSS class.
        color: Accent color (not referenced by the current template text).
    """
    icon = SPEC_ICONS.get(agent_id, agent_id[:3].upper())
    label = agent_id.replace("_", " ").title()
    if len(label) > 16:
        label = label[:16] + "…"

    # Styling inputs for the card; see the NOTE(review) above this section.
    status_class = {
        "idle": "agent-idle",
        "active": "agent-active",
        "done": "agent-done",
    }.get(status, "agent-idle")
    opacity = "1.0" if status != "idle" else "0.45"

    # Truncate first, escape second, so an entity is never cut in half.
    return f""" {html.escape(icon)} {html.escape(label)} """


def _beam_svg(edges: list, agent_positions: dict) -> str:
    """Return one beam fragment per delegation edge, joined by newlines.

    Edges whose callee has no entry in ``agent_positions`` are skipped.

    Args:
        edges: Iterable of ``[caller_id, callee_id]`` pairs.
        agent_positions: Mapping of agent id to ``(x, y)``.
    """
    # Beam origin; see the NOTE(review) above this section.
    robot_hand_x, robot_hand_y = 225, 302
    lines = []
    for _caller, callee in edges:
        if callee not in agent_positions:
            continue
        tx, ty = agent_positions[callee]
        color = SPEC_COLORS.get(callee, "#00d4ff")
        lines.append(" ")
    return "\n".join(lines)


# ── HTML template ─────────────────────────────────────────────────────────────

def _html_template(*, agents_svg, beams_svg, robot_svg, state_json, task_short,
                   reward_html, step, phase, mode, mode_color) -> str:
    """Assemble the widget document from pre-built fragments and status fields.

    ``agents_svg``, ``beams_svg``, ``robot_svg`` and ``reward_html`` are
    inserted verbatim (they are already markup). ``task_short``, ``step``,
    ``phase`` and ``mode`` originate from the caller-supplied state dict, so
    they are HTML-escaped before insertion.

    ``state_json`` and ``mode_color`` are accepted but not referenced by the
    current template text (NOTE(review): confirm against the intended markup).
    """
    step_text = html.escape(str(step))
    phase_text = html.escape(str(phase))
    mode_text = html.escape(str(mode))
    task_text = html.escape(str(task_short))
    return f"""
Orchestrator
Specialists
{beams_svg} {agents_svg} {robot_svg}
Step {step_text}
Phase {phase_text}
Mode {mode_text}
Reward {reward_html}
{task_text}
"""


# ── State assembler ───────────────────────────────────────────────────────────

def _build_html(state: dict) -> str:
    """Turn an orchestrator state dict into the widget's HTML string.

    Missing keys fall back to defaults: no agents called, no active agent, no
    edges, empty task, step 0, mode ``"SEQUENTIAL"``, not done, no reward,
    phase 1.
    """
    called = state.get("called", [])
    active = state.get("active", "")
    edges = state.get("edges", [])
    task = state.get("task", "")
    step = state.get("step", 0)
    mode = state.get("mode", "SEQUENTIAL")
    done = state.get("done", False)
    reward = state.get("reward", None)
    phase = state.get("phase", 1)

    all_agents = list(SPEC_COLORS)
    positions = _agent_positions(all_agents)

    def agent_status(aid):
        # "active" wins over "done" when an agent is being called again.
        if aid == active:
            return "active"
        if aid in called:
            return "done"
        return "idle"

    agents_svg = "\n".join(
        _agent_card_svg(aid, *positions[aid], agent_status(aid), SPEC_COLORS[aid])
        for aid in all_agents
    )
    beams_svg = _beam_svg(edges, positions)
    robot_svg = _robot_svg()

    if active:
        robot_state = "delegating"
    elif done:
        robot_state = "done"
    elif step > 0:
        robot_state = "thinking"
    else:
        robot_state = "idle"

    # Plain text here; escaping happens in _html_template.
    task_short = (task[:72] + "…") if len(task) > 72 else task

    if reward is not None:
        sign = "+" if reward >= 0 else ""
        # NOTE(review): reward_color is not referenced by the current
        # reward_html text — presumably meant for a colored wrapper element.
        reward_color = "#10b981" if reward >= 0 else "#ef4444"
        reward_html = f"{sign}{reward:.3f}"
    else:
        reward_html = ""

    mode_color = {
        "SEQUENTIAL": "#00d4ff",
        "PARALLEL": "#7c3aed",
        "FAN_OUT_REDUCE": "#f59e0b",
        "ITERATIVE": "#10b981",
        "STOP": "#ef4444",
    }.get(mode, "#64748b")

    state_json = json.dumps({
        "robot_state": robot_state,
        "active": active,
        "called": called,
        "step": step,
        "done": done,
        "mode": mode,
    })

    return _html_template(
        agents_svg=agents_svg,
        beams_svg=beams_svg,
        robot_svg=robot_svg,
        state_json=state_json,
        task_short=task_short,
        reward_html=reward_html,
        step=step,
        phase=phase,
        mode=mode,
        mode_color=mode_color,
    )


# ── Public API ────────────────────────────────────────────────────────────────

def render_orchestrator(state: dict, height: int = 620) -> None:
    """Render the animated robot orchestrator widget in a Streamlit page.

    Call this wherever the delegation graph currently renders.

    state keys:
        called — list of specialist IDs called so far this episode
        active — specialist being called right now (or "")
        edges  — list of [caller_id, callee_id] pairs
        task   — task description string
        step   — current step number
        mode   — delegation mode name (e.g. "SEQUENTIAL")
        done   — whether the episode is finished
        reward — cumulative reward float (or None)
        phase  — curriculum phase int

    Args:
        state: Orchestrator state as described above; missing keys use defaults.
        height: Height in pixels passed to the Streamlit HTML component.
    """
    # Imported lazily so the HTML builders stay usable without Streamlit.
    import streamlit.components.v1 as components

    components.html(_build_html(state), height=height, scrolling=False)