| import json
|
| import datetime
|
| from loguru import logger
|
| from typing import (
|
| Dict,
|
| Any,
|
| List,
|
| Literal,
|
| Union,
|
| AsyncIterator,
|
| )
|
|
|
| from .types import ToolCallObject
|
| from .mcp_client import MCPClient
|
| from .tool_manager import ToolManager
|
|
|
|
|
class ToolExecutor:
    """Parse LLM tool calls, run them through an MCP client and format results.

    Tool calls arrive either as ``ToolCallObject`` instances (arguments carried
    as a JSON string) or as plain dicts (``id`` / ``name`` plus ``input`` or
    ``args``). Each tool is looked up in the tool manager, invoked through the
    MCP client, and reported both as status updates and as results shaped for
    the "Claude", "OpenAI" or "Prompt" caller modes.
    """

    def __init__(
        self,
        mcp_client: "MCPClient",
        tool_manager: "ToolManager",
    ):
        """Store the collaborators used to look up and invoke tools.

        Args:
            mcp_client: Client whose ``call_tool`` coroutine runs a tool.
            tool_manager: Registry queried with ``get_tool(tool_name)``.
        """
        self._mcp_client = mcp_client
        self._tool_manager = tool_manager

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``Z``.

        ``isoformat()`` on an aware UTC datetime already ends in ``+00:00``;
        appending ``Z`` to that produces a string with two zone designators,
        so the offset is swapped for ``Z`` instead.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.isoformat().replace("+00:00", "Z")

    def parse_tool_call(
        self, call: Union[Dict[str, Any], "ToolCallObject"]
    ) -> tuple:
        """Parse tool call from different formats.

        Args:
            call: Either a ``ToolCallObject`` (``id``, ``function.name`` and
                ``function.arguments`` as a JSON string) or a dict with
                ``id``, ``name`` and ``input`` (falling back to ``args``).

        Returns:
            tuple: (tool_name, tool_id, tool_input, is_error, result_content,
            parse_error). On failure ``is_error`` and ``parse_error`` are both
            True and ``result_content`` holds the error message.
        """
        tool_name: str = ""
        tool_id: str = ""
        tool_input: Any = None
        is_error: bool = False
        result_content: Union[str, dict] = ""
        parse_error: bool = False

        if isinstance(call, ToolCallObject):
            tool_name = call.function.name
            tool_id = call.id
            try:
                tool_input = json.loads(call.function.arguments)
            except (json.JSONDecodeError, TypeError):
                # TypeError covers a non-string payload (e.g. None), which
                # would otherwise escape and abort the whole batch.
                logger.error(
                    f"Failed to decode OpenAI tool arguments for '{tool_name}'"
                )
                result_content = (
                    f"Error: Invalid arguments format for tool '{tool_name}'."
                )
                is_error = True
                parse_error = True
        elif isinstance(call, dict):
            tool_id = call.get("id")
            tool_name = call.get("name")
            tool_input = call.get("input", call.get("args"))

            if tool_input is None:
                logger.warning(
                    f"Empty input for tool '{tool_name}' (ID: {tool_id}). Using empty object."
                )
                tool_input = {}

            if not tool_id or not tool_name:
                logger.error(f"Invalid Dict tool call structure: {call}")
                result_content = "Error: Invalid tool call structure from LLM."
                is_error = True
                parse_error = True
        else:
            logger.error(f"Unsupported tool call type: {type(call)}")
            result_content = "Error: Unsupported tool call type."
            is_error = True
            parse_error = True

        return tool_name, tool_id, tool_input, is_error, result_content, parse_error

    def format_tool_result(
        self,
        caller_mode: Literal["Claude", "OpenAI", "Prompt"],
        tool_id: str,
        result_content: Union[str, List[Dict[str, Any]]],
        is_error: bool,
    ) -> Union[Dict[str, Any], None]:
        """Format tool result for LLM API.

        Args:
            caller_mode: Which result shape to produce.
            tool_id: Identifier of the tool call being answered.
            result_content: Text, or a list of content blocks (Claude mode).
            is_error: Whether the tool call failed.

        Returns:
            A dict shaped for the given caller mode, or None when the mode is
            not one of "Claude", "OpenAI" or "Prompt".
        """
        if caller_mode == "Claude":
            if isinstance(result_content, list) or (
                isinstance(result_content, str) and result_content
            ):
                # Lists of blocks and non-empty text are passed through as-is.
                content_to_send = result_content
            elif not result_content and is_error:
                # An error with no message still needs some explanation.
                content_to_send = "Error occurred during tool execution."
            else:
                content_to_send = ""

            return {
                "type": "tool_result",
                "tool_use_id": tool_id,
                "content": content_to_send,
                "is_error": is_error,
            }

        if caller_mode == "OpenAI":
            return {
                "role": "tool",
                "tool_call_id": tool_id,
                "content": str(result_content),
            }

        if caller_mode == "Prompt":
            return {
                "tool_id": tool_id,
                "content": str(result_content),
                "is_error": is_error,
            }

        return None

    def process_tool_from_prompt_json(
        self, data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Process tool data from JSON in prompt mode.

        Each item must provide truthy ``mcp_server``, ``tool`` and
        ``arguments`` (a JSON string). Items that are incomplete or whose
        arguments cannot be decoded are logged and skipped.

        Returns:
            One dict per accepted item with ``name``, ``server``, ``args`` and
            a generated ``id`` of the form ``prompt_tool_<index>``.
        """
        parsed_tools: List[Dict[str, Any]] = []
        for item in data:
            server = item.get("mcp_server")
            tool_name = item.get("tool")
            arguments_str = item.get("arguments")

            if not (server and tool_name and arguments_str):
                logger.warning("Skipping invalid tool structure in prompt mode JSON")
                continue

            try:
                args_dict = json.loads(arguments_str)
                parsed_tools.append(
                    {
                        "name": tool_name,
                        "server": server,
                        "args": args_dict,
                        "id": f"prompt_tool_{len(parsed_tools)}",
                    }
                )
                logger.info(f"Parsed tool call from prompt JSON: {tool_name}")
            except json.JSONDecodeError:
                logger.error(
                    "Failed to decode arguments JSON in prompt mode tool call"
                )
            except Exception as e:
                # Best-effort boundary: one malformed item must not discard
                # the rest of the batch.
                logger.error(f"Error processing prompt mode tool dict: {e}")
        return parsed_tools

    @staticmethod
    def _summarize_tool_output(
        caller_mode: str,
        text_content: str,
        content_items: List[Dict[str, Any]],
    ) -> tuple:
        """Return ``(status_content, llm_content)`` for one tool result.

        Without image items both values are ``text_content``. With images the
        status text gains an image-count note; Claude receives text and image
        blocks, while OpenAI and Prompt receive the annotated status text.
        """
        image_items = [item for item in content_items if item.get("type") == "image"]
        if not image_items:
            return text_content, text_content

        status_content = (
            f"{text_content}\n[Tool returned {len(image_items)} image(s)]".strip()
        )

        if caller_mode == "Claude":
            claude_blocks: List[Dict[str, Any]] = []
            if text_content:
                claude_blocks.append({"type": "text", "text": text_content})
            # Images lacking payload or MIME type cannot be forwarded.
            claude_blocks.extend(
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": item["mimeType"],
                        "data": item["data"],
                    },
                }
                for item in image_items
                if "data" in item and "mimeType" in item
            )
            return status_content, (claude_blocks if claude_blocks else "")

        if caller_mode in ("OpenAI", "Prompt"):
            return status_content, status_content

        return status_content, text_content

    async def execute_tools(
        self,
        tool_calls: Union[List[Dict[str, Any]], List["ToolCallObject"]],
        caller_mode: Literal["Claude", "OpenAI", "Prompt"],
    ) -> AsyncIterator[Dict[str, Any]]:
        """Execute tools and yield status updates.

        For every call this yields ``tool_call_status`` dicts (``running``
        followed by ``completed`` or ``error``; unparseable calls yield a
        single ``error``). After all calls a final ``final_tool_results`` dict
        carries the results formatted for ``caller_mode``.
        """
        tool_results_for_llm = []

        logger.info(f"Executing {len(tool_calls)} tool(s) for {caller_mode} caller.")
        for call in tool_calls:
            (
                tool_name,
                tool_id,
                tool_input,
                is_error,
                result_content,
                parse_error,
            ) = self.parse_tool_call(call)

            logger.info(f"Executing tool: {call}")

            if parse_error:
                logger.warning(
                    f"Skipping tool call due to parsing error: {result_content}"
                )
                # Compute the fallback id once so the status update and the
                # LLM result refer to the same call.
                effective_id = (
                    tool_id
                    or f"parse_error_{datetime.datetime.now(datetime.timezone.utc).isoformat()}"
                )
                yield {
                    "type": "tool_call_status",
                    "tool_id": effective_id,
                    "tool_name": tool_name or "Unknown Tool",
                    "status": "error",
                    "content": result_content,
                    "timestamp": self._utc_timestamp(),
                }

                formatted_result = self.format_tool_result(
                    caller_mode, effective_id, result_content, True
                )
                if formatted_result:
                    tool_results_for_llm.append(formatted_result)
                continue

            yield {
                "type": "tool_call_status",
                "tool_id": tool_id,
                "tool_name": tool_name,
                "status": "running",
                "content": f"Input: {json.dumps(tool_input)}",
                "timestamp": self._utc_timestamp(),
            }

            (
                is_error,
                text_content,
                metadata,
                content_items,
            ) = await self.run_single_tool(tool_name, tool_id, tool_input)

            status_content, llm_formatted_content = self._summarize_tool_output(
                caller_mode, text_content, content_items
            )

            status_update = {
                "type": "tool_call_status",
                "tool_id": tool_id,
                "tool_name": tool_name,
                "status": "error" if is_error else "completed",
                "content": status_content
                if not is_error
                else f"Error: {text_content}",
                "timestamp": self._utc_timestamp(),
            }

            # Successful navigation may carry live browser view data in the
            # tool metadata; forward it with the status update when present.
            if tool_name == "stagehand_navigate" and not is_error:
                live_view_data = metadata.get("liveViewData", {})
                if live_view_data:
                    logger.info(
                        f"Found live view data for stagehand_navigate: {live_view_data}"
                    )
                    status_update["browser_view"] = live_view_data

            yield status_update

            formatted_result = self.format_tool_result(
                caller_mode, tool_id, llm_formatted_content, is_error
            )
            if formatted_result:
                tool_results_for_llm.append(formatted_result)

        logger.info(
            f"Finished executing tools with {len(tool_results_for_llm)} results."
        )
        yield {"type": "final_tool_results", "results": tool_results_for_llm}

    @staticmethod
    def _log_content_items(tool_name: str, content_items: List[Dict[str, Any]]) -> None:
        """Log each content item, skipping ``data`` and abbreviating long strings."""
        logger.info(f"Content items from tool '{tool_name}':")
        for item in content_items:
            item_type = item.get("type", "unknown")
            logger.info(f"  Type: {item_type}")
            for key, value in item.items():
                if key in ("type", "data"):
                    # "type" is already logged; "data" is skipped entirely.
                    continue
                log_value = (
                    f"(length: {len(value)})"
                    if isinstance(value, str) and len(value) > 100
                    else value
                )
                logger.info(f"  {key}: {log_value}")

    async def run_single_tool(
        self, tool_name: str, tool_id: str, tool_input: Any
    ) -> tuple[bool, str, Dict[str, Any], List[Dict[str, Any]]]:
        """Run a single tool using MCPClient.

        Failures (unknown tool, missing server, exceptions raised by the
        client call) are reported through the return value rather than raised.

        Returns:
            tuple: (is_error, text_content, metadata, content_items). On
            failure ``content_items`` holds a single ``{"type": "error"}``
            item whose text matches ``text_content``.
        """
        logger.info(f"Executing tool: {tool_name} (ID: {tool_id})")
        tool_info = self._tool_manager.get_tool(tool_name)

        is_error = False
        text_content = ""
        metadata: Dict[str, Any] = {}
        content_items: List[Dict[str, Any]] = []

        if tool_input is None:
            tool_input = {}

        if not tool_info:
            logger.error(f"Tool '{tool_name}' not found in ToolManager.")
            text_content = f"Error: Tool '{tool_name}' is not available."
            content_items = [{"type": "error", "text": text_content}]
            is_error = True
        elif not tool_info.related_server:
            logger.error(f"Tool '{tool_name}' does not have a related server defined.")
            text_content = f"Error: Configuration error for tool '{tool_name}'. No server specified."
            content_items = [{"type": "error", "text": text_content}]
            is_error = True
        else:
            try:
                result_dict = await self._mcp_client.call_tool(
                    server_name=tool_info.related_server,
                    tool_name=tool_name,
                    tool_args=tool_input,
                )

                # ``or`` guards against keys that are present but None, which
                # would break the ``.get`` / iteration done by callers.
                metadata = result_dict.get("metadata", {}) or {}
                content_items = result_dict.get("content_items", []) or []

                # Only the first content item decides error state and text.
                if content_items and content_items[0].get("type") == "error":
                    is_error = True
                    text_content = content_items[0].get(
                        "text", "Unknown error from tool execution."
                    )
                elif content_items and content_items[0].get("type") == "text":
                    text_content = content_items[0].get("text", "")

                if not is_error:
                    logger.info(f"Tool '{tool_name}' executed successfully.")
                    if content_items:
                        self._log_content_items(tool_name, content_items)

            except (ValueError, RuntimeError, ConnectionError) as e:
                logger.exception(f"Error executing tool '{tool_name}': {e}")
                text_content = f"Error executing tool '{tool_name}': {e}"
                content_items = [{"type": "error", "text": text_content}]
                is_error = True
            except Exception as e:
                logger.exception(f"Unexpected error executing tool '{tool_name}': {e}")
                text_content = f"Unexpected error executing tool '{tool_name}': {e}"
                content_items = [{"type": "error", "text": text_content}]
                is_error = True

        return is_error, text_content, metadata, content_items
|
|
|