wangrongsheng's picture
Upload folder using huggingface_hub
8c6097b verified
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
#!/usr/bin/env python3
"""
MCP Client for Agent-to-Server Communication
Provides a proper MCP client that uses the official MCP package
to connect to and communicate with MCP servers through the Model Context Protocol.
"""
import json
import logging
import time
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).parent.parent.parent))
from ..utils.status_codes import JsonRpcErr
from http import HTTPStatus
try:
import httpx
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
logging.warning("HTTP client dependencies not available. Falling back to direct tools.")
logger = logging.getLogger(__name__)
@dataclass
class MCPClientResult:
"""Standard result format for MCP client operations"""
success: bool
data: Any = None
error: str = None
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"success": self.success,
"data": self.data,
"error": self.error,
"metadata": self.metadata
}
@dataclass
class MCPTool:
"""Simple representation of an MCP tool"""
name: str
description: str = ""
input_schema: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RetryConfig:
"""Configuration for retry behavior on rate limiting"""
max_retries: int = 20 # Maximum number of retry attempts
base_delay: float = 2.0 # Base delay between retries (seconds)
max_delay: float = 60.0 # Maximum delay between retries (seconds)
exponential_backoff: bool = True # Use exponential backoff
respect_retry_after: bool = True # Respect server's Retry-After header
retry_on_rate_limit: bool = True # Enable automatic retry on rate limits
class MCPClient:
"""
Simple HTTP-based MCP Client for dynamic tool discovery and execution.
This client makes direct HTTP JSON-RPC calls to the MCP server,
avoiding the complexity of streaming connections.
Session management is handled entirely by the server:
- Server assigns session IDs on connection
- Server manages workspace creation and isolation
- All tool operations use server-managed workspaces
"""
def __init__(self, server_url: str = "http://localhost:6274/mcp", retry_config: Optional[RetryConfig] = None):
self.server_url = server_url.rstrip('/')
self.retry_config = retry_config or RetryConfig()
self._tools: Dict[str, MCPTool] = {}
self._connected = False
self._request_id = 0
self._session_id = None
if not MCP_AVAILABLE:
logger.warning("HTTP client not available. Some functionality may be limited.")
return
# Initialize connection and discover tools
self._initialize_connection()
def _get_next_id(self) -> int:
"""Get next request ID"""
self._request_id += 1
return self._request_id
@staticmethod
def _parse_sse_response(sse_text: str) -> Dict[str, Any]:
"""Parse Server-Sent Events response and extract JSON data"""
try:
# SSE format: "event: message\ndata: {json}\n\n"
lines = sse_text.strip().split('\n')
for line in lines:
if line.startswith('data: '):
json_data = line[6:] # Remove "data: " prefix
return json.loads(json_data)
# If no data line found, try parsing entire response as JSON
return json.loads(sse_text)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse SSE response: {e}")
logger.error(f"SSE text: {sse_text[:200]}...")
return {"error": {"code": JsonRpcErr.PARSE_ERROR, "message": f"Parse error: {e}"}}
def _make_request(self, method: str, params: Dict[str, Any] = None) -> MCPClientResult:
"""Make a JSON-RPC request to the MCP server with automatic retry on rate limits"""
if not MCP_AVAILABLE:
return MCPClientResult(success=False, error="HTTP client not available")
# Prepare JSON-RPC request
request_data = {
"jsonrpc": "2.0",
"id": self._get_next_id(),
"method": method,
"params": params or {}
}
# Make HTTP request with proper MCP headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream"
}
# Add session ID if available
if self._session_id:
headers["X-Session-ID"] = self._session_id
last_error = None
retry_count = 0
while retry_count <= self.retry_config.max_retries:
try:
# Disable proxy for localhost/127.0.0.1 connections to avoid proxy interference
import os
from urllib.parse import urlparse
parsed_url = urlparse(self.server_url)
is_localhost = parsed_url.hostname in ['localhost', '127.0.0.1', '::1']
# Add localhost to NO_PROXY for localhost connections
original_no_proxy = None
if is_localhost:
original_no_proxy = os.environ.get('NO_PROXY', os.environ.get('no_proxy', ''))
# Add localhost and 127.0.0.1 to NO_PROXY
no_proxy_hosts = ['localhost', '127.0.0.1', '::1']
if original_no_proxy:
existing_hosts = [h.strip() for h in original_no_proxy.split(',')]
no_proxy_hosts.extend(existing_hosts)
os.environ['NO_PROXY'] = ','.join(no_proxy_hosts)
os.environ['no_proxy'] = ','.join(no_proxy_hosts)
try:
# Create client with connection pooling for high-concurrency
limits = httpx.Limits(
max_keepalive_connections=3000, # Keep more connections alive
max_connections=3000, # Allow more concurrent connections
keepalive_expiry=1000.0 # Keep connections alive longer
)
timeout = httpx.Timeout(
connect=100.0,
read=None,
write=60.0,
pool=30.0
)
with httpx.Client(
timeout=timeout, # Higher timeout for high-concurrency scenarios
limits=limits, # Connection pooling for better performance
trust_env=False,
http2=True # Enable HTTP/2 for better multiplexing
) as client:
response = client.post(
self.server_url,
json=request_data,
headers=headers
)
# Check for rate limiting (HTTP 429)
if response.status_code == 429:
if not self.retry_config.retry_on_rate_limit:
return MCPClientResult(
success=False,
error=f"Rate limit exceeded (HTTP 429) - retries disabled",
metadata={"status_code": 429, "retry_count": retry_count}
)
if retry_count >= self.retry_config.max_retries:
return MCPClientResult(
success=False,
error=f"Rate limit exceeded (HTTP 429) - max retries ({self.retry_config.max_retries}) reached",
metadata={"status_code": 429, "retry_count": retry_count}
)
# Calculate retry delay
delay = self._calculate_retry_delay(response, retry_count)
logger.warning(f"Rate limit exceeded for {method} (attempt {retry_count + 1}/{self.retry_config.max_retries + 1}). Retrying in {delay:.1f}s...")
# Wait before retry
time.sleep(delay)
retry_count += 1
continue
# Handle other HTTP errors
if response.status_code != HTTPStatus.OK:
return MCPClientResult(
success=False,
error=f"HTTP {response.status_code}: {response.text}",
metadata={"status_code": response.status_code, "retry_count": retry_count}
)
# Parse successful response (could be JSON or SSE format)
if response.headers.get("content-type", "").startswith("text/event-stream"):
# Parse SSE format
response_data = self._parse_sse_response(response.text)
else:
# Parse regular JSON
response_data = response.json()
if "error" in response_data:
return MCPClientResult(
success=False,
error=f"MCP Error: {response_data['error']}",
metadata={"retry_count": retry_count}
)
# Capture session ID from response data (for all methods, not just initialize)
if "session_id" in response_data:
self._session_id = response_data["session_id"]
logger.info(f"Captured session ID from response: {self._session_id}")
# Success! Log retry info if this wasn't the first attempt
if retry_count > 0:
logger.info(f"Request {method} succeeded after {retry_count} retries")
return MCPClientResult(
success=True,
data=response_data.get("result"),
metadata={
"method": method,
"server_url": self.server_url,
"session_id": self._session_id,
"retry_count": retry_count
}
)
finally:
# Restore original NO_PROXY environment variable
if is_localhost:
if original_no_proxy is not None:
if original_no_proxy:
os.environ['NO_PROXY'] = original_no_proxy
os.environ['no_proxy'] = original_no_proxy
else:
# Remove NO_PROXY if it wasn't set originally
os.environ.pop('NO_PROXY', None)
os.environ.pop('no_proxy', None)
except Exception as e:
last_error = str(e)
logger.error(f"MCP request failed for {method} (attempt {retry_count + 1}): {e}")
# Only retry on certain exceptions (network issues, timeouts)
if not self._should_retry_exception(e) or retry_count >= self.retry_config.max_retries:
break
# Calculate retry delay for exceptions
delay = self._calculate_exception_retry_delay(retry_count)
logger.warning(f"Request {method} failed, retrying in {delay:.1f}s... (attempt {retry_count + 1}/{self.retry_config.max_retries + 1})")
time.sleep(delay)
retry_count += 1
# All retries exhausted
return MCPClientResult(
success=False,
error=f"Request failed after {retry_count} retries. Last error: {last_error}",
metadata={"retry_count": retry_count}
)
def _calculate_retry_delay(self, response, retry_count: int) -> float:
"""Calculate delay before retry based on server response and retry count"""
delay = self.retry_config.base_delay
# Respect server's Retry-After header if available
if self.retry_config.respect_retry_after and "Retry-After" in response.headers:
try:
retry_after = float(response.headers["Retry-After"])
delay = min(retry_after, self.retry_config.max_delay)
logger.debug("Using server Retry-After: {%s}s", delay)
except (ValueError, TypeError):
logger.warning(f"Invalid Retry-After header: {response.headers.get('Retry-After')}")
# Apply exponential backoff if enabled
elif self.retry_config.exponential_backoff:
delay = min(
self.retry_config.base_delay * (2 ** retry_count),
self.retry_config.max_delay
)
return delay
def _calculate_exception_retry_delay(self, retry_count: int) -> float:
"""Calculate delay for exception-based retries"""
if self.retry_config.exponential_backoff:
return min(
self.retry_config.base_delay * (2 ** retry_count),
self.retry_config.max_delay
)
return self.retry_config.base_delay
@staticmethod
def _should_retry_exception(exception: Exception) -> bool:
"""Determine if an exception warrants a retry"""
# Retry on network-related exceptions
if isinstance(exception, (httpx.RequestError, httpx.TimeoutException, httpx.ConnectError)):
return True
# Don't retry on other exceptions (parsing errors, etc.)
return False
def _initialize_connection(self):
"""Initialize MCP client connection and fetch available tools"""
if not MCP_AVAILABLE:
return
try:
# Initialize session
init_result = self._make_request("initialize", {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {
"name": "DeepDiver-MCP-Client",
"version": "1.0.0"
}
})
print(init_result)
if not init_result.success:
logger.error(f"MCP initialization failed: {init_result.error}")
return
logger.info("MCP client initialized successfully")
# Fetch available tools
tools_result = self._make_request("tools/list")
if tools_result.success and tools_result.data:
tools_data = tools_result.data.get("tools", [])
self._tools = {}
for tool_data in tools_data:
tool = MCPTool(
name=tool_data.get("name", ""),
description=tool_data.get("description", ""),
input_schema=tool_data.get("inputSchema", {})
)
self._tools[tool.name] = tool
logger.info(f"Discovered {len(self._tools)} tools from MCP server: {list(self._tools.keys())}")
self._connected = True
except Exception as e:
logger.error(f"Failed to initialize MCP client: {e}")
self._connected = False
def _ensure_connection(self):
"""Ensure MCP client is connected"""
if not MCP_AVAILABLE:
raise RuntimeError("HTTP client not available")
if not self._connected:
self._initialize_connection()
if not self._connected:
raise RuntimeError("MCP client not connected to server")
def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> MCPClientResult:
"""
Generic method to call any tool available on the MCP server.
Args:
tool_name: Name of the tool to call
arguments: Dictionary of arguments to pass to the tool
Returns:
MCPClientResult with the tool execution result
"""
try:
self._ensure_connection()
if tool_name not in self._tools:
return MCPClientResult(
success=False,
error=f"Tool '{tool_name}' not available on server. Available tools: {list(self._tools.keys())}"
)
# Call the tool via JSON-RPC
result = self._make_request("tools/call", {
"name": tool_name,
"arguments": arguments
})
return result
except Exception as e:
logger.error(f"Error calling tool '{tool_name}': {e}")
return MCPClientResult(
success=False,
error=str(e)
)
def get_available_tools(self) -> Dict[str, MCPTool]:
"""Get dictionary of available tools from the server"""
return self._tools.copy()
def list_tools(self) -> List[str]:
"""Get list of available tool names"""
return list(self._tools.keys())
def get_tool_info(self, tool_name: str) -> Optional[MCPTool]:
"""Get detailed information about a specific tool"""
return self._tools.get(tool_name)
def is_connected(self) -> bool:
"""Check if client is connected to MCP server"""
return self._connected and MCP_AVAILABLE
def refresh_tools(self):
"""Refresh the list of available tools from the server"""
try:
# Fetch available tools
tools_result = self._make_request("tools/list")
if tools_result.success and tools_result.data:
tools_data = tools_result.data.get("tools", [])
self._tools = {}
print(self._tools)
for tool_data in tools_data:
tool = MCPTool(
name=tool_data.get("name", ""),
description=tool_data.get("description", ""),
input_schema=tool_data.get("inputSchema", {})
)
self._tools[tool.name] = tool
logger.info(f"Refreshed {len(self._tools)} tools from MCP server")
else:
logger.error(f"Failed to refresh tools: {tools_result.error}")
except Exception as e:
logger.error(f"Error refreshing tools: {e}")
def close(self):
"""Close MCP client connection"""
# Since we create connections per request, just mark as disconnected
self._connected = False
class MCPToolsAdapter:
"""
Adapter class that provides the MCPTools interface while using the generic MCP client.
This adapter provides backward compatibility with existing agents by mapping
MCPTools method calls to generic MCP client tool calls.
"""
def __init__(self, server_url: str = "http://localhost:6274/mcp", retry_config: Optional[RetryConfig] = None):
self.client = MCPClient(server_url, retry_config)
def _call_tool(self, tool_name: str, **kwargs) -> MCPClientResult:
"""Internal method to call tools through the MCP client"""
return self.client.call_tool(tool_name, kwargs)
def __getattr__(self, name: str):
"""
Dynamic method creation for any tool available on the server.
This allows calling tools like adapter.batch_web_search(...) or adapter.file_read(...)
"""
if name.startswith('_'):
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
# Create a dynamic method that calls the tool
def tool_method(**kwargs):
result = self._call_tool(name, **kwargs)
# For backward compatibility, return the data portion
return result.data if result.success else {"error": result.error}
return tool_method
def is_connected(self) -> bool:
"""Check if the MCP client is connected to the server."""
return self.client.is_connected()
def get_available_tools(self) -> Dict[str, MCPTool]:
"""Get available tools from the MCP server."""
return self.client.get_available_tools()
def list_tools(self) -> List[str]:
"""Get list of available tool names."""
return self.client.list_tools()
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""
Get tool schemas for all available tools.
This is the proper MCP way - schemas come from server, not direct imports.
"""
schemas = []
available_tools = self.get_available_tools()
for tool_name, tool_info in available_tools.items():
schema = {
"type": "function",
"function": {
"name": tool_name,
"description": tool_info.description,
"parameters": tool_info.input_schema
}
}
schemas.append(schema)
return schemas
def refresh_tools(self):
"""Refresh the list of available tools from the server."""
self.client.refresh_tools()
def get_session_info(self) -> Optional[Dict[str, Any]]:
"""Get session information from the underlying MCP client."""
try:
if hasattr(self.client, '_session_id'):
return {
"session_id": self.client._session_id,
"connected": self.client.is_connected(),
"server_url": getattr(self.client, 'server_url', 'unknown')
}
return None
except Exception:
return None
def close(self):
"""Close the MCP client connection."""
self.client.close()
class FilteredMCPToolsAdapter:
"""
Filtered adapter that shares MCP client connection but restricts tool access per agent type.
This allows agents to:
- Share the same session/workspace (via shared client)
- Have different tool sets appropriate for their role
- Maintain proper separation of concerns
"""
def __init__(self, shared_client: MCPClient, allowed_tools: List[str]):
"""
Initialize with shared client and allowed tools list
Args:
shared_client: Shared MCPClient instance (same session)
allowed_tools: List of tools this agent can access
"""
self.client = shared_client
self.allowed_tools = set(allowed_tools)
# Validate that allowed tools exist on server
available_tools = set(self.client.list_tools())
invalid_tools = self.allowed_tools - available_tools
if invalid_tools:
logger.warning(f"Requested tools not available on server: {invalid_tools}")
self.allowed_tools = self.allowed_tools & available_tools
def _call_tool(self, tool_name: str, **kwargs) -> MCPClientResult:
"""Call tool if allowed, otherwise return error"""
if tool_name not in self.allowed_tools:
return MCPClientResult(
success=False,
error=f"Tool '{tool_name}' not allowed for this agent. Allowed tools: {list(self.allowed_tools)}"
)
# Remove any workspace_path if accidentally passed - server handles workspace
kwargs.pop('workspace_path', None)
return self.client.call_tool(tool_name, kwargs)
def __getattr__(self, name: str):
"""
Dynamic method resolution with tool filtering.
Only allows access to tools in the allowed_tools list.
"""
if name in self.allowed_tools:
def tool_method(**kwargs):
return self._call_tool(name, **kwargs)
return tool_method
if name in self.client.list_tools():
# Tool exists but not allowed for this agent
raise AttributeError(f"Tool '{name}' not allowed for this agent. Allowed tools: {list(self.allowed_tools)}")
else:
# Tool doesn't exist on server
raise AttributeError(f"Tool '{name}' not available on server. Available tools: {self.client.list_tools()}")
# ================ CLIENT MANAGEMENT ================
def is_connected(self) -> bool:
"""Check if client is connected to MCP server"""
return self.client.is_connected()
def get_available_tools(self) -> Dict[str, MCPTool]:
"""Get filtered list of available tools for this agent"""
all_tools = self.client.get_available_tools()
return {name: tool for name, tool in all_tools.items() if name in self.allowed_tools}
def list_tools(self) -> List[str]:
"""Get list of allowed tool names for this agent"""
return list(self.allowed_tools)
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""
Get tool schemas for tools allowed for this agent.
This is the proper MCP way - schemas come from server, not direct imports.
"""
schemas = []
available_tools = self.get_available_tools()
for tool_name, tool_info in available_tools.items():
schema = {
"type": "function",
"function": {
"name": tool_name,
"description": tool_info.description,
"parameters": tool_info.input_schema
}
}
schemas.append(schema)
return schemas
def refresh_tools(self):
"""Refresh the underlying client's tools"""
self.client.refresh_tools()
# Re-validate allowed tools after refresh
available_tools = set(self.client.list_tools())
invalid_tools = self.allowed_tools - available_tools
if invalid_tools:
logger.warning(f"Some allowed tools no longer available after refresh: {invalid_tools}")
self.allowed_tools = self.allowed_tools & available_tools
def close(self):
"""Close MCP client connection"""
self.client.close()
# ================ AGENT TOOL SETS ================
# Define what tools each agent type should have access to
PLANNER_AGENT_TOOLS = [
"download_files",
"document_qa",
"file_read",
"file_write",
"str_replace_based_edit_tool",
"list_workspace",
"file_find_by_name",
]
INFORMATION_SEEKER_TOOLS = [
"batch_web_search",
"url_crawler",
"document_extract",
"document_qa",
"download_files",
"file_read",
"file_write",
"str_replace_based_edit_tool",
"list_workspace",
"file_find_by_name",
]
WRITER_AGENT_TOOLS = [
"file_read",
"list_workspace",
"file_find_by_name",
"search_result_classifier",
"section_writer",
"concat_section_files",
]
def create_filtered_mcp_tools_adapter(
shared_client: MCPClient,
agent_type: str
) -> FilteredMCPToolsAdapter:
"""
Create a filtered MCP tools adapter for specific agent type
Args:
shared_client: Shared MCPClient instance
agent_type: Type of agent ("planner", "information_seeker", "writer")
Returns:
FilteredMCPToolsAdapter with appropriate tools for agent type
"""
tool_sets = {
"planner": PLANNER_AGENT_TOOLS,
"information_seeker": INFORMATION_SEEKER_TOOLS,
"writer": WRITER_AGENT_TOOLS
}
allowed_tools = tool_sets.get(agent_type, PLANNER_AGENT_TOOLS)
return FilteredMCPToolsAdapter(
shared_client=shared_client,
allowed_tools=allowed_tools
)
def create_agent_mcp_tools(
agent_type: str,
server_url: str = "http://localhost:6274/mcp",
retry_config: Optional[RetryConfig] = None
) -> FilteredMCPToolsAdapter:
"""
Convenience factory to create a filtered MCP tools adapter with retry support.
This is the RECOMMENDED way to create MCP tools for agents.
Args:
agent_type: Type of agent ("planner", "information_seeker", "writer")
server_url: URL of the MCP server (default: http://localhost:6274/mcp)
retry_config: Optional retry configuration for handling rate limits
Returns:
FilteredMCPToolsAdapter with appropriate tools and retry support for the agent type
"""
# Create client with retry support
client = create_mcp_client(server_url=server_url, retry_config=retry_config)
# Create filtered adapter for the agent type
return create_filtered_mcp_tools_adapter(client, agent_type)
def create_mcp_client(
server_url: str = "http://localhost:6274/mcp",
retry_config: Optional[RetryConfig] = None
) -> MCPClient:
"""
Factory function to create a generic MCP Client with optional retry configuration
Args:
server_url: URL of the MCP server (default: http://localhost:6274/mcp)
retry_config: Optional retry configuration for handling rate limits
Returns:
MCPClient instance for direct tool calling with automatic retry on rate limits
"""
return MCPClient(server_url=server_url, retry_config=retry_config)
def create_mcp_tools_adapter(
server_url: str = "http://localhost:6274/mcp",
retry_config: Optional[RetryConfig] = None
) -> MCPToolsAdapter:
"""
Factory function to create an MCP Tools Adapter for backward compatibility with retry support.
Args:
server_url: URL of the MCP server (default: http://localhost:6274/mcp)
retry_config: Optional retry configuration for handling rate limits
Returns:
MCPToolsAdapter instance that behaves like MCPTools but uses MCP client with automatic retries
"""
return MCPToolsAdapter(server_url=server_url, retry_config=retry_config)
# Export for compatibility
__all__ = [
'MCPClientResult',
'MCPClient',
'MCPTool',
'RetryConfig',
'MCPToolsAdapter',
'FilteredMCPToolsAdapter',
'create_mcp_client',
'create_mcp_tools_adapter',
'create_filtered_mcp_tools_adapter',
'create_agent_mcp_tools', # RECOMMENDED for agents
'PLANNER_AGENT_TOOLS',
'INFORMATION_SEEKER_TOOLS',
'WRITER_AGENT_TOOLS'
]