""" Command center backend - handles tool-based agent launching and direct tools """ import json import logging from typing import List, Dict logger = logging.getLogger(__name__) # Tool definitions derived from agent registry from .agents import get_tools, get_agent_type_map, get_tool_arg from .tools import DIRECT_TOOL_REGISTRY # Combine agent-launch tools with direct tools TOOLS = get_tools() + [t["schema"] for t in DIRECT_TOOL_REGISTRY.values()] MAX_TURNS = 10 # Limit conversation turns in command center def stream_command_center(client, model: str, messages: List[Dict], extra_params: dict = None, abort_event=None, files_root: str = None): """ Stream command center responses with agent launching capabilities Yields: dict: Updates with type 'thinking', 'launch', 'done', or 'error' """ from .agents import call_llm turns = 0 done = False debug_call_number = 0 while not done and turns < MAX_TURNS: # Check abort before each turn if abort_event and abort_event.is_set(): yield {"type": "aborted"} return turns += 1 # LLM call with retries and debug events response = None for event in call_llm(client, model, messages, tools=TOOLS, extra_params=extra_params, abort_event=abort_event, call_number=debug_call_number): if "_response" in event: response = event["_response"] debug_call_number = event["_call_number"] else: yield event if event.get("type") in ("error", "aborted"): return if response is None: return # Get response assistant_message = response.choices[0].message content = assistant_message.content or "" tool_calls = assistant_message.tool_calls or [] # Send thinking content if present if content.strip(): yield {"type": "thinking", "content": content} # Handle tool calls (agent launches + direct tools) if tool_calls: has_launches = False for tool_call in tool_calls: # Check abort between tool calls if abort_event and abort_event.is_set(): yield {"type": "aborted"} return function_name = tool_call.function.name # Parse arguments try: args = json.loads(tool_call.function.arguments) except: yield {"type": "error", "content": "Failed to parse tool arguments"} return # --- Direct tools (execute synchronously) --- if function_name in DIRECT_TOOL_REGISTRY: # Emit tool_start for frontend yield { "type": "tool_start", "tool": function_name, "args": args, "tool_call_id": tool_call.id, "arguments": tool_call.function.arguments, "thinking": content, } # Execute the tool via registry tool_entry = DIRECT_TOOL_REGISTRY[function_name] result = tool_entry["execute"](args, {"files_root": files_root}) # Emit tool_result for frontend yield { "type": "tool_result", "tool": function_name, "tool_call_id": tool_call.id, "result": result, "response": result.get("content", ""), } # Add to message history so LLM can continue messages.append({ "role": "assistant", "content": content, "tool_calls": [{ "id": tool_call.id, "type": "function", "function": { "name": function_name, "arguments": tool_call.function.arguments, } }] }) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": result.get("content", ""), }) continue # --- Agent launch tools --- agent_type_map = get_agent_type_map() agent_type = agent_type_map.get(function_name) if agent_type: has_launches = True # Get the initial message using the registered arg name for this type initial_message = args.get(get_tool_arg(agent_type)) or args.get("task") or args.get("message") task_id = args.get("task_id", "") # Send launch action to frontend yield { "type": "launch", "agent_type": agent_type, "initial_message": initial_message, "task_id": task_id, "tool_call_id": tool_call.id } # Add tool call to message history for context messages.append({ "role": "assistant", "content": content, "tool_calls": [{ "id": tool_call.id, "type": "function", "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments, } }] }) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": f"Launched {agent_type} agent with task: {initial_message}" }) else: yield {"type": "error", "content": f"Unknown tool: {function_name}"} return # If any agent launches happened, stop and let agents run # If only direct tools, continue the loop so LLM can respond if has_launches: done = True else: # No tool calls - conversation complete messages.append({"role": "assistant", "content": content}) done = True # Send done signal yield {"type": "done"}