"""Simplified Execution Agent Runtime.""" import inspect import json from typing import Dict, Any, List, Optional, Tuple from dataclasses import dataclass from .agent import ExecutionAgent from .tools import get_tool_schemas, get_tool_registry from ...config import get_settings from ...openrouter_client import request_chat_completion from ...logging_config import logger @dataclass class ExecutionResult: """Result from an execution agent.""" agent_name: str success: bool response: str error: Optional[str] = None tools_executed: List[str] = None class ExecutionAgentRuntime: """Manages the execution of a single agent request.""" MAX_TOOL_ITERATIONS = 8 # Initialize execution agent runtime with settings, tools, and agent instance def __init__(self, agent_name: str): settings = get_settings() self.agent = ExecutionAgent(agent_name) self.api_key = settings.api_key self.model = settings.execution_agent_model self.tool_registry = get_tool_registry(agent_name=agent_name) self.tool_schemas = get_tool_schemas() if not self.api_key: raise ValueError("API key not configured. Set API_KEY environment variable.") # Main execution loop for running agent with LLM calls and tool execution async def execute(self, instructions: str) -> ExecutionResult: """Execute the agent with given instructions.""" try: # Build system prompt with history system_prompt = self.agent.build_system_prompt_with_history() # Start conversation with the instruction messages = [{"role": "user", "content": instructions}] tools_executed: List[str] = [] final_response: Optional[str] = None for iteration in range(self.MAX_TOOL_ITERATIONS): logger.info( f"[{self.agent.name}] Requesting plan (iteration {iteration + 1})" ) response = await self._make_llm_call(system_prompt, messages, with_tools=True) assistant_message = response.get("choices", [{}])[0].get("message", {}) if not assistant_message: raise RuntimeError("LLM response did not include an assistant message") raw_tool_calls = assistant_message.get("tool_calls", []) or [] parsed_tool_calls = self._extract_tool_calls(raw_tool_calls) assistant_entry: Dict[str, Any] = { "role": "assistant", "content": assistant_message.get("content", "") or "", } if raw_tool_calls: assistant_entry["tool_calls"] = raw_tool_calls messages.append(assistant_entry) if not parsed_tool_calls: final_response = assistant_entry["content"] or "No action required." break for tool_call in parsed_tool_calls: tool_name = tool_call.get("name", "") tool_args = tool_call.get("arguments", {}) call_id = tool_call.get("id") if not tool_name: logger.warning("Tool call missing name: %s", tool_call) failure = {"error": "Tool call missing name; unable to execute."} tool_message = { "role": "tool", "tool_call_id": call_id or "unknown_tool", "content": self._format_tool_result( tool_name or "", False, failure, tool_args ), } messages.append(tool_message) continue tools_executed.append(tool_name) logger.info(f"[{self.agent.name}] Executing tool: {tool_name}") success, result = await self._execute_tool(tool_name, tool_args) if success: logger.info(f"[{self.agent.name}] Tool {tool_name} completed successfully") record_payload = self._safe_json_dump(result) else: error_detail = result.get("error") if isinstance(result, dict) else str(result) logger.warning(f"[{self.agent.name}] Tool {tool_name} failed: {error_detail}") record_payload = error_detail self.agent.record_tool_execution( tool_name, self._safe_json_dump(tool_args), record_payload ) tool_message = { "role": "tool", "tool_call_id": call_id or tool_name, "content": self._format_tool_result(tool_name, success, result, tool_args), } messages.append(tool_message) else: raise RuntimeError("Reached tool iteration limit without final response") if final_response is None: raise RuntimeError("LLM did not return a final response") self.agent.record_response(final_response) return ExecutionResult( agent_name=self.agent.name, success=True, response=final_response, tools_executed=tools_executed ) except Exception as e: logger.error(f"[{self.agent.name}] Execution failed: {e}") error_msg = str(e) failure_text = f"Failed to complete task: {error_msg}" self.agent.record_response(f"Error: {error_msg}") return ExecutionResult( agent_name=self.agent.name, success=False, response=failure_text, error=error_msg ) # Execute API call with system prompt, messages, and optional tool schemas async def _make_llm_call(self, system_prompt: str, messages: List[Dict], with_tools: bool) -> Dict: """Make an LLM call.""" tools_to_send = self.tool_schemas if with_tools else None logger.info(f"[{self.agent.name}] Calling LLM with model: {self.model}, tools: {len(tools_to_send) if tools_to_send else 0}") return await request_chat_completion( model=self.model, messages=messages, system=system_prompt, api_key=self.api_key, tools=tools_to_send ) # Parse and validate tool calls from LLM response into structured format def _extract_tool_calls(self, raw_tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Extract tool calls from an assistant message.""" tool_calls: List[Dict[str, Any]] = [] for tool in raw_tools: function = tool.get("function", {}) name = function.get("name", "") args = function.get("arguments", "") if isinstance(args, str): try: args = json.loads(args) if args else {} except json.JSONDecodeError: args = {} if name: tool_calls.append({ "id": tool.get("id"), "name": name, "arguments": args, }) return tool_calls # Safely convert objects to JSON with fallback to string representation def _safe_json_dump(self, payload: Any) -> str: """Serialize payload to JSON, falling back to string representation.""" try: return json.dumps(payload, default=str) except TypeError: return str(payload) # Format tool execution results into JSON structure for LLM consumption def _format_tool_result( self, tool_name: str, success: bool, result: Any, arguments: Dict[str, Any], ) -> str: """Build a structured string for tool responses.""" if success: payload: Dict[str, Any] = { "tool": tool_name, "status": "success", "arguments": arguments, "result": result, } else: error_detail = result.get("error") if isinstance(result, dict) else str(result) payload = { "tool": tool_name, "status": "error", "arguments": arguments, "error": error_detail, } return self._safe_json_dump(payload) # Execute tool function from registry with error handling and async support async def _execute_tool(self, tool_name: str, arguments: Dict) -> Tuple[bool, Any]: """Execute a tool. Returns (success, result).""" tool_func = self.tool_registry.get(tool_name) if not tool_func: return False, {"error": f"Unknown tool: {tool_name}"} try: result = tool_func(**arguments) if inspect.isawaitable(result): result = await result return True, result except Exception as e: return False, {"error": str(e)}