""" Tool system for the agent Provides ToolSpec and ToolRouter for managing both built-in and MCP tools """ import logging import warnings from dataclasses import dataclass from typing import Any, Awaitable, Callable, Optional logger = logging.getLogger(__name__) from fastmcp import Client from fastmcp.exceptions import ToolError from lmnr import observe from mcp.types import EmbeddedResource, ImageContent, TextContent from agent.config import MCPServerConfig from agent.tools.dataset_tools import ( HF_INSPECT_DATASET_TOOL_SPEC, hf_inspect_dataset_handler, ) from agent.tools.docs_tools import ( EXPLORE_HF_DOCS_TOOL_SPEC, HF_DOCS_FETCH_TOOL_SPEC, explore_hf_docs_handler, hf_docs_fetch_handler, ) from agent.tools.github_find_examples import ( GITHUB_FIND_EXAMPLES_TOOL_SPEC, github_find_examples_handler, ) from agent.tools.github_list_repos import ( GITHUB_LIST_REPOS_TOOL_SPEC, github_list_repos_handler, ) from agent.tools.github_read_file import ( GITHUB_READ_FILE_TOOL_SPEC, github_read_file_handler, ) from agent.tools.hf_repo_files_tool import ( HF_REPO_FILES_TOOL_SPEC, hf_repo_files_handler, ) from agent.tools.hf_repo_git_tool import ( HF_REPO_GIT_TOOL_SPEC, hf_repo_git_handler, ) from agent.tools.jobs_tool import HF_JOBS_TOOL_SPEC, hf_jobs_handler from agent.tools.plan_tool import PLAN_TOOL_SPEC, plan_tool_handler # NOTE: Private HF repo tool disabled - replaced by hf_repo_files and hf_repo_git # from agent.tools.private_hf_repo_tools import ( # PRIVATE_HF_REPO_TOOL_SPEC, # private_hf_repo_handler, # ) # Suppress aiohttp deprecation warning warnings.filterwarnings( "ignore", category=DeprecationWarning, module="aiohttp.connector" ) NOT_ALLOWED_TOOL_NAMES = ["hf_jobs", "hf_doc_search", "hf_doc_fetch", "hf_whoami"] def convert_mcp_content_to_string(content: list) -> str: """ Convert MCP content blocks to a string format compatible with LLM messages. Based on FastMCP documentation, content can be: - TextContent: has .text field - ImageContent: has .data and .mimeType fields - EmbeddedResource: has .resource field with .text or .blob Args: content: List of MCP content blocks Returns: String representation of the content suitable for LLM consumption """ if not content: return "" parts = [] for item in content: if isinstance(item, TextContent): # Extract text from TextContent blocks parts.append(item.text) elif isinstance(item, ImageContent): # TODO: Handle images # For images, include a description with MIME type parts.append(f"[Image: {item.mimeType}]") elif isinstance(item, EmbeddedResource): # TODO: Handle embedded resources # For embedded resources, try to extract text resource = item.resource if hasattr(resource, "text") and resource.text: parts.append(resource.text) elif hasattr(resource, "blob") and resource.blob: parts.append( f"[Binary data: {resource.mimeType if hasattr(resource, 'mimeType') else 'unknown'}]" ) else: parts.append( f"[Resource: {resource.uri if hasattr(resource, 'uri') else 'unknown'}]" ) else: # Fallback: try to convert to string parts.append(str(item)) return "\n".join(parts) @dataclass class ToolSpec: """Tool specification for LLM""" name: str description: str parameters: dict[str, Any] handler: Optional[Callable[[dict[str, Any]], Awaitable[tuple[str, bool]]]] = None class ToolRouter: """ Routes tool calls to appropriate handlers. Based on codex-rs/core/src/tools/router.rs """ def __init__(self, mcp_servers: dict[str, MCPServerConfig]): self.tools: dict[str, ToolSpec] = {} self.mcp_servers: dict[str, dict[str, Any]] = {} for tool in create_builtin_tools(): self.register_tool(tool) self.mcp_client: Client | None = None if mcp_servers: mcp_servers_payload = {} for name, server in mcp_servers.items(): mcp_servers_payload[name] = server.model_dump() self.mcp_client = Client({"mcpServers": mcp_servers_payload}) self._mcp_initialized = False def register_tool(self, tool: ToolSpec) -> None: self.tools[tool.name] = tool async def register_mcp_tools(self) -> None: tools = await self.mcp_client.list_tools() registered_names = [] skipped_count = 0 for tool in tools: if tool.name in NOT_ALLOWED_TOOL_NAMES: skipped_count += 1 continue registered_names.append(tool.name) self.register_tool( ToolSpec( name=tool.name, description=tool.description, parameters=tool.inputSchema, handler=None, ) ) logger.info( f"Loaded {len(registered_names)} MCP tools: {', '.join(registered_names)} ({skipped_count} disabled)" ) async def register_openapi_tool(self) -> None: """Register the OpenAPI search tool (requires async initialization)""" from agent.tools.docs_tools import ( _get_api_search_tool_spec, search_openapi_handler, ) # Register search_hf_api_endpoints with dynamic spec openapi_spec = await _get_api_search_tool_spec() self.register_tool( ToolSpec( name=openapi_spec["name"], description=openapi_spec["description"], parameters=openapi_spec["parameters"], handler=search_openapi_handler, ) ) logger.info(f"Loaded OpenAPI search tool: {openapi_spec['name']}") def get_tool_specs_for_llm(self) -> list[dict[str, Any]]: """Get tool specifications in OpenAI format""" specs = [] for tool in self.tools.values(): specs.append( { "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.parameters, }, } ) return specs async def __aenter__(self) -> "ToolRouter": if self.mcp_client is not None: await self.mcp_client.__aenter__() await self.mcp_client.initialize() await self.register_mcp_tools() self._mcp_initialized = True # Register OpenAPI tool (requires async initialization) await self.register_openapi_tool() total_tools = len(self.tools) logger.info(f"Agent ready with {total_tools} tools total") return self async def __aexit__(self, exc_type, exc, tb) -> None: if self.mcp_client is not None: await self.mcp_client.__aexit__(exc_type, exc, tb) self._mcp_initialized = False @observe(name="call_tool") async def call_tool( self, tool_name: str, arguments: dict[str, Any], session: Any = None ) -> tuple[str, bool]: """ Call a tool and return (output_string, success_bool). For MCP tools, converts the CallToolResult content blocks to a string. For built-in tools, calls their handler directly. """ # Check if this is a built-in tool with a handler tool = self.tools.get(tool_name) if tool and tool.handler: import inspect # Check if handler accepts session argument sig = inspect.signature(tool.handler) if "session" in sig.parameters: return await tool.handler(arguments, session=session) return await tool.handler(arguments) # Otherwise, use MCP client if self._mcp_initialized: try: result = await self.mcp_client.call_tool(tool_name, arguments) output = convert_mcp_content_to_string(result.content) return output, not result.is_error except ToolError as e: # Catch MCP tool errors and return them to the agent error_msg = f"Tool error: {str(e)}" return error_msg, False return "MCP client not initialized", False # ============================================================================ # BUILT-IN TOOL HANDLERS # ============================================================================ def create_builtin_tools() -> list[ToolSpec]: """Create built-in tool specifications""" # in order of importance tools = [ # Documentation search tools ToolSpec( name=EXPLORE_HF_DOCS_TOOL_SPEC["name"], description=EXPLORE_HF_DOCS_TOOL_SPEC["description"], parameters=EXPLORE_HF_DOCS_TOOL_SPEC["parameters"], handler=explore_hf_docs_handler, ), ToolSpec( name=HF_DOCS_FETCH_TOOL_SPEC["name"], description=HF_DOCS_FETCH_TOOL_SPEC["description"], parameters=HF_DOCS_FETCH_TOOL_SPEC["parameters"], handler=hf_docs_fetch_handler, ), # Dataset inspection tool (unified) ToolSpec( name=HF_INSPECT_DATASET_TOOL_SPEC["name"], description=HF_INSPECT_DATASET_TOOL_SPEC["description"], parameters=HF_INSPECT_DATASET_TOOL_SPEC["parameters"], handler=hf_inspect_dataset_handler, ), # Planning and job management tools ToolSpec( name=PLAN_TOOL_SPEC["name"], description=PLAN_TOOL_SPEC["description"], parameters=PLAN_TOOL_SPEC["parameters"], handler=plan_tool_handler, ), ToolSpec( name=HF_JOBS_TOOL_SPEC["name"], description=HF_JOBS_TOOL_SPEC["description"], parameters=HF_JOBS_TOOL_SPEC["parameters"], handler=hf_jobs_handler, ), # HF Repo management tools ToolSpec( name=HF_REPO_FILES_TOOL_SPEC["name"], description=HF_REPO_FILES_TOOL_SPEC["description"], parameters=HF_REPO_FILES_TOOL_SPEC["parameters"], handler=hf_repo_files_handler, ), ToolSpec( name=HF_REPO_GIT_TOOL_SPEC["name"], description=HF_REPO_GIT_TOOL_SPEC["description"], parameters=HF_REPO_GIT_TOOL_SPEC["parameters"], handler=hf_repo_git_handler, ), ToolSpec( name=GITHUB_FIND_EXAMPLES_TOOL_SPEC["name"], description=GITHUB_FIND_EXAMPLES_TOOL_SPEC["description"], parameters=GITHUB_FIND_EXAMPLES_TOOL_SPEC["parameters"], handler=github_find_examples_handler, ), ToolSpec( name=GITHUB_LIST_REPOS_TOOL_SPEC["name"], description=GITHUB_LIST_REPOS_TOOL_SPEC["description"], parameters=GITHUB_LIST_REPOS_TOOL_SPEC["parameters"], handler=github_list_repos_handler, ), ToolSpec( name=GITHUB_READ_FILE_TOOL_SPEC["name"], description=GITHUB_READ_FILE_TOOL_SPEC["description"], parameters=GITHUB_READ_FILE_TOOL_SPEC["parameters"], handler=github_read_file_handler, ), ] tool_names = ", ".join([t.name for t in tools]) logger.info(f"Loaded {len(tools)} built-in tools: {tool_names}") return tools