# Implementation Guide - MCP Research Server

This document provides technical details on how the MCP Research Server is implemented.

## Table of Contents

1. [Architecture Overview](#architecture-overview)
2. [Protocol Implementation](#protocol-implementation)
3. [Tool System](#tool-system)
4. [Integration Points](#integration-points)
5. [Code Walkthrough](#code-walkthrough)
6. [Extension Guide](#extension-guide)
7. [Performance Optimization](#performance-optimization)
8. [Debugging](#debugging)
9. [Summary](#summary)

---

## Architecture Overview

### High-Level Design

```
            External MCP Clients
  (Claude Desktop, Cursor, Custom Clients)
                    ↓
              (JSON-RPC 2.0)
                    ↓
          MCPServerProtocol Class
            Initialize Handler
            Tool List Handler
            Tool Call Router
                    ↓
    ↓          ↓           ↓           ↓
   Web      Document   Synthesis    Report
  Search    Analyzer      Tool     Generator
```

### Key Components

1. **MCPServerProtocol** - Main protocol handler
2. **Tool Implementations** - Individual tool classes
3. **Async Processing** - Non-blocking execution
4. **Error Handling** - Graceful failure handling

---

## Protocol Implementation

### JSON-RPC 2.0 Compliance

The MCP Research Server implements the JSON-RPC 2.0 specification:

```
Request Format:
{
  "jsonrpc": "2.0",
  "method": "tools/list",
  "params": {},
  "id": 1
}

Success Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": { ... }
}

Error Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32603,
    "message": "Internal error"
  }
}
```

### Implemented Methods

1. **initialize**

   ```python
   async def handle_initialize(self, params: Dict) -> Dict:
   ```

   - Returns server info and capabilities
   - One-time initialization call

2. **tools/list**

   ```python
   async def handle_list_tools(self) -> Dict:
   ```

   - Lists all available tools with schemas
   - Called by clients to discover capabilities

3. **tools/call**

   ```python
   async def call_tool(self, name: str, arguments: Dict) -> Any:
   ```

   - Executes a specific tool
   - Routes to the appropriate tool handler

### Error Codes

- `-32700`: Parse error
- `-32600`: Invalid Request
- `-32601`: Method not found
- `-32602`: Invalid params
- `-32603`: Internal error

---

## Tool System

### Tool Definition Schema

Each tool is defined with:

```python
{
    "name": "tool_name",
    "description": "Human-readable description",
    "inputSchema": {
        "type": "object",
        "properties": {
            "param1": {
                "type": "string",
                "description": "Parameter description"
            }
        },
        "required": ["param1"]
    }
}
```

### Tool Implementation Pattern

```python
from typing import Dict


class MyMCPTool:
    """My custom MCP tool"""

    async def execute(self, param1: str, param2: int = 10) -> Dict:
        """Execute the tool with parameters"""
        try:
            # Process input (_do_work is the tool-specific coroutine)
            result = await self._do_work(param1, param2)

            # Return structured result
            return {
                "status": "success",
                "result": result
            }
        except Exception as e:
            # Report failures in the same structured shape
            return {
                "status": "error",
                "error": str(e)
            }
```

### Tool Registration

Tools are registered in `_initialize_tools()`:

```python
def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        "web_search": {
            "name": "web_search",
            # ... schema ...
        },
        "analyze_content": {
            # ... more tools ...
        }
    }
```

And routed in `call_tool()`:

```python
async def call_tool(self, name: str, arguments: Dict) -> Any:
    if name == "web_search":
        return await self._web_search(**arguments)
    elif name == "analyze_content":
        return await self._analyze_content(**arguments)
    # ... more tools ...
```

---

## Integration Points

### 1. Gradio Interface Integration

**File**: `app_enhanced.py`

The Gradio app communicates with the MCP server:

```python
class MCPClientInterface:
    async def send_request(self, method: str, params: dict) -> dict:
        # Build the JSON-RPC request to send to the server.
        # request_id must be unique per request (its generation is elided here).
        message = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": request_id
        }
        # Send to server subprocess
        # Read response
```

### 2. Claude Desktop Integration

**File**: `CLAUDE_DESKTOP_SETUP.md`

Configuration for Claude Desktop:

```json
{
  "mcpServers": {
    "research": {
      "command": "python",
      "args": ["/path/to/mcp_server.py"]
    }
  }
}
```

Claude Desktop:

1. Reads the config
2. Launches `mcp_server.py` as a subprocess
3. Communicates via stdio (JSON-RPC 2.0)
4. Automatically discovers tools
5. Uses tools in conversations

### 3. Standard MCP Clients

Any client implementing the MCP spec can:

1. Launch the server as a subprocess
2. Send JSON-RPC messages via stdin
3. Receive responses via stdout
4. Use discovered tools

---

## Code Walkthrough

### Initialization Flow

```python
# 1. Server starts
async def main():
    server = MCPServerProtocol()

    # 2. Client sends initialize
    message = {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {},
        "id": 1
    }

    # 3. Server processes
    response = await server.process_message(message)

    # 4. Response sent back
    # {
    #     "jsonrpc": "2.0",
    #     "id": 1,
    #     "result": {
    #         "protocolVersion": "2024-11-05",
    #         "serverInfo": {...}
    #     }
    # }
```

### Tool Discovery Flow

```python
# 1. Client requests tool list
message = {
    "jsonrpc": "2.0",
    "method": "tools/list",
    "id": 2
}

# 2. Server lists tools (run inside an async function)
response = await server.process_message(message)

# 3. Response includes all tool schemas
# {
#     "jsonrpc": "2.0",
#     "id": 2,
#     "result": {
#         "tools": [
#             {"name": "web_search", "description": "...", "inputSchema": {...}},
#             {"name": "analyze_content", ...},
#             ...
#         ]
#     }
# }
```

### Tool Execution Flow

```python
# 1. Client calls tool
message = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
        "name": "web_search",
        "arguments": {"query": "Python programming"}
    },
    "id": 3
}

# 2. Server routes to tool
async def call_tool(self, name: str, arguments: Dict) -> Any:
    if name == "web_search":
        return await self._web_search(arguments["query"])

# 3. Tool executes
async def _web_search(self, query: str, max_results: int = 5) -> Dict:
    # Perform search
    results = [...]
    return {
        "status": "success",
        "query": query,
        "results": results,
        # ...
    }

# 4. Response sent back
# {
#     "jsonrpc": "2.0",
#     "id": 3,
#     "result": {
#         "status": "success",
#         "results": [...]
#     }
# }
```

---

## Extension Guide

### Adding a New Tool

#### Step 1: Define Tool Schema

```python
def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        # ... existing tools ...
        "my_new_tool": {
            "name": "my_new_tool",
            "description": "Does something useful",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "input_text": {
                        "type": "string",
                        "description": "The input to process"
                    },
                    "option": {
                        "type": "string",
                        "enum": ["option1", "option2"],
                        "description": "An option"
                    }
                },
                "required": ["input_text"]
            }
        }
    }
```

#### Step 2: Implement Tool Method

```python
async def _my_new_tool(self, input_text: str, option: str = "option1") -> Dict:
    """Implement the new tool"""
    try:
        # Do something
        result = self._process_text(input_text, option)
        return {
            "status": "success",
            "result": result
        }
    except Exception as e:
        logger.error(f"Tool error: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

def _process_text(self, text: str, option: str) -> Any:
    """Helper method"""
    # Implementation (placeholder: compute processed_result here)
    return processed_result
```

#### Step 3: Add to Router

```python
async def call_tool(self, name: str, arguments: Dict) -> Any:
    """Execute a tool by name"""
    # ... existing tools ...
    elif name == "my_new_tool":
        return await self._my_new_tool(
            arguments.get("input_text"),
            arguments.get("option", "option1")
        )
    else:
        raise ValueError(f"Unknown tool: {name}")
```

#### Step 4: Test the Tool

```python
import pytest


# mcp_server is expected to be a pytest fixture that provides the server instance
@pytest.mark.asyncio
async def test_my_new_tool(mcp_server):
    """Test the new tool"""
    message = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "my_new_tool",
            "arguments": {
                "input_text": "test input"
            }
        },
        "id": 100
    }

    response = await mcp_server.process_message(message)

    assert "result" in response
    assert response["result"]["status"] == "success"
```

### Integrating Real APIs

#### Example: Using OpenAI API

```python
import os
from typing import Dict, List

import openai

async def _web_search(self, query: str, max_results: int = 5) -> Dict:
    """Use real web search API"""
    try:
        # Example with a real API
        results = await self._call_real_api(query)
        return {
            "status": "success",
            "query": query,
            "results": results,
            "source": "openai_api"
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def _call_real_api(self, query: str) -> List[Dict]:
    """Call actual API"""
    # Read the key from the environment rather than hard-coding it
    api_key = os.getenv("OPENAI_API_KEY")
    # Make API call
    # Parse and return results
    pass
```

### Adding Caching

```python
import json
from typing import Any, Dict

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.cache = {}

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with caching"""
        # Create cache key (sort_keys makes it independent of argument order)
        cache_key = f"{name}:{json.dumps(arguments, sort_keys=True)}"

        # Check cache
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Execute tool (_execute_tool is assumed to hold the routing logic)
        result = await self._execute_tool(name, arguments)

        # Cache result
        self.cache[cache_key] = result
        return result
```

### Adding Rate Limiting

```python
import asyncio
import time
from typing import Any, Dict

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.last_call_time = {}

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with rate limiting"""
        # Check rate limit
        now = time.time()
        if name in self.last_call_time:
            elapsed = now - self.last_call_time[name]
            if elapsed < 1.0:  # 1 second minimum between calls
                await asyncio.sleep(1.0 - elapsed)

        # Update timestamp
        self.last_call_time[name] = time.time()

        # Execute tool
        return await self._execute_tool(name, arguments)
```

---

## Performance Optimization

### Async Best Practices

1. **Use async/await consistently**

   ```python
   async def process_request(self):
       result = await self.execute_tool()  # Good
       # Not: result = self.execute_tool()  # Bad
   ```

2. **Avoid blocking operations**

   ```python
   # Good: Use async libraries
   async with httpx.AsyncClient() as client:
       response = await client.get(url)

   # Bad: Blocking call
   response = requests.get(url)
   ```

3. **Use gather for parallel execution**

   ```python
   # Execute multiple tools in parallel
   results = await asyncio.gather(
       self._tool1(),
       self._tool2(),
       self._tool3()
   )
   ```

### Memory Management

1. **Limit cache size**

   ```python
   from collections import OrderedDict

   class LRUCache:
       def __init__(self, maxsize=100):
           self.cache = OrderedDict()
           self.maxsize = maxsize
   ```

2. **Stream large responses**

   ```python
   async def process_large_file(self, file_path):
       with open(file_path, 'r') as f:
           # Read in 8 KB chunks until read() returns an empty string
           for chunk in iter(lambda: f.read(8192), ''):
               yield chunk
   ```

---

## Debugging

### Enable Debug Logging

```python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
```

### Test Locally

```bash
# Run server directly
python mcp_server.py

# The server communicates over stdio, so pipe a test message into it
echo '{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}' | python mcp_server.py
```

### Monitor Performance

```python
import logging
import time

logger = logging.getLogger(__name__)

async def call_tool(self, name: str, arguments: Dict) -> Any:
    # perf_counter is a monotonic clock intended for measuring durations
    start = time.perf_counter()
    result = await self._execute_tool(name, arguments)
    elapsed = time.perf_counter() - start

    logger.info(f"Tool {name} took {elapsed:.2f}s")
    return result
```

---

## Summary

The MCP Research Server provides:

1. Full JSON-RPC 2.0 protocol implementation
2. Modular, extensible tool system
3. Async/await for performance
4. Proper error handling
5. Multiple integration points
6. Easy extension with new tools
7. Comprehensive testing

For more details, refer to the inline code documentation and the [MCP Specification](https://modelcontextprotocol.io/).
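
The guide lists the JSON-RPC error codes and calls `process_message()` in several places, but it never shows how a raw request is mapped to one of those codes. The following is a minimal sketch of one way that mapping could look, not the server's actual code. `SketchServer`, `process_line`, `make_error`, and the `echo` tool are hypothetical names introduced only for this illustration. Only the error codes and the `tools/list` and `tools/call` method names come from the guide.

```python
import json
from typing import Any, Dict

# Standard JSON-RPC 2.0 error codes, as listed under "Error Codes".
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


def make_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


class SketchServer:
    """Hypothetical stand-in for MCPServerProtocol, for illustration only."""

    async def handle_list_tools(self) -> Dict[str, Any]:
        return {"tools": [{"name": "echo", "description": "Echo text back"}]}

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        if name == "echo":
            return {"status": "success", "result": arguments["text"]}
        raise ValueError(f"Unknown tool: {name}")

    async def process_line(self, line: str) -> Dict[str, Any]:
        """Parse one raw JSON line and map each failure to an error code."""
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            # The id cannot be known when the payload is not valid JSON.
            return make_error(None, PARSE_ERROR, "Parse error")

        if (
            not isinstance(message, dict)
            or message.get("jsonrpc") != "2.0"
            or not isinstance(message.get("method"), str)
        ):
            return make_error(None, INVALID_REQUEST, "Invalid Request")

        request_id = message.get("id")
        method = message["method"]
        params = message.get("params", {})

        try:
            if method == "tools/list":
                result = await self.handle_list_tools()
            elif method == "tools/call":
                if not isinstance(params, dict) or "name" not in params:
                    return make_error(request_id, INVALID_PARAMS, "Invalid params")
                result = await self.call_tool(
                    params["name"], params.get("arguments", {})
                )
            else:
                return make_error(request_id, METHOD_NOT_FOUND, "Method not found")
        except (KeyError, TypeError, ValueError) as exc:
            # Missing or malformed arguments, or an unknown tool name.
            return make_error(request_id, INVALID_PARAMS, f"Invalid params: {exc}")
        except Exception as exc:
            # Anything unexpected is reported as an internal error.
            return make_error(request_id, INTERNAL_ERROR, f"Internal error: {exc}")

        return {"jsonrpc": "2.0", "id": request_id, "result": result}
```

In this sketch, protocol-level failures become JSON-RPC `error` objects, while a tool that fails on its own terms can still return `{"status": "error", ...}` inside `result`, which matches the tool pattern shown earlier. Treating an unknown tool name as `-32602` is a design choice of the sketch; the real server may handle it differently.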