Implementation Guide - MCP Research Server

This document provides technical details on how the MCP Research Server is implemented.

Table of Contents

  1. Architecture Overview
  2. Protocol Implementation
  3. Tool System
  4. Integration Points
  5. Code Walkthrough
  6. Extension Guide
  7. Performance Optimization
  8. Debugging
  9. Summary

Architecture Overview

High-Level Design


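+-------------------------------------------------+
|              External MCP Clients               |
|    (Claude Desktop, Cursor, Custom Clients)     |
+-------------------------------------------------+
                         ↓
                  (JSON-RPC 2.0)
                         ↓
+-------------------------------------------------+
|             MCPServerProtocol Class             |
|   - Initialize Handler                          |
|   - Tool List Handler                           |
|   - Tool Call Router                            |
+-------------------------------------------------+
                         ↓
     +------------+------+-----+------------+
     ↓            ↓            ↓            ↓
+----------+ +----------+ +----------+ +----------+
| Web      | | Document | | Synthesis| | Report   |
| Search   | | Analyzer | | Tool     | | Generator|
+----------+ +----------+ +----------+ +----------+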

Key Components

  1. MCPServerProtocol - Main protocol handler
  2. Tool Implementations - Individual tool classes
  3. Async Processing - Non-blocking execution
  4. Error Handling - Graceful failure handling

Protocol Implementation

JSON-RPC 2.0 Compliance

The MCP Research Server implements the JSON-RPC 2.0 specification:

Request Format:
{
  "jsonrpc": "2.0",
  "method": "tools/list",
  "params": {},
  "id": 1
}

Success Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": { ... }
}

Error Response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32603,
    "message": "Internal error"
  }
}

Implemented Methods

  1. initialize

    async def handle_initialize(self, params: Dict) -> Dict:
    
    • Returns server info and capabilities
    • One-time initialization call
  2. tools/list

    async def handle_list_tools(self) -> Dict:
    
    • Lists all available tools with schemas
    • Called by clients to discover capabilities
  3. tools/call

    async def call_tool(self, name: str, arguments: Dict) -> Any:
    
    • Executes a specific tool
    • Routes to appropriate tool handler
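The three methods above have to be selected by the "method" field of each incoming message. A minimal sketch of that routing follows, assuming it lives in process_message (the entry point used in the Code Walkthrough). SketchServer, its placeholder handler bodies, and the "sketch" server name are hypothetical stand-ins, not the project's actual code.

```python
import asyncio
from typing import Any, Dict


class SketchServer:
    """Hypothetical stand-in for MCPServerProtocol, showing only the routing."""

    async def handle_initialize(self, params: Dict) -> Dict:
        # Placeholder payload; the real server returns its own info
        return {"protocolVersion": "2024-11-05", "serverInfo": {"name": "sketch"}}

    async def handle_list_tools(self) -> Dict:
        return {"tools": []}  # placeholder: no tools registered

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        raise ValueError(f"Unknown tool: {name}")  # placeholder router

    async def process_message(self, message: Dict) -> Dict:
        request_id = message.get("id")
        method = message.get("method")
        params = message.get("params") or {}
        try:
            if method == "initialize":
                result = await self.handle_initialize(params)
            elif method == "tools/list":
                result = await self.handle_list_tools()
            elif method == "tools/call":
                result = await self.call_tool(params["name"], params.get("arguments", {}))
            else:
                # Unknown method: standard JSON-RPC "Method not found"
                return {"jsonrpc": "2.0", "id": request_id,
                        "error": {"code": -32601, "message": "Method not found"}}
        except Exception as exc:
            # Any handler failure is reported as an internal error
            return {"jsonrpc": "2.0", "id": request_id,
                    "error": {"code": -32603, "message": f"Internal error: {exc}"}}
        return {"jsonrpc": "2.0", "id": request_id, "result": result}
```

Returning an error object rather than raising keeps one bad request from terminating the server.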

Error Codes

  • -32700: Parse error
  • -32600: Invalid Request
  • -32601: Method not found
  • -32602: Invalid params
  • -32603: Internal error
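To keep responses consistent, the codes above can be held as named constants next to two small response builders. This is a sketch only: make_result and make_error are hypothetical helper names, not functions confirmed to exist in the server.

```python
from typing import Any, Dict, Optional

# Standard JSON-RPC 2.0 error codes (same values as the list above)
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


def make_result(request_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def make_error(request_id: Any, code: int, message: str,
               data: Optional[Any] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response; `data` is optional extra detail."""
    error: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "id": request_id, "error": error}
```

For example, make_error(1, INTERNAL_ERROR, "Internal error") produces the error response shown under "Error Response" earlier in this section.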

Tool System

Tool Definition Schema

Each tool is defined by a name, a human-readable description, and a JSON Schema describing its input:

{
    "name": "tool_name",
    "description": "Human-readable description",
    "inputSchema": {
        "type": "object",
        "properties": {
            "param1": {
                "type": "string",
                "description": "Parameter description"
            }
        },
        "required": ["param1"]
    }
}
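An inputSchema like the one above can also be used to reject bad arguments before a tool runs. The following is a minimal sketch that checks only "required", basic "type", and "enum"; validate_arguments is a hypothetical helper, and a full JSON Schema validator would cover far more.

```python
from typing import Any, Dict, List

# JSON Schema type names mapped to Python types (subset, for illustration).
# Note: bool is a subclass of int in Python, so True passes an "integer" check here.
_TYPE_MAP = {"string": str, "integer": int, "number": (int, float),
             "boolean": bool, "object": dict, "array": list}


def validate_arguments(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required parameter: {name}")
    for name, value in arguments.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"unknown parameter: {name}")
            continue
        expected = _TYPE_MAP.get(spec.get("type"))
        if expected is not None and not isinstance(value, expected):
            problems.append(f"{name}: expected {spec['type']}")
        if "enum" in spec and value not in spec["enum"]:
            problems.append(f"{name}: must be one of {spec['enum']}")
    return problems
```

A non-empty result maps naturally onto the -32602 (Invalid params) error code.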

Tool Implementation Pattern

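from typing import Any, Dict

class MyMCPTool:
    """My custom MCP tool"""
    
    async def execute(self, param1: str, param2: int = 10) -> Dict:
        """Execute the tool with parameters"""
        try:
            # Process input
            result = await self._do_work(param1, param2)
            
            # Return structured result
            return {
                "status": "success",
                "result": result
            }
        except Exception as e:
            # Report failures as data instead of raising to the caller
            return {
                "status": "error",
                "error": str(e)
            }

    async def _do_work(self, param1: str, param2: int) -> Any:
        """Placeholder for the tool's real logic"""
        raise NotImplementedError("implement the tool logic here")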

Tool Registration

Tools are registered in _initialize_tools():

def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        "web_search": {
            "name": "web_search",
            # ... schema ...
        },
        "analyze_content": {
            # ... more tools ...
        }
    }

And routed in call_tool():

async def call_tool(self, name: str, arguments: Dict) -> Any:
    if name == "web_search":
        return await self._web_search(**arguments)
    elif name == "analyze_content":
        return await self._analyze_content(**arguments)
    # ... more tools ...
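As the number of tools grows, the if/elif chain can be replaced with a lookup table. The sketch below shows one way to do that; DispatchSketch, the placeholder method bodies, and the `content` parameter of _analyze_content are assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class DispatchSketch:
    """Hypothetical router: maps tool names to bound coroutine methods."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[..., Awaitable[Any]]] = {
            "web_search": self._web_search,
            "analyze_content": self._analyze_content,
        }

    async def _web_search(self, query: str, max_results: int = 5) -> Dict:
        return {"status": "success", "query": query, "results": []}  # placeholder body

    async def _analyze_content(self, content: str) -> Dict:
        return {"status": "success", "length": len(content)}  # placeholder body

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        handler = self._handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        # Keyword expansion lets each tool declare its own parameters and defaults
        return await handler(**arguments)
```

With this layout, adding a tool means adding one dictionary entry and one method, and unknown names fail in a single place.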

Integration Points

1. Gradio Interface Integration

File: app_enhanced.py

The Gradio app sends JSON-RPC requests to the MCP server, which runs as a subprocess, through a small client class:

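import itertools

class MCPClientInterface:
    def __init__(self):
        # Monotonically increasing JSON-RPC request IDs
        self._request_ids = itertools.count(1)

    async def send_request(self, method: str, params: dict) -> dict:
        # Build the JSON-RPC request for the server
        message = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": next(self._request_ids)
        }
        # Send to server subprocess
        # Read response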

2. Claude Desktop Integration

File: CLAUDE_DESKTOP_SETUP.md

Configuration for Claude Desktop:

{
  "mcpServers": {
    "research": {
      "command": "python",
      "args": ["/path/to/mcp_server.py"]
    }
  }
}

Claude Desktop:

  1. Reads config
  2. Launches mcp_server.py as subprocess
  3. Communicates via stdio (JSON-RPC 2.0)
  4. Automatically discovers tools
  5. Uses tools in conversations
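Step 3 implies a read loop on the server side. A minimal sketch of such a loop is shown here, assuming one JSON message per line on stdin and one JSON response per line on stdout. serve_stdio is a hypothetical function name; the real mcp_server.py may structure this differently.

```python
import asyncio
import io
import json
from typing import Awaitable, Callable, Dict, TextIO


async def serve_stdio(process_message: Callable[[Dict], Awaitable[Dict]],
                      reader: TextIO, writer: TextIO) -> None:
    """Read one JSON-RPC message per line, write one JSON response per line."""
    for line in reader:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            # Malformed JSON: answer with the standard parse error
            response = {"jsonrpc": "2.0", "id": None,
                        "error": {"code": -32700, "message": "Parse error"}}
        else:
            response = await process_message(message)
        writer.write(json.dumps(response) + "\n")
        writer.flush()  # flush so the client is not left waiting on a buffer
```

In a server script this would be started with something like asyncio.run(serve_stdio(server.process_message, sys.stdin, sys.stdout)). Iterating over the reader blocks the event loop between messages, which is acceptable for a single-client stdio server but not for anything concurrent. Because stdout carries protocol traffic, diagnostics should go to stderr.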

3. Standard MCP Clients

Any client implementing the MCP spec can:

  1. Launch the server as subprocess
  2. Send JSON-RPC messages via stdin
  3. Receive responses via stdout
  4. Use discovered tools
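The four steps above can be sketched from the client side with asyncio's subprocess support. request_over_stdio is a hypothetical helper, and the newline-delimited framing is an assumption about how the server reads and writes messages.

```python
import asyncio
import json
import sys
from typing import Any, Dict, List


async def request_over_stdio(command: List[str], method: str,
                             params: Dict[str, Any], request_id: int = 1) -> Dict:
    """Launch `command`, send one JSON-RPC request on stdin, return the reply."""
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    message = {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}
    proc.stdin.write((json.dumps(message) + "\n").encode())
    await proc.stdin.drain()
    line = await proc.stdout.readline()  # one response per line (assumed framing)
    proc.stdin.close()  # closing stdin signals the server to exit
    await proc.wait()
    return json.loads(line)
```

A call might look like asyncio.run(request_over_stdio([sys.executable, "/path/to/mcp_server.py"], "tools/list", {})). A long-lived client would keep the subprocess open and reuse it for many requests instead of launching it per call.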

Code Walkthrough

Initialization Flow

# 1. Server starts
async def main():
    server = MCPServerProtocol()
    
    # 2. Client sends initialize
    message = {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {},
        "id": 1
    }
    
    # 3. Server processes
    response = await server.process_message(message)
    
    # 4. Response sent back
    # {
    #   "jsonrpc": "2.0",
    #   "id": 1,
    #   "result": {
    #     "protocolVersion": "2024-11-05",
    #     "serverInfo": {...}
    #   }
    # }

Tool Discovery Flow

# 1. Client requests tool list
message = {
    "jsonrpc": "2.0",
    "method": "tools/list",
    "id": 2
}

# 2. Server lists tools
response = await server.process_message(message)

# 3. Response includes all tool schemas
# {
#   "jsonrpc": "2.0",
#   "id": 2,
#   "result": {
#     "tools": [
#       {"name": "web_search", "description": "...", "inputSchema": {...}},
#       {"name": "analyze_content", ...},
#       ...
#     ]
#   }
# }

Tool Execution Flow

# 1. Client calls tool
message = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
        "name": "web_search",
        "arguments": {"query": "Python programming"}
    },
    "id": 3
}

# 2. Server routes to tool
async def call_tool(self, name: str, arguments: Dict) -> Any:
    if name == "web_search":
        return await self._web_search(**arguments)

# 3. Tool executes
async def _web_search(self, query: str, max_results: int = 5) -> Dict:
    # Perform search
    results = [...]
    return {
        "status": "success",
        "query": query,
        "results": results,
        # ... additional fields ...
    }

# 4. Response sent back
# {
#   "jsonrpc": "2.0",
#   "id": 3,
#   "result": {
#     "status": "success",
#     "results": [...]
#   }
# }

Extension Guide

Adding a New Tool

Step 1: Define Tool Schema

def _initialize_tools(self) -> Dict[str, Dict]:
    return {
        # ... existing tools ...
        "my_new_tool": {
            "name": "my_new_tool",
            "description": "Does something useful",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "input_text": {
                        "type": "string",
                        "description": "The input to process"
                    },
                    "option": {
                        "type": "string",
                        "enum": ["option1", "option2"],
                        "description": "An option"
                    }
                },
                "required": ["input_text"]
            }
        }
    }

Step 2: Implement Tool Method

async def _my_new_tool(self, input_text: str, option: str = "option1") -> Dict:
    """Implement the new tool"""
    try:
        # Do something
        result = self._process_text(input_text, option)
        
        return {
            "status": "success",
            "result": result
        }
    except Exception as e:
        logger.error(f"Tool error: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

def _process_text(self, text: str, option: str) -> Any:
    """Helper method (placeholder: replace with real processing)"""
    processed_result = text  # Implementation goes here
    return processed_result

Step 3: Add to Router

async def call_tool(self, name: str, arguments: Dict) -> Any:
    """Execute a tool by name"""
    if name == "web_search":
        return await self._web_search(**arguments)
    # ... existing tools ...
    elif name == "my_new_tool":
        return await self._my_new_tool(
            arguments["input_text"],  # required by the schema
            arguments.get("option", "option1")
        )
    else:
        raise ValueError(f"Unknown tool: {name}")

Step 4: Test the Tool

@pytest.mark.asyncio
async def test_my_new_tool(mcp_server):
    """Test the new tool"""
    message = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "my_new_tool",
            "arguments": {
                "input_text": "test input"
            }
        },
        "id": 100
    }
    
    response = await mcp_server.process_message(message)
    
    assert "result" in response
    assert response["result"]["status"] == "success"

Integrating Real APIs

Example: Using OpenAI API

import os
from typing import Dict, List

import openai

async def _web_search(self, query: str, max_results: int = 5) -> Dict:
    """Use real web search API"""
    try:
        # Example with a real API
        results = await self._call_real_api(query)
        
        return {
            "status": "success",
            "query": query,
            "results": results,
            "source": "openai_api"
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def _call_real_api(self, query: str) -> List[Dict]:
    """Call actual API"""
    api_key = os.getenv("OPENAI_API_KEY")
    # Make API call
    # Parse and return results
    raise NotImplementedError("Replace with a real API call")

Adding Caching

import json
from typing import Any, Dict

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.cache = {}
    
    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with caching"""
        # Create cache key
        cache_key = f"{name}:{json.dumps(arguments, sort_keys=True)}"
        
        # Check cache
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Execute tool
        result = await self._execute_tool(name, arguments)
        
        # Cache result
        self.cache[cache_key] = result
        
        return result
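The dictionary cache above never expires entries and also stores error results, so a transient failure would be replayed indefinitely. One possible refinement is sketched below, using the same cache key format. TTLResultCache and its 300-second default are hypothetical choices, not part of the server.

```python
import json
import time
from typing import Any, Dict, Optional, Tuple


class TTLResultCache:
    """Hypothetical helper: caches successful tool results for `ttl` seconds."""

    def __init__(self, ttl: float = 300.0) -> None:
        self.ttl = ttl
        self._entries: Dict[str, Tuple[float, Any]] = {}

    @staticmethod
    def make_key(name: str, arguments: Dict) -> str:
        # sort_keys makes the key independent of argument order
        return f"{name}:{json.dumps(arguments, sort_keys=True)}"

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._entries[key]  # expired
            return None
        return value

    def put(self, key: str, result: Any) -> None:
        # Skip error results so a transient failure is retried next time
        if isinstance(result, dict) and result.get("status") == "error":
            return
        self._entries[key] = (time.monotonic(), result)
```

Inside call_tool, the lookup becomes cache.get(key) before executing the tool and cache.put(key, result) afterwards.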

Adding Rate Limiting

import asyncio
import time

class MCPServerProtocol:
    def __init__(self):
        self.tools = self._initialize_tools()
        self.last_call_time = {}
    
    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Execute tool with rate limiting"""
        # Check rate limit
        now = time.time()
        if name in self.last_call_time:
            elapsed = now - self.last_call_time[name]
            if elapsed < 1.0:  # 1 second minimum between calls
                await asyncio.sleep(1.0 - elapsed)
        
        # Update timestamp
        self.last_call_time[name] = time.time()
        
        # Execute tool
        return await self._execute_tool(name, arguments)
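When several calls to the same tool arrive concurrently, the check-sleep-update sequence above can interleave, letting calls through closer together than intended. A sketch that serializes the sequence with one asyncio.Lock per tool follows; PerToolRateLimiter is a hypothetical helper, not part of the server.

```python
import asyncio
import time
from collections import defaultdict
from typing import Dict


class PerToolRateLimiter:
    """Hypothetical helper enforcing a minimum interval between calls per tool."""

    def __init__(self, min_interval: float = 1.0) -> None:
        self.min_interval = min_interval
        self._last_call: Dict[str, float] = {}
        # Locks are created lazily, on first use inside the running event loop
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def wait(self, name: str) -> None:
        # The lock makes check-sleep-update atomic for each tool name
        async with self._locks[name]:
            last = self._last_call.get(name)
            if last is not None:
                remaining = self.min_interval - (time.monotonic() - last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_call[name] = time.monotonic()
```

call_tool would then start with await self.limiter.wait(name). time.monotonic() is used instead of time.time() so that system clock adjustments cannot shorten or lengthen the interval.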

Performance Optimization

Async Best Practices

  1. Use async/await consistently

    async def process_request(self):
        result = await self.execute_tool()  # Good
        # Not: result = self.execute_tool()  # Bad
    
  2. Avoid blocking operations

    # Good: Use async libraries
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    
    # Bad: Blocking call
    response = requests.get(url)
    
  3. Use gather for parallel execution

    # Execute multiple tools in parallel
    results = await asyncio.gather(
        self._tool1(),
        self._tool2(),
        self._tool3()
    )
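By default, asyncio.gather propagates the first exception raised by any task. When tools should fail independently, return_exceptions=True collects failures alongside results. A minimal sketch, where run_tools_in_parallel is a hypothetical helper name:

```python
import asyncio
from typing import Any, Awaitable, Dict, List


async def run_tools_in_parallel(calls: Dict[str, Awaitable[Any]]) -> Dict[str, Dict]:
    """Await all tool calls concurrently and report each outcome by name."""
    names: List[str] = list(calls)
    # return_exceptions=True returns raised exceptions as values instead of raising
    outcomes = await asyncio.gather(*calls.values(), return_exceptions=True)
    report = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            report[name] = {"status": "error", "error": str(outcome)}
        else:
            report[name] = {"status": "success", "result": outcome}
    return report
```

The report uses the same "status" / "result" / "error" shape as the tool results elsewhere in this guide.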
    

Memory Management

  1. Limit cache size

    from collections import OrderedDict
    
    class LRUCache:
        def __init__(self, maxsize=100):
            self.cache = OrderedDict()
            self.maxsize = maxsize
    
  2. Stream large responses

    async def process_large_file(self, file_path):
        with open(file_path, 'r') as f:
            for chunk in iter(lambda: f.read(8192), ''):
                yield chunk
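The LRUCache outline under "Limit cache size" stops at the constructor. One way to complete it is sketched below; the get and put method names are assumptions.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUCache:
    """Bounded cache that evicts the least recently used entry when full."""

    def __init__(self, maxsize: int = 100) -> None:
        self.cache: "OrderedDict[Hashable, Any]" = OrderedDict()
        self.maxsize = maxsize

    def get(self, key: Hashable) -> Optional[Any]:
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def put(self, key: Hashable, value: Any) -> None:
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)  # drop the oldest entry
```

Because get returns None for a miss, this sketch cannot distinguish a missing key from a stored None; a sentinel default would be needed if tools can legitimately return None.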
    

Debugging

Enable Debug Logging

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

Test Locally

# Run server directly (it waits for JSON-RPC messages on stdin)
python mcp_server.py

# Or pipe a test message to the server's stdin
echo '{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}' | python mcp_server.py

Monitor Performance

import time

async def call_tool(self, name: str, arguments: Dict) -> Any:
    start = time.perf_counter()  # monotonic clock, suited to measuring durations
    result = await self._execute_tool(name, arguments)
    elapsed = time.perf_counter() - start
    
    logger.info(f"Tool {name} took {elapsed:.2f}s")
    
    return result
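The same measurement can be factored into a decorator so that individual tool methods are timed without editing call_tool. This is a sketch; `timed` is a hypothetical name, and the logger is assumed to be a standard module-level logger.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    """Log how long an async function takes, even when it raises."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on both success and failure
            elapsed = time.perf_counter() - start
            logger.info("%s took %.2fs", func.__name__, elapsed)
    return wrapper
```

Applying @timed above an async def such as _web_search logs its duration on every call.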

Summary

The MCP Research Server provides:

  1. Full JSON-RPC 2.0 protocol implementation
  2. Modular, extensible tool system
  3. Async/await for performance
  4. Proper error handling
  5. Multiple integration points
  6. Easy extension with new tools
  7. Comprehensive testing

For more details, refer to the inline code documentation and the MCP Specification.