agent-ui / backend /command.py
lvwerra's picture
lvwerra HF Staff
Refactor backend: extract async stream helper, direct tool registry, nudge_for_result via call_llm
4e3db9c
"""
Command center backend - handles tool-based agent launching and direct tools
"""
import json
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
# Tool definitions derived from agent registry
from .agents import get_tools, get_agent_type_map, get_tool_arg
from .tools import DIRECT_TOOL_REGISTRY
# Combine agent-launch tools with direct tools
TOOLS = get_tools() + [t["schema"] for t in DIRECT_TOOL_REGISTRY.values()]
MAX_TURNS = 10 # Limit conversation turns in command center
def stream_command_center(client, model: str, messages: List[Dict], extra_params: dict = None, abort_event=None, files_root: str = None):
"""
Stream command center responses with agent launching capabilities
Yields:
dict: Updates with type 'thinking', 'launch', 'done', or 'error'
"""
from .agents import call_llm
turns = 0
done = False
debug_call_number = 0
while not done and turns < MAX_TURNS:
# Check abort before each turn
if abort_event and abort_event.is_set():
yield {"type": "aborted"}
return
turns += 1
# LLM call with retries and debug events
response = None
for event in call_llm(client, model, messages, tools=TOOLS, extra_params=extra_params, abort_event=abort_event, call_number=debug_call_number):
if "_response" in event:
response = event["_response"]
debug_call_number = event["_call_number"]
else:
yield event
if event.get("type") in ("error", "aborted"):
return
if response is None:
return
# Get response
assistant_message = response.choices[0].message
content = assistant_message.content or ""
tool_calls = assistant_message.tool_calls or []
# Send thinking content if present
if content.strip():
yield {"type": "thinking", "content": content}
# Handle tool calls (agent launches + direct tools)
if tool_calls:
has_launches = False
for tool_call in tool_calls:
# Check abort between tool calls
if abort_event and abort_event.is_set():
yield {"type": "aborted"}
return
function_name = tool_call.function.name
# Parse arguments
try:
args = json.loads(tool_call.function.arguments)
except:
yield {"type": "error", "content": "Failed to parse tool arguments"}
return
# --- Direct tools (execute synchronously) ---
if function_name in DIRECT_TOOL_REGISTRY:
# Emit tool_start for frontend
yield {
"type": "tool_start",
"tool": function_name,
"args": args,
"tool_call_id": tool_call.id,
"arguments": tool_call.function.arguments,
"thinking": content,
}
# Execute the tool via registry
tool_entry = DIRECT_TOOL_REGISTRY[function_name]
result = tool_entry["execute"](args, {"files_root": files_root})
# Emit tool_result for frontend
yield {
"type": "tool_result",
"tool": function_name,
"tool_call_id": tool_call.id,
"result": result,
"response": result.get("content", ""),
}
# Add to message history so LLM can continue
messages.append({
"role": "assistant",
"content": content,
"tool_calls": [{
"id": tool_call.id,
"type": "function",
"function": {
"name": function_name,
"arguments": tool_call.function.arguments,
}
}]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.get("content", ""),
})
continue
# --- Agent launch tools ---
agent_type_map = get_agent_type_map()
agent_type = agent_type_map.get(function_name)
if agent_type:
has_launches = True
# Get the initial message using the registered arg name for this type
initial_message = args.get(get_tool_arg(agent_type)) or args.get("task") or args.get("message")
task_id = args.get("task_id", "")
# Send launch action to frontend
yield {
"type": "launch",
"agent_type": agent_type,
"initial_message": initial_message,
"task_id": task_id,
"tool_call_id": tool_call.id
}
# Add tool call to message history for context
messages.append({
"role": "assistant",
"content": content,
"tool_calls": [{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
}
}]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": f"Launched {agent_type} agent with task: {initial_message}"
})
else:
yield {"type": "error", "content": f"Unknown tool: {function_name}"}
return
# If any agent launches happened, stop and let agents run
# If only direct tools, continue the loop so LLM can respond
if has_launches:
done = True
else:
# No tool calls - conversation complete
messages.append({"role": "assistant", "content": content})
done = True
# Send done signal
yield {"type": "done"}