Implementation Guide - MCP Research Server
This document provides technical details on how the MCP Research Server is implemented.
Table of Contents
- Architecture Overview
- Protocol Implementation
- Tool System
- Integration Points
- Code Walkthrough
- Extension Guide
Architecture Overview
High-Level Design
```
        External MCP Clients
(Claude Desktop, Cursor, Custom Clients)
               |
               |  (JSON-RPC 2.0)
               v
      MCPServerProtocol Class
        - Initialize Handler
        - Tool List Handler
        - Tool Call Router
               |
       +-------+--------+-----------+
       |       |        |           |
     Web    Document  Synthesis   Report
    Search  Analyzer    Tool     Generator
```
Key Components
- MCPServerProtocol - Main protocol handler
- Tool Implementations - Individual tool classes
- Async Processing - Non-blocking execution
- Error Handling - Graceful failure handling
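The components above can be sketched as a minimal dispatch loop. The class here is a stand-in: the method names mirror those used later in this guide, but the bodies are illustrative stubs, not the server's actual implementation.

```python
import asyncio
from typing import Dict


class MiniProtocol:
    """Minimal sketch of JSON-RPC dispatch (handlers are stubs)."""

    async def process_message(self, message: Dict) -> Dict:
        # Route by method name; tools/call is omitted for brevity
        handlers = {
            "initialize": self.handle_initialize,
            "tools/list": self.handle_list_tools,
        }
        handler = handlers.get(message.get("method"))
        if handler is None:
            return {"jsonrpc": "2.0", "id": message.get("id"),
                    "error": {"code": -32601, "message": "Method not found"}}
        result = await handler(message.get("params", {}))
        return {"jsonrpc": "2.0", "id": message.get("id"), "result": result}

    async def handle_initialize(self, params: Dict) -> Dict:
        return {"protocolVersion": "2024-11-05",
                "serverInfo": {"name": "research"}}

    async def handle_list_tools(self, params: Dict) -> Dict:
        return {"tools": []}


resp = asyncio.run(MiniProtocol().process_message(
    {"jsonrpc": "2.0", "method": "initialize", "params": {}, "id": 1}))
```

Unknown methods fall through to a `-32601` error envelope, matching the error codes listed later in this guide.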
Protocol Implementation
JSON-RPC 2.0 Compliance
The MCP Research Server implements the JSON-RPC 2.0 specification:
Request Format:

```json
{
  "jsonrpc": "2.0",
  "method": "tools/list",
  "params": {},
  "id": 1
}
```

Success Response:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": { ... }
}
```

Error Response:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32603,
    "message": "Internal error"
  }
}
```
Implemented Methods
initialize

```python
async def handle_initialize(self, params: Dict) -> Dict:
```

- Returns server info and capabilities
- One-time initialization call

tools/list

```python
async def handle_list_tools(self) -> Dict:
```

- Lists all available tools with schemas
- Called by clients to discover capabilities

tools/call

```python
async def call_tool(self, name: str, arguments: Dict) -> Any:
```

- Executes a specific tool
- Routes to the appropriate tool handler
Error Codes
- `-32700`: Parse error
- `-32600`: Invalid Request
- `-32601`: Method not found
- `-32602`: Invalid params
- `-32603`: Internal error
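As a sketch, the error table can be captured in a small helper that builds spec-compliant error envelopes. `ERROR_MESSAGES` and `make_error_response` are illustrative names, not part of the server:

```python
from typing import Any, Dict, Optional

# Standard JSON-RPC 2.0 error codes used by the server
ERROR_MESSAGES = {
    -32700: "Parse error",
    -32600: "Invalid Request",
    -32601: "Method not found",
    -32602: "Invalid params",
    -32603: "Internal error",
}


def make_error_response(request_id: Optional[int], code: int,
                        detail: str = "") -> Dict[str, Any]:
    """Build a spec-compliant error envelope for the given code."""
    error: Dict[str, Any] = {
        "code": code,
        "message": ERROR_MESSAGES.get(code, "Internal error"),
    }
    if detail:
        error["data"] = detail  # optional extra context, allowed by the spec
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


resp = make_error_response(1, -32601, "no such method: tools/run")
```

Keeping the envelope construction in one place makes it harder for handlers to emit inconsistent error shapes.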
Tool System
Tool Definition Schema
Each tool is defined with:
```json
{
  "name": "tool_name",
  "description": "Human-readable description",
  "inputSchema": {
    "type": "object",
    "properties": {
      "param1": {
        "type": "string",
        "description": "Parameter description"
      }
    },
    "required": ["param1"]
  }
}
```
Tool Implementation Pattern
```python
class MyMCPTool:
    """My custom MCP tool"""

    async def execute(self, param1: str, param2: int = 10) -> Dict:
        """Execute the tool with parameters"""
        try:
            # Process input
            result = await self._do_work(param1, param2)
            # Return structured result
            return {
                "status": "success",
                "result": result
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
```
Tool Registration
Tools are registered in _initialize_tools():
```python
def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        "web_search": {
            "name": "web_search",
            # ... schema ...
        },
        "analyze_content": {
            # ... more tools ...
        }
    }
```
And routed in call_tool():
```python
async def call_tool(self, name: str, arguments: Dict) -> Any:
    if name == "web_search":
        return await self._web_search(**arguments)
    elif name == "analyze_content":
        return await self._analyze_content(**arguments)
    # ... more tools ...
```
Integration Points
1. Gradio Interface Integration
File: app_enhanced.py
The Gradio app communicates with the MCP server:
```python
class MCPClientInterface:
    async def send_request(self, method: str, params: dict) -> dict:
        # Sends a JSON-RPC request to the server
        message = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": request_id
        }
        # Send to server subprocess
        # Read response
```
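The send/read steps elided above can be sketched with asyncio subprocess pipes. The inline echo server below is a stand-in for `mcp_server.py` so the sketch is self-contained; a real client would keep one long-lived server process rather than spawning per request.

```python
import asyncio
import json
import sys

# Inline stand-in for mcp_server.py: replies with a fixed result per line
ECHO_SERVER = (
    "import sys, json\n"
    "for line in sys.stdin:\n"
    "    req = json.loads(line)\n"
    "    resp = {'jsonrpc': '2.0', 'id': req['id'], 'result': {'ok': True}}\n"
    "    print(json.dumps(resp), flush=True)\n"
)


async def send_request(method: str, params: dict, request_id: int = 1) -> dict:
    """Spawn the server, write one newline-delimited JSON-RPC message to
    its stdin, and read one response line from its stdout."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", ECHO_SERVER,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    message = {"jsonrpc": "2.0", "method": method,
               "params": params, "id": request_id}
    proc.stdin.write((json.dumps(message) + "\n").encode())
    await proc.stdin.drain()
    line = await proc.stdout.readline()
    proc.stdin.close()  # EOF ends the server's read loop
    await proc.wait()
    return json.loads(line)


response = asyncio.run(send_request("tools/list", {}))
```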
2. Claude Desktop Integration
File: CLAUDE_DESKTOP_SETUP.md
Configuration for Claude Desktop:
```json
{
  "mcpServers": {
    "research": {
      "command": "python",
      "args": ["/path/to/mcp_server.py"]
    }
  }
}
```
Claude Desktop:
- Reads the config
- Launches `mcp_server.py` as a subprocess
- Communicates via stdio (JSON-RPC 2.0)
- Automatically discovers tools
- Uses tools in conversations
3. Standard MCP Clients
Any client implementing the MCP spec can:
- Launch the server as subprocess
- Send JSON-RPC messages via stdin
- Receive responses via stdout
- Use discovered tools
Code Walkthrough
Initialization Flow
```python
# 1. Server starts
async def main():
    server = MCPServerProtocol()

# 2. Client sends initialize
message = {
    "jsonrpc": "2.0",
    "method": "initialize",
    "params": {},
    "id": 1
}

# 3. Server processes
response = await server.process_message(message)

# 4. Response sent back
# {
#     "jsonrpc": "2.0",
#     "id": 1,
#     "result": {
#         "protocolVersion": "2024-11-05",
#         "serverInfo": {...}
#     }
# }
```
Tool Discovery Flow
# 1. Client requests tool list
message = {
"jsonrpc": "2.0",
"method": "tools/list",
"id": 2
}
# 2. Server lists tools
response = await server.process_message(message)
# 3. Response includes all tool schemas
# {
# "jsonrpc": "2.0",
# "id": 2,
# "result": {
# "tools": [
# {"name": "web_search", "description": "...", "inputSchema": {...}},
# {"name": "analyze_content", ...},
# ...
# ]
# }
# }
Tool Execution Flow
# 1. Client calls tool
message = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "web_search",
"arguments": {"query": "Python programming"}
},
"id": 3
}
# 2. Server routes to tool
async def call_tool(self, name: str, arguments: Dict) -> Any:
if name == "web_search":
return await self._web_search(arguments["query"])
# 3. Tool executes
async def _web_search(self, query: str, max_results: int = 5) -> Dict:
# Perform search
results = [...]
return {
"status": "success",
"query": query,
"results": results,
...
}
# 4. Response sent back
# {
# "jsonrpc": "2.0",
# "id": 3,
# "result": {
# "status": "success",
# "results": [...]
# }
# }
Extension Guide
Adding a New Tool
Step 1: Define Tool Schema
```python
def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        # ... existing tools ...
        "my_new_tool": {
            "name": "my_new_tool",
            "description": "Does something useful",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "input_text": {
                        "type": "string",
                        "description": "The input to process"
                    },
                    "option": {
                        "type": "string",
                        "enum": ["option1", "option2"],
                        "description": "An option"
                    }
                },
                "required": ["input_text"]
            }
        }
    }
```
Step 2: Implement Tool Method
```python
async def _my_new_tool(self, input_text: str, option: str = "option1") -> Dict:
    """Implement the new tool"""
    try:
        # Do something
        result = self._process_text(input_text, option)
        return {
            "status": "success",
            "result": result
        }
    except Exception as e:
        logger.error(f"Tool error: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

def _process_text(self, text: str, option: str) -> Any:
    """Helper method"""
    # Implementation
    return processed_result
```
Step 3: Add to Router
```python
async def call_tool(self, name: str, arguments: Dict) -> Any:
    """Execute a tool by name"""
    # ... existing tools ...
    elif name == "my_new_tool":
        return await self._my_new_tool(
            arguments.get("input_text"),
            arguments.get("option", "option1")
        )
    else:
        raise ValueError(f"Unknown tool: {name}")
```
Step 4: Test the Tool
```python
@pytest.mark.asyncio
async def test_my_new_tool(mcp_server):
    """Test the new tool"""
    message = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "my_new_tool",
            "arguments": {
                "input_text": "test input"
            }
        },
        "id": 100
    }
    response = await mcp_server.process_message(message)
    assert "result" in response
    assert response["result"]["status"] == "success"
```
Integrating Real APIs
Example: Using OpenAI API
```python
import os
from typing import Dict, List

import openai

async def _web_search(self, query: str, max_results: int = 5) -> Dict:
    """Use a real web search API"""
    try:
        # Example with a real API
        results = await self._call_real_api(query)
        return {
            "status": "success",
            "query": query,
            "results": results,
            "source": "openai_api"
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def _call_real_api(self, query: str) -> List[Dict]:
    """Call the actual API"""
    api_key = os.getenv("OPENAI_API_KEY")
    # Make the API call, then parse and return results
    pass
```
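One hedged way to fill in the `_call_real_api` stub is to separate the HTTP request from response parsing, so the parsing can be tested offline with a canned payload. The endpoint URL, `SEARCH_API_KEY` variable, and payload shape below are placeholders; substitute your provider's real URL, auth scheme, and schema.

```python
import json
import os
import urllib.parse
import urllib.request
from typing import Dict, List

# Hypothetical endpoint -- replace with your provider's real URL
SEARCH_URL = "https://api.example.com/search"


def parse_search_payload(payload: Dict) -> List[Dict]:
    """Normalize a provider payload into the server's result format."""
    return [
        {
            "title": item.get("title", ""),
            "url": item.get("link", ""),
            "snippet": item.get("snippet", ""),
        }
        for item in payload.get("items", [])
    ]


def call_real_api(query: str, max_results: int = 5) -> List[Dict]:
    """Blocking fetch; wrap with asyncio.to_thread() inside an async tool."""
    api_key = os.getenv("SEARCH_API_KEY", "")
    url = f"{SEARCH_URL}?q={urllib.parse.quote(query)}&n={max_results}"
    req = urllib.request.Request(
        url, headers={"Authorization": f"Bearer {api_key}"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return parse_search_payload(json.load(resp))


# Exercise only the parser, with a canned payload (no network needed)
canned = {"items": [{"title": "MCP", "link": "https://example.com",
                     "snippet": "..."}]}
results = parse_search_payload(canned)
```

Keeping `parse_search_payload` pure also makes it easy to swap providers: only `call_real_api` changes.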
Adding Caching
```python
import json

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.cache = {}

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with caching"""
        # Create a cache key from the tool name and sorted arguments
        cache_key = f"{name}:{json.dumps(arguments, sort_keys=True)}"
        # Check cache
        if cache_key in self.cache:
            return self.cache[cache_key]
        # Execute tool
        result = await self._execute_tool(name, arguments)
        # Cache result
        self.cache[cache_key] = result
        return result
```
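The plain-dict cache above never evicts, so a long-running server will grow it without bound. A bounded variant with LRU eviction and per-entry expiry might look like this; `BoundedTTLCache` is an illustrative name, not part of the server.

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class BoundedTTLCache:
    """LRU cache with per-entry expiry, to bound the tool-result cache."""

    def __init__(self, maxsize: int = 100, ttl: float = 300.0):
        self._data: "OrderedDict[str, tuple]" = OrderedDict()
        self.maxsize = maxsize
        self.ttl = ttl  # seconds before an entry is considered stale

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.time() - stored_at > self.ttl:
            del self._data[key]  # expired: drop and report a miss
            return None
        self._data.move_to_end(key)  # mark as recently used
        return value

    def put(self, key: str, value: Any) -> None:
        self._data[key] = (value, time.time())
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict least recently used


cache = BoundedTTLCache(maxsize=2)
cache.put("a", 1)
cache.put("b", 2)
cache.put("c", 3)  # exceeds maxsize: "a" is evicted
```

Swapping `self.cache = {}` for an instance of this class requires only changing the lookup to `cache.get(...)` / `cache.put(...)`.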
Adding Rate Limiting
```python
import asyncio
import time

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.last_call_time = {}

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with rate limiting"""
        # Check rate limit
        now = time.time()
        if name in self.last_call_time:
            elapsed = now - self.last_call_time[name]
            if elapsed < 1.0:  # 1 second minimum between calls
                await asyncio.sleep(1.0 - elapsed)
        # Update timestamp
        self.last_call_time[name] = time.time()
        # Execute tool
        return await self._execute_tool(name, arguments)
```
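The timestamp check above can race when the same tool is called concurrently: two coroutines may read the same `last_call_time` entry before either updates it. A per-tool lock closes that window. This is a sketch, with `min_interval` shortened so the demo finishes quickly.

```python
import asyncio
import time
from collections import defaultdict
from typing import Dict


class RateLimiter:
    """Per-tool minimum-interval limiter, safe under concurrent calls."""

    def __init__(self, min_interval: float = 1.0):
        self.min_interval = min_interval
        # One lock per tool serializes the wait-then-update section
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self._last_call: Dict[str, float] = {}

    async def throttle(self, name: str) -> None:
        async with self._locks[name]:
            now = time.monotonic()
            last = self._last_call.get(name)
            if last is not None and now - last < self.min_interval:
                await asyncio.sleep(self.min_interval - (now - last))
            self._last_call[name] = time.monotonic()


async def demo() -> float:
    limiter = RateLimiter(min_interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await limiter.throttle("web_search")
    return time.monotonic() - start


elapsed = asyncio.run(demo())
```

The first call passes immediately; the next two each wait out the remaining interval, so three calls take at least two intervals in total.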
Performance Optimization
Async Best Practices
Use async/await consistently

```python
async def process_request(self):
    result = await self.execute_tool()  # Good
    # Not: result = self.execute_tool()  # Bad: returns a coroutine, never runs
```

Avoid blocking operations

```python
# Good: use async libraries
async with httpx.AsyncClient() as client:
    response = await client.get(url)

# Bad: blocking call
response = requests.get(url)
```

Use gather for parallel execution

```python
# Execute multiple tools in parallel
results = await asyncio.gather(
    self._tool1(),
    self._tool2(),
    self._tool3()
)
```
Memory Management
Limit cache size

```python
from collections import OrderedDict

class LRUCache:
    def __init__(self, maxsize=100):
        self.cache = OrderedDict()
        self.maxsize = maxsize
```

Stream large responses

```python
async def process_large_file(self, file_path):
    with open(file_path, 'r') as f:
        for chunk in iter(lambda: f.read(8192), ''):
            yield chunk
```
Debugging
Enable Debug Logging
```python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
```
Test Locally
```shell
# Run the server directly
python mcp_server.py

# Or pipe a test message to the server's stdin (stdio transport)
echo '{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}' | python mcp_server.py
```
Monitor Performance
```python
import time

async def call_tool(self, name: str, arguments: Dict) -> Any:
    start = time.time()
    result = await self._execute_tool(name, arguments)
    elapsed = time.time() - start
    logger.info(f"Tool {name} took {elapsed:.2f}s")
    return result
```
Summary
The MCP Research Server provides:
- Full JSON-RPC 2.0 protocol implementation
- Modular, extensible tool system
- Async/await for performance
- Proper error handling
- Multiple integration points
- Easy extension with new tools
- Comprehensive testing
For more details, refer to the inline code documentation and the MCP Specification.