"""
Animated robot orchestrator widget for the SpindleFlow RL demo.
Exports one public function: render_orchestrator(state, height=620)
All HTML/CSS/JS is self-contained — no CDN, no external calls.
Safe for Hugging Face Spaces iframe sandbox.
"""
from __future__ import annotations
import json
import math
# ── Agent color and icon maps ─────────────────────────────────────────────────
# Accent colour (CSS hex string) for each specialist agent id.
# Insertion order matters: _build_html lists the agents in this order.
SPEC_COLORS: dict[str, str] = {
    "frontend_react":     "#00d4ff",  # cyan
    "backend_api":        "#7c3aed",  # violet
    "database_architect": "#f59e0b",  # amber
    "devops_engineer":    "#10b981",  # emerald
    "security_analyst":   "#ef4444",  # red
    "product_strategist": "#8b5cf6",  # purple
    "ux_designer":        "#ec4899",  # pink
    "tech_writer":        "#94a3b8",  # slate grey
}
# Short badge text shown on each specialist's card, keyed by agent id.
# Agent ids missing from this map fall back to their first three letters,
# upper-cased (see _agent_card_svg).
SPEC_ICONS: dict[str, str] = {
    "frontend_react":     "FE",
    "backend_api":        "API",
    "database_architect": "DB",
    "devops_engineer":    "OPS",
    "security_analyst":   "SEC",
    "product_strategist": "PM",
    "ux_designer":        "UX",
    "tech_writer":        "DOC",
}
# ── Layout ────────────────────────────────────────────────────────────────────
def _agent_positions(agent_ids: list, canvas_w: int = 780, canvas_h: int = 560) -> dict:
"""Return {agent_id: (x, y)} laid out in a right-side arc."""
arc_cx = canvas_w - 155
arc_cy = canvas_h / 2
arc_r = 185
n = len(agent_ids)
positions = {}
angle_start, angle_end = -70, 70
for i, aid in enumerate(agent_ids):
angle = 0 if n == 1 else angle_start + (angle_end - angle_start) * i / (n - 1)
rad = math.radians(angle)
x = arc_cx + arc_r * math.sin(rad)
y = arc_cy + arc_r * math.sin(rad) * 0.0 + arc_cy * 0 + \
arc_r * (-math.cos(math.radians(angle_start)) + (-math.cos(rad) + math.cos(math.radians(angle_start)))) + arc_cy - arc_cy
# Clean arc formula: spread vertically, push right
x = round(arc_cx + arc_r * math.sin(rad))
y = round(arc_cy - arc_r * math.cos(rad) + arc_r * math.cos(math.radians(angle_start)))
positions[aid] = (x, y)
return positions
# ── SVG builders ──────────────────────────────────────────────────────────────
def _robot_svg() -> str:
    """Return the static markup string for the orchestrator robot.

    Takes no arguments and performs no interpolation, so every call returns
    the same string.

    NOTE(review): the literal visible here contains only a newline; confirm
    that the robot SVG markup is present in the checked-in file.
    """
    return """
"""
def _agent_card_svg(agent_id: str, x: int, y: int,
                    status: str, color: str) -> str:
    """Return the markup fragment for one specialist agent card.

    Args:
        agent_id: Specialist identifier such as ``"backend_api"``; used to
            look up the badge text and to derive the display label.
        x: Horizontal card position.
        y: Vertical card position.
        status: One of ``"idle"``, ``"active"`` or ``"done"``.
        color: Accent colour for the card.

    Returns:
        The formatted fragment containing the badge text and the label.

    NOTE(review): ``x``, ``y``, ``color``, ``status_class`` and ``opacity``
    are not referenced by the template text visible here; presumably the
    surrounding SVG markup that consumes them is missing; confirm.
    """
    # Badge text: mapped abbreviation, else the first three characters of the
    # id upper-cased.
    icon = SPEC_ICONS.get(agent_id, agent_id[:3].upper())
    # Human-readable label: underscores become spaces, words are title-cased.
    label = agent_id.replace("_", " ").title()
    # Keep at most 16 characters and append an ellipsis when text was cut.
    label = label[:16] + ("…" if len(label) > 16 else "")
    # CSS class for the status; any unrecognised status is treated as idle.
    status_class = {"idle": "agent-idle", "active": "agent-active",
                    "done": "agent-done"}.get(status, "agent-idle")
    # Only the literal status "idle" is dimmed; an unrecognised status gets
    # the idle class above but keeps full opacity.
    opacity = "1.0" if status != "idle" else "0.45"
    return f"""
{icon}
{label}
✓
"""
def _beam_svg(edges: list, agent_positions: dict) -> str:
    """Return the beam markup for the current delegation edges.

    Args:
        edges: Iterable of two-item ``(caller, callee)`` pairs.  Only the
            callee is used; the caller is unpacked but not read.
        agent_positions: Mapping of agent id to ``(x, y)``.

    Returns:
        One fragment per edge whose callee has a known position, joined with
        newlines.  An empty string when there are no such edges.

    NOTE(review): ``robot_hand_x``/``robot_hand_y``, ``tx``/``ty`` and
    ``color`` are not referenced by the fragment text visible here;
    presumably the SVG line markup that consumes them is missing; confirm.
    """
    # Fixed coordinates; presumably the beam origin at the robot's hand
    # (inferred from the names only; verify against the robot markup).
    robot_hand_x, robot_hand_y = 225, 302
    lines = []
    for caller, callee in edges:
        # Edges pointing at an agent without a computed position are skipped.
        if callee not in agent_positions:
            continue
        tx, ty = agent_positions[callee]
        # Beam colour follows the callee; unknown callees use "#00d4ff".
        color = SPEC_COLORS.get(callee, "#00d4ff")
        lines.append(f"""
""")
    return "\n".join(lines)
# ── HTML template ─────────────────────────────────────────────────────────────
def _html_template(*, agents_svg, beams_svg, robot_svg, state_json,
                   task_short, reward_html, step, phase, mode, mode_color) -> str:
    """Format the widget page from pre-rendered pieces.

    All parameters are keyword-only.  Values are interpolated as-is with no
    escaping, so callers must pass strings that are already safe to embed.

    Args:
        agents_svg: Pre-rendered agent card markup.
        beams_svg: Pre-rendered delegation beam markup.
        robot_svg: Pre-rendered robot markup.
        state_json: JSON-encoded widget state.
        task_short: Task description text shown at the bottom of the page.
        reward_html: Pre-formatted reward text shown under "Reward".
        step: Current step number shown under "Step".
        phase: Curriculum phase shown under "Phase".
        mode: Delegation mode name shown under "Mode".
        mode_color: Colour associated with the delegation mode.

    Returns:
        The formatted page text.

    NOTE(review): ``agents_svg``, ``beams_svg``, ``robot_svg``,
    ``state_json`` and ``mode_color`` are accepted but not referenced in the
    template text visible here; presumably the HTML/CSS/JS markup that
    consumes them is missing; confirm.
    """
    return f"""
Orchestrator
Specialists
Step
{step}
Phase
{phase}
Mode
{mode}
Reward
{reward_html}
{task_short}
"""
# ── State assembler ───────────────────────────────────────────────────────────
def _build_html(state: dict) -> str:
    """Assemble the complete widget page from a delegation-state dict.

    Reads these keys from ``state`` (all optional):

        called  list of specialist ids already called (default: empty)
        active  specialist being called right now (default: "")
        edges   (caller, callee) pairs to draw beams for (default: empty)
        task    task description text (default: "")
        step    current step number (default: 0)
        mode    delegation mode name (default: "SEQUENTIAL")
        done    whether the episode is finished (default: False)
        reward  cumulative reward, or None when unavailable (default: None)
        phase   curriculum phase (default: 1)

    ``called``, ``edges`` and ``task`` may also be present but ``None``; they
    are then treated like a missing key instead of raising ``TypeError``.

    The task text is truncated to 72 characters (plus an ellipsis) and then
    HTML-escaped, because it is free text that ends up inside the page markup.

    Returns:
        The page text produced by ``_html_template``.
    """
    # Local stdlib import: keeps this block self-contained.
    from html import escape

    # `or` fallbacks cover both a missing key and an explicit None value.
    called = state.get("called") or []
    active = state.get("active", "")
    edges = state.get("edges") or []
    task = state.get("task") or ""
    step = state.get("step", 0)
    mode = state.get("mode", "SEQUENTIAL")
    done = state.get("done", False)
    reward = state.get("reward", None)
    phase = state.get("phase", 1)

    # Every known specialist gets a card, in SPEC_COLORS order.
    all_agents = list(SPEC_COLORS.keys())
    positions = _agent_positions(all_agents)

    def agent_status(aid):
        """Classify one agent; "active" wins over "done"."""
        if aid == active:
            return "active"
        if aid in called:
            return "done"
        return "idle"

    agents_svg = "\n".join(
        _agent_card_svg(aid, *positions[aid], agent_status(aid), SPEC_COLORS[aid])
        for aid in all_agents
    )
    beams_svg = _beam_svg(edges, positions)
    robot_svg = _robot_svg()

    # Precedence: an active call beats "done", which beats "thinking".
    robot_state = (
        "delegating" if active else
        "done" if done else
        "thinking" if step > 0 else
        "idle"
    )

    # Truncate first, then escape, so an entity is never cut in half and
    # characters such as "<" or "&" in the task cannot inject markup.
    task_short = escape((task[:72] + "…") if len(task) > 72 else task)

    if reward is not None:
        sign = "+" if reward >= 0 else ""
        # NOTE(review): reward_color is not referenced by the reward string
        # below; presumably the markup that used it is missing; confirm.
        reward_color = "#10b981" if reward >= 0 else "#ef4444"
        reward_html = f'{sign}{reward:.3f}'
    else:
        reward_html = '—'

    # Unknown modes fall back to a neutral grey.
    mode_color = {
        "SEQUENTIAL": "#00d4ff",
        "PARALLEL": "#7c3aed",
        "FAN_OUT_REDUCE": "#f59e0b",
        "ITERATIVE": "#10b981",
        "STOP": "#ef4444",
    }.get(mode, "#64748b")

    # State snapshot handed to the template as JSON.
    state_json = json.dumps({
        "robot_state": robot_state,
        "active": active,
        "called": called,
        "step": step,
        "done": done,
        "mode": mode,
    })

    return _html_template(
        agents_svg=agents_svg,
        beams_svg=beams_svg,
        robot_svg=robot_svg,
        state_json=state_json,
        task_short=task_short,
        reward_html=reward_html,
        step=step,
        phase=phase,
        mode=mode,
        mode_color=mode_color,
    )
# ── Public API ────────────────────────────────────────────────────────────────
def render_orchestrator(state: dict, height: int = 620) -> None:
    """
    Draw the animated robot orchestrator widget on the current Streamlit page.

    Use it in place of the existing delegation-graph rendering.  The page
    markup is built by ``_build_html`` and embedded, without scrolling, in a
    Streamlit HTML component of the requested pixel height.

    Keys read from ``state``:
        called  — specialist IDs called so far this episode (list)
        active  — specialist being called right now (or "")
        edges   — list of [caller_id, callee_id] pairs
        task    — task description string
        step    — current step number
        mode    — delegation mode name (e.g. "SEQUENTIAL")
        done    — whether the episode is finished
        reward  — cumulative reward float (or None)
        phase   — curriculum phase int
    """
    # Imported at call time, so importing this module does not import Streamlit.
    from streamlit.components.v1 import html as embed_html

    page = _build_html(state)
    embed_html(page, height=height, scrolling=False)