| """ | |
| Centralized Agent Type Registry. | |
| Adding a new agent type: | |
| 1. Add an entry to AGENT_REGISTRY below | |
| 2. Implement its streaming handler (or use "builtin:chat" for simple LLM proxy) | |
| The frontend fetches the registry from GET /api/agents at startup — no JS changes needed. | |
| SSE Event Protocol — every handler emits `data: {JSON}\\n\\n` lines with a `type` field. | |
| Required events (all agents): | |
| - done : stream complete | |
| - error : { content: str } error message | |
| Common optional events: | |
| - thinking : { content: str } reasoning text | |
| - content : { content: str } streamed response tokens | |
| - result_preview: { content: str, figures: dict } inline result with optional images | |
| - result : { content: str, figures: dict } final output for command center widget | |
| - generating : still working between turns | |
| - retry : { attempt, max_attempts, delay, message } retrying after error | |
| - aborted : stream stopped because the user aborted (e.g. during a retry wait) | |
| - debug_call_input / debug_call_output : { call_number, ... } LLM call tracing emitted by call_llm | |
| Code-specific events: | |
| - code_start : { code: str } before code execution | |
| - code : { output, error, images } code cell result | |
| - upload : { paths, output } files uploaded to sandbox | |
| - download : { paths, output } files downloaded from sandbox | |
| Research-specific events: | |
| - status : { message } progress text | |
| - queries : { queries: list, iteration: int } search queries generated | |
| - source : { url, title, query_index, ... } source found | |
| - query_stats : { query_index, relevant_count, irrelevant_count, error_count } | |
| - assessment : { sufficient, missing_aspects, findings_count, reasoning } | |
| """ | |
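The `data: {JSON}\n\n` framing described in the docstring above can be sketched as a small helper. This is a minimal illustration rather than the project's actual code: `format_sse_event` is a hypothetical name, and it assumes every event is a JSON-serializable dict carrying a `type` field.

```python
import json


def format_sse_event(event: dict) -> str:
    """Serialize one event dict as a single SSE `data:` frame (hypothetical helper)."""
    if "type" not in event:
        raise ValueError("every SSE event needs a 'type' field")
    # A blank line (two newlines) terminates each SSE frame.
    return f"data: {json.dumps(event)}\n\n"


# A tiny stream: one content token followed by the required `done` event.
frames = [
    format_sse_event({"type": "content", "content": "Hello"}),
    format_sse_event({"type": "done"}),
]
```

Serializing through `json.dumps` keeps newlines inside `content` escaped, so a multi-line payload cannot accidentally end the frame early.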
| # ============================================================ | |
| # THE REGISTRY — single source of truth for all agent types | |
| # ============================================================ | |
| AGENT_REGISTRY = { | |
| "command": { | |
| "label": "TASKS", | |
| "system_prompt": ( | |
| "You are a helpful AI assistant in the AgentUI command center.\n\n" | |
| "{tools_section}\n\n" | |
| "## Routing\n\n" | |
| "**Default to the web agent** for tasks needing external information (searches, lookups, fact-checking, URLs).\n\n" | |
| "Only use the research agent when the user explicitly needs deep multi-source analysis " | |
| "(comparisons, comprehensive reports). It's expensive — dozens of parallel searches.\n\n" | |
| "- **Answer directly**: factual recall, simple math, summarizing previous results, clarifications\n" | |
| "- **Web agent**: searches, lookups, fact-checking, reading URLs\n" | |
| "- **Code agent**: data analysis, code execution, visualizations, debugging\n" | |
| "- **Research agent**: ONLY deep multi-source analysis, comparisons, reports\n" | |
| "- **Image agent**: generating or editing images (ONLY when the user explicitly asks to generate/create an image — never for finding/showing existing photos)\n\n" | |
| "When delegating, provide a clear objective, scope boundaries, and expected output format.\n" | |
| "**Figures are shared across agents.** If a previous agent produced a figure (e.g., figure_T3_1), " | |
| "you can pass its reference to another agent — for example, ask the image agent to edit figure_T3_1 " | |
| "or the code agent to process it. Just include the reference name in the task description.\n\n" | |
| "## Task Decomposition — ALWAYS parallelize\n\n" | |
| "**RULE: When a request mentions multiple distinct entities or topics, " | |
| "launch a separate agent for each.** Never combine multiple lookups into one agent.\n\n" | |
| "Examples:\n" | |
| "- \"Compare React, Vue, and Svelte\" → 3 web agents, NOT 1\n" | |
| "- \"Analyze CSV and find market data\" → code agent + web agent in parallel\n\n" | |
| "Make ALL tool calls in the same response. For single-item tasks, just launch one agent.\n\n" | |
| "## Guidelines\n\n" | |
| "- Be concise. Either answer directly OR launch an agent, not both.\n" | |
| "- Answer questions about existing results directly — don't re-launch agents.\n" | |
| "- Do NOT save/create files unless the user explicitly requests it.\n" | |
| "- Reuse task_id when a follow-up relates to an existing agent (preserves context and kernel).\n" | |
| "- Include key findings in YOUR response — don't just say \"see the agent result\".\n" | |
| "- **ALWAYS embed figures** from sub-agents in your response using the exact reference tags from the agent result " | |
| "(e.g., <figure_T3_1>). Sub-agent results are collapsed — if you don't embed the figure, the user won't see it.\n" | |
| "- If an agent was aborted by the user, acknowledge it and ask how to proceed — don't re-launch." | |
| ), | |
| "tool": None, | |
| "tool_arg": None, | |
| "has_counter": False, | |
| "in_menu": False, | |
| "in_launcher": False, | |
| "placeholder": "Enter message...", | |
| }, | |
| "agent": { | |
| "label": "AGENT", | |
| "system_prompt": ( | |
| "You are an autonomous web agent. Tools: **web_search(query)**, **read_url(url)**.\n\n" | |
| "## Strategy\n\n" | |
| "Before each tool call, briefly reflect on what you learned and what's still missing.\n\n" | |
| "1. **Search first** — use web_search to discover information\n" | |
| "2. **Snippets are data** — extract answers from search results before reading pages\n" | |
| "3. **Read with intent** — only read_url when snippets leave a gap. " | |
| "Prefer static HTML (articles, tables). Avoid JS-heavy sites, paywalls, raw downloads.\n" | |
| "4. **Adapt on failure** — if read_url returns junk, pick a fundamentally different source\n" | |
| "5. **Stop when done** — don't keep searching for a perfect source\n\n" | |
| "Be efficient (1-4 tool calls). Cite sources with URLs.\n\n" | |
| "## CRITICAL: You MUST provide a <result> tag\n\n" | |
| "Wrap your FULL answer in <result> tags — this is what the user sees in the command center. " | |
| "Everything outside <result> is only visible in the agent's own tab.\n\n" | |
| "<result>\n" | |
| "Your complete findings here.\n" | |
| "</result>\n" | |
| ), | |
| "tool": { | |
| "type": "function", | |
| "function": { | |
| "name": "launch_web_agent", | |
| "description": "Launch a web agent to search the web, look up information, or read specific pages. Use this as the DEFAULT for any task needing external information — quick searches, fact-checking, finding specific data, reading URLs.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "task": { | |
| "type": "string", | |
| "description": "The task or instruction for the agent. Should contain all necessary context." | |
| }, | |
| "task_id": { | |
| "type": "string", | |
| "description": "A 2-3 word summary of the task, separated by dashes." | |
| } | |
| }, | |
| "required": ["task", "task_id"] | |
| } | |
| } | |
| }, | |
| "tool_arg": "task", | |
| "has_counter": True, | |
| "in_menu": True, | |
| "in_launcher": True, | |
| "placeholder": "Enter message...", | |
| "capabilities": "Default for web tasks. Has tools: web_search(query), read_url(url). Fast — typically 1-4 tool calls.", | |
| }, | |
| "code": { | |
| "label": "CODE", | |
| "system_prompt": ( | |
| "You are a coding assistant with a Python sandbox (Jupyter kernel). " | |
| "Packages: pandas, matplotlib, numpy, scipy, scikit-learn, seaborn, plotly, requests, beautifulsoup4, etc.\n\n" | |
| "## Tools\n\n" | |
| "- **execute_code**: Run Python code. Stateful — variables/imports persist. " | |
| "Working dir: /home/user/ (use relative paths like './output.csv').\n" | |
| "- **upload_files**: Upload from user's workspace to sandbox (/home/user/<filename>).\n" | |
| "- **download_files**: Download from sandbox to workspace. " | |
| "ONLY when user explicitly asks to save/download.\n\n" | |
| "**Code runs in a remote sandbox, NOT locally.** " | |
| "Use upload_files before processing user files, download_files to send results back.\n\n" | |
| "## Guidelines\n\n" | |
| "- **Figures**: Call plt.show() — figures are auto-captured with names like figure_T4_1, figure_T4_2, etc. " | |
| "The exact names appear in the execution output. " | |
| "NEVER use both plt.savefig() and plt.show() (creates duplicates). " | |
| "To display a figure, embed the exact figure name from the output in your result text — do NOT use show_html with an <img> tag.\n" | |
| "- **Files**: Do NOT save/download unless explicitly requested. Never overwrite without permission.\n" | |
| "- Execute code incrementally and reflect on output between steps.\n\n" | |
| "## CRITICAL: You MUST provide a <result> tag\n\n" | |
| "Keep results SHORT (1-2 sentences). The user can see code and output above.\n" | |
| "Use the exact figure name from the execution output (e.g., <figure_T4_1>) to embed figures.\n\n" | |
| "<result>\n" | |
| "Here's the sine function plot:\n\n" | |
| "<figure_T4_1>\n" | |
| "</result>\n" | |
| ), | |
| "tool": { | |
| "type": "function", | |
| "function": { | |
| "name": "launch_code_agent", | |
| "description": "Launch a code agent with Python execution environment. Use this for data analysis, creating visualizations, running code, debugging, or anything involving programming.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "task": { | |
| "type": "string", | |
| "description": "The coding task or question. Should contain all necessary context." | |
| }, | |
| "task_id": { | |
| "type": "string", | |
| "description": "A 2-3 word summary of the task, separated by dashes." | |
| } | |
| }, | |
| "required": ["task", "task_id"] | |
| } | |
| } | |
| }, | |
| "tool_arg": "task", | |
| "has_counter": True, | |
| "in_menu": True, | |
| "in_launcher": True, | |
| "placeholder": "Enter message...", | |
| "capabilities": "Has tools: execute_code(Python), upload_files, download_files. Runs code in a Jupyter sandbox with pandas, numpy, matplotlib, etc.", | |
| }, | |
| "research": { | |
| "label": "RESEARCH", | |
| "system_prompt": ( | |
| "You are a research assistant for deep multi-source analysis.\n\n" | |
| "## Report Format\n\n" | |
| "- Be CONCISE — key findings, not lengthy prose\n" | |
| "- Use markdown TABLES for comparisons, statistics, structured data\n" | |
| "- Only use prose for synthesis that can't be tabulated\n" | |
| "- Start directly with findings — no title like \"Research Report:\"\n\n" | |
| "## CRITICAL: You MUST provide a <result> tag\n\n" | |
| "<result>\n" | |
| "Your concise, table-based report here (NO title/heading)\n" | |
| "</result>\n" | |
| ), | |
| "tool": { | |
| "type": "function", | |
| "function": { | |
| "name": "launch_research_agent", | |
| "description": "Launch a heavyweight research agent for deep multi-source analysis. ONLY use when the user explicitly needs comprehensive research across many sources (comparisons, reports, literature reviews). Do NOT use for simple searches — use launch_web_agent instead.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "topic": { | |
| "type": "string", | |
| "description": "The research topic or question. Should be clear and specific." | |
| }, | |
| "task_id": { | |
| "type": "string", | |
| "description": "A 2-3 word summary of the research topic, separated by dashes." | |
| } | |
| }, | |
| "required": ["topic", "task_id"] | |
| } | |
| } | |
| }, | |
| "tool_arg": "topic", | |
| "has_counter": True, | |
| "in_menu": True, | |
| "in_launcher": True, | |
| "placeholder": "Enter message...", | |
| "capabilities": "Heavyweight: runs dozens of parallel searches and reads many pages. Use only for deep analysis requiring multiple sources.", | |
| }, | |
| "image": { | |
| "label": "IMAGE", | |
| "system_prompt": ( | |
| "You are a creative AI assistant with image tools.\n\n" | |
| "## Tools\n\n" | |
| "- **generate_image(prompt)**: Generate from text. Returns figure reference (e.g., 'figure_T4_1').\n" | |
| "- **edit_image(prompt, source)**: Edit/transform an image. Source: URL, file path, or reference.\n" | |
| "- **read_image(source)**: Load a raster image (PNG, JPEG, GIF, WebP, BMP). " | |
| "SVG NOT supported. Returns figure reference.\n" | |
| "- **save_image(source, filename)**: Save an image to the workspace as PNG. " | |
| "Source: reference (e.g., 'figure_T4_1') or URL.\n\n" | |
| "## Strategy\n\n" | |
| "1. If user provides a URL/file, use read_image first to load it\n" | |
| "2. Use generate_image ONLY when explicitly asked to generate/create an image — " | |
| "never use it to \"find\" or \"show\" a photo of something\n" | |
| "3. Use edit_image to transform existing ones\n" | |
| "4. Write detailed prompts. Describe what you see and iterate if needed.\n\n" | |
| "## CRITICAL: You MUST provide a <result> tag\n\n" | |
| "Use the exact figure reference from tool output to embed figures in your result.\n" | |
| "Figure references are self-closing tags like <figure_T4_2> — do NOT add a closing </figure_T4_2> tag.\n\n" | |
| "<result>\n" | |
| "Here's the comic version of your image:\n\n" | |
| "<figure_T4_2>\n" | |
| "</result>\n" | |
| ), | |
| "tool": { | |
| "type": "function", | |
| "function": { | |
| "name": "launch_image_agent", | |
| "description": "Launch an image agent for generating or editing images using AI models. Use this for creating images from text, applying style transfers, editing photos, or transforming existing images (e.g., 'make this photo look like a comic'). Accepts image URLs as input.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "task": { | |
| "type": "string", | |
| "description": "The image task or description. Should contain all necessary context including any image URLs." | |
| }, | |
| "task_id": { | |
| "type": "string", | |
| "description": "A 2-3 word summary of the task, separated by dashes." | |
| } | |
| }, | |
| "required": ["task", "task_id"] | |
| } | |
| } | |
| }, | |
| "tool_arg": "task", | |
| "has_counter": True, | |
| "in_menu": True, | |
| "in_launcher": True, | |
| "placeholder": "Describe an image or paste a URL...", | |
| "capabilities": "Has tools: generate_image(prompt), edit_image(prompt, source), read_image(source), save_image(source, filename). Can generate, edit, load, and save images via HuggingFace models.", | |
| }, | |
| } | |
| # ============================================================ | |
| # Derived helpers — replace scattered dicts across the codebase | |
| # ============================================================ | |
| def get_system_prompt(agent_key: str) -> str: | |
| """Get system prompt for an agent type.""" | |
| from datetime import date | |
| agent = AGENT_REGISTRY.get(agent_key) | |
| if not agent: | |
| return "" | |
| prompt = agent["system_prompt"] | |
| # For command center, fill in the tools section dynamically | |
| if "{tools_section}" in prompt: | |
| prompt = prompt.replace("{tools_section}", _build_tools_section()) | |
| # Append current date so agents have temporal context | |
| prompt += f"\n\nToday's date is {date.today().strftime('%B %d, %Y')}." | |
| return prompt | |
| def serialize_llm_response(response): | |
| """Extract raw LLM response into a JSON-serializable dict.""" | |
| choice = response.choices[0] | |
| msg = choice.message | |
| result = {"role": "assistant", "content": msg.content or ""} | |
| if msg.tool_calls: | |
| result["tool_calls"] = [ | |
| { | |
| "id": tc.id, | |
| "type": "function", | |
| "function": { | |
| "name": tc.function.name, | |
| "arguments": tc.function.arguments, | |
| }, | |
| } | |
| for tc in msg.tool_calls | |
| ] | |
| return result | |
| MAX_RETRIES = 3 | |
| RETRY_DELAYS = [2, 5, 10] | |
| def parse_llm_error(error: Exception) -> dict: | |
| """Parse LLM error to extract useful message for frontend.""" | |
| import json as _json | |
| import re as _re | |
| error_str = str(error) | |
| try: | |
| json_match = _re.search(r'\{.*\}', error_str) | |
| if json_match: | |
| error_data = _json.loads(json_match.group()) | |
| return { | |
| "message": error_data.get("message", error_str), | |
| "type": error_data.get("type", "unknown_error"), | |
| "retryable": error_data.get("type") == "too_many_requests_error" or "429" in error_str, | |
| } | |
| except Exception: | |
| pass | |
| retryable = any(x in error_str.lower() for x in ["429", "rate limit", "too many requests", "overloaded", "high traffic"]) | |
| # Strip HTML error pages (e.g. 503 from HuggingFace) to a short message | |
| if "<html" in error_str.lower(): | |
| status_match = _re.search(r'\b([45]\d{2})\b', error_str) | |
| error_str = f"Service error (HTTP {status_match.group(1)})" if status_match else "Service unavailable" | |
| return {"message": error_str, "type": "unknown_error", "retryable": retryable} | |
| def call_llm(client, model, messages, tools=None, extra_params=None, abort_event=None, call_number=0): | |
| """Centralized LLM call with retries and debug event emission. | |
| Generator that yields: | |
| - {"type": "debug_call_input", ...} before the call | |
| - {"type": "debug_call_output", ...} after success and after every failed attempt | |
| - {"type": "retry", ...} on retryable errors | |
| - {"type": "error", ...} on fatal errors | |
| - {"type": "aborted"} if abort_event fires during retry wait | |
| - {"_response": response, "_call_number": n} the actual OpenAI response (not an SSE event) | |
| The caller should forward all dicts to the SSE stream except those with "_response". | |
| The incremented call number is carried in that final dict's "_call_number" field. | |
| """ | |
| import copy | |
| import time | |
| call_number += 1 | |
| # Emit input before the call | |
| yield { | |
| "type": "debug_call_input", | |
| "call_number": call_number, | |
| "messages": copy.deepcopy(messages), | |
| } | |
| response = None | |
| last_error = None | |
| for attempt in range(MAX_RETRIES): | |
| try: | |
| call_params = { | |
| "messages": messages, | |
| "model": model, | |
| } | |
| if tools: | |
| call_params["tools"] = tools | |
| call_params["tool_choice"] = "auto" | |
| if extra_params: | |
| call_params["extra_body"] = extra_params | |
| response = client.chat.completions.create(**call_params) | |
| # Emit output on success | |
| yield { | |
| "type": "debug_call_output", | |
| "call_number": call_number, | |
| "response": serialize_llm_response(response), | |
| } | |
| # Yield the actual response object for the caller | |
| yield {"_response": response, "_call_number": call_number} | |
| return | |
| except Exception as e: | |
| last_error = e | |
| error_info = parse_llm_error(e) | |
| # Emit debug output for every failed attempt | |
| yield { | |
| "type": "debug_call_output", | |
| "call_number": call_number, | |
| "error": error_info["message"], | |
| "attempt": attempt + 1, | |
| "retryable": error_info["retryable"], | |
| } | |
| if attempt < MAX_RETRIES - 1 and error_info["retryable"]: | |
| delay = RETRY_DELAYS[attempt] | |
| yield { | |
| "type": "retry", | |
| "attempt": attempt + 1, | |
| "max_attempts": MAX_RETRIES, | |
| "delay": delay, | |
| "message": error_info["message"], | |
| "error_type": error_info.get("type", "unknown_error"), | |
| } | |
| if abort_event: | |
| abort_event.wait(delay) | |
| if abort_event.is_set(): | |
| yield {"type": "aborted"} | |
| return | |
| else: | |
| time.sleep(delay) | |
| else: | |
| yield {"type": "error", "content": error_info["message"]} | |
| return | |
| # Should not reach here, but just in case | |
| yield {"type": "error", "content": f"LLM error after {MAX_RETRIES} attempts: {str(last_error)}"} | |
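A caller of `call_llm` has to separate the internal `_response` dict from the events meant for the SSE stream. The sketch below shows one way to do that. It is an assumption-laden illustration: `drain_llm_events` and `fake_call_llm` are hypothetical names, and the fake generator only mimics the event shapes documented above so the example runs without an LLM client.

```python
def drain_llm_events(events):
    """Split a call_llm-style event stream into SSE events and the raw response.

    Returns (forwarded, response, call_number). `events` is any iterable of
    dicts shaped like the ones call_llm yields.
    """
    forwarded, response, call_number = [], None, None
    for event in events:
        if "_response" in event:
            # Internal carrier dict: keep it out of the SSE stream.
            response = event["_response"]
            call_number = event.get("_call_number")
        else:
            forwarded.append(event)
            if event.get("type") in ("error", "aborted"):
                break  # the call ended without a response
    return forwarded, response, call_number


def fake_call_llm():
    """Stand-in generator (assumption) that mimics one retry, then success."""
    yield {"type": "debug_call_input", "call_number": 1, "messages": []}
    yield {"type": "retry", "attempt": 1, "max_attempts": 3, "delay": 2, "message": "429"}
    yield {"type": "debug_call_output", "call_number": 1,
           "response": {"role": "assistant", "content": "hi"}}
    yield {"_response": "RAW_RESPONSE_OBJECT", "_call_number": 1}


forwarded, response, call_number = drain_llm_events(fake_call_llm())
```

A streaming handler would more likely yield each forwarded event as it arrives instead of collecting them in a list; the list here only makes the split easy to inspect.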
| def nudge_for_result(client, model, messages, extra_params=None, extra_result_data=None, call_number=0): | |
| """Nudge an agent that finished without <result> tags to produce one. | |
| This is a generator that yields SSE events (content, result_preview, result, | |
| plus debug_call_input/output from call_llm). | |
| Call it after an agent's tool loop when no <result> was found. | |
| Args: | |
| client: OpenAI-compatible client | |
| model: Model name | |
| messages: Full message history (will be mutated — nudge message appended) | |
| extra_params: Optional extra_body params for the LLM call | |
| extra_result_data: Optional dict of extra fields to include in result events | |
| (e.g. {"figures": {...}} or {"images": {...}}) | |
| call_number: Current debug call number for sequential numbering | |
| """ | |
| import re | |
| import logging | |
| _logger = logging.getLogger(__name__) | |
| messages.append({ | |
| "role": "user", | |
| "content": "Please provide your final answer now. Wrap it in <result> tags." | |
| }) | |
| response = None | |
| for event in call_llm(client, model, messages, extra_params=extra_params, call_number=call_number): | |
| if "_response" in event: | |
| response = event["_response"] | |
| else: | |
| yield event | |
| if event.get("type") in ("error", "aborted"): | |
| return | |
| if not response: | |
| return | |
| nudge_content = response.choices[0].message.content or "" | |
| result_match = re.search(r'<result>(.*?)</result>', nudge_content, re.DOTALL | re.IGNORECASE) | |
| extra = extra_result_data or {} | |
| if result_match: | |
| result_content = result_match.group(1).strip() | |
| thinking = re.sub(r'<result>.*?</result>', '', nudge_content, flags=re.DOTALL | re.IGNORECASE).strip() | |
| if thinking: | |
| yield {"type": "content", "content": thinking} | |
| yield {"type": "result_preview", "content": result_content, **extra} | |
| yield {"type": "result", "content": result_content, **extra} | |
| elif nudge_content.strip(): | |
| # No result tags but got content — use it as the result | |
| yield {"type": "result_preview", "content": nudge_content.strip(), **extra} | |
| yield {"type": "result", "content": nudge_content.strip(), **extra} | |
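The `<result>` extraction used by `nudge_for_result` can be tried in isolation. The following is a minimal sketch using the same regular expression and flags; `split_result` is a hypothetical helper name, not a function from this module.

```python
import re

# Same pattern and flags as in nudge_for_result: non-greedy, spans newlines,
# and tolerates <RESULT> in any letter case.
RESULT_RE = re.compile(r"<result>(.*?)</result>", re.DOTALL | re.IGNORECASE)


def split_result(text: str):
    """Return (result, remainder); result is None when no <result> tag is present."""
    match = RESULT_RE.search(text)
    if not match:
        return None, text.strip()
    # Everything outside the tags is treated as the agent's side commentary.
    remainder = RESULT_RE.sub("", text).strip()
    return match.group(1).strip(), remainder


result, remainder = split_result("Checked two sources.\n<result>\nParis\n</result>")
```

When no tag is found, the caller can fall back to using the whole text as the result, which mirrors the `elif` branch above.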
| def get_tools() -> list: | |
| """Get tool definitions for the command center.""" | |
| return [ | |
| agent["tool"] | |
| for agent in AGENT_REGISTRY.values() | |
| if agent["tool"] is not None | |
| ] | |
| def get_agent_type_map() -> dict: | |
| """Map tool function names to agent keys.""" | |
| result = {} | |
| for key, agent in AGENT_REGISTRY.items(): | |
| if agent["tool"] is not None: | |
| func_name = agent["tool"]["function"]["name"] | |
| result[func_name] = key | |
| return result | |
| def get_tool_arg(agent_key: str) -> str: | |
| """Get the argument name for extracting the initial message from tool call args.""" | |
| agent = AGENT_REGISTRY.get(agent_key) | |
| return agent["tool_arg"] if agent else "task" | |
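Taken together, `get_agent_type_map` and `get_tool_arg` let the command center turn a tool call into an agent launch. Here is a hedged sketch of that lookup. `MINI_REGISTRY` is a trimmed stand-in for `AGENT_REGISTRY` holding only the fields used here, and `resolve_tool_call` is a hypothetical name.

```python
import json

# Trimmed stand-in for AGENT_REGISTRY (assumption: only the fields used here).
MINI_REGISTRY = {
    "command": {"tool": None, "tool_arg": None},
    "agent": {"tool": {"function": {"name": "launch_web_agent"}}, "tool_arg": "task"},
    "research": {"tool": {"function": {"name": "launch_research_agent"}}, "tool_arg": "topic"},
}


def resolve_tool_call(function_name: str, arguments_json: str):
    """Return (agent_key, initial_message, task_id) for one tool call, or None if unknown."""
    # Same idea as get_agent_type_map: tool function name -> agent key.
    type_map = {
        entry["tool"]["function"]["name"]: key
        for key, entry in MINI_REGISTRY.items()
        if entry["tool"] is not None
    }
    agent_key = type_map.get(function_name)
    if agent_key is None:
        return None
    # Tool call arguments arrive as a JSON string.
    args = json.loads(arguments_json)
    arg_name = MINI_REGISTRY[agent_key]["tool_arg"] or "task"
    return agent_key, args.get(arg_name, ""), args.get("task_id")


resolved = resolve_tool_call(
    "launch_research_agent",
    '{"topic": "Compare React, Vue, and Svelte", "task_id": "frontend-frameworks"}',
)
```

Because the research agent names its argument `topic` while the others use `task`, reading the argument name from the registry avoids hard-coding that difference in the dispatcher.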
| def get_default_counters() -> dict: | |
| """Get default agent counters.""" | |
| return { | |
| key: 0 | |
| for key, agent in AGENT_REGISTRY.items() | |
| if agent["has_counter"] | |
| } | |
| def get_registry_for_frontend() -> list: | |
| """Serialize registry metadata for the frontend /api/agents endpoint.""" | |
| return [ | |
| { | |
| "key": key, | |
| "label": agent["label"], | |
| "hasCounter": agent["has_counter"], | |
| "inMenu": agent["in_menu"], | |
| "inLauncher": agent["in_launcher"], | |
| "placeholder": agent["placeholder"], | |
| } | |
| for key, agent in AGENT_REGISTRY.items() | |
| ] | |
| def _build_tools_section() -> str: | |
| """Build the 'available tools' text for the command center system prompt.""" | |
| lines = ["## Available Agents\n\nYou can launch specialized agents for different types of tasks:"] | |
| for key, agent in AGENT_REGISTRY.items(): | |
| if agent["tool"] is not None: | |
| tool_func = agent["tool"]["function"] | |
| capabilities = agent.get("capabilities", "") | |
| lines.append(f"- **{tool_func['name']}**: {tool_func['description']}") | |
| if capabilities: | |
| lines.append(f" {capabilities}") | |
| return "\n".join(lines) | |