#!/usr/bin/env python3
"""
MCP Server for MCP Documentation
Hosted on Hugging Face Spaces with HTTP transport
"""

import json
import asyncio
import logging
import uuid
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import uvicorn

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default number of search hits returned when the caller gives no usable limit.
DEFAULT_SEARCH_LIMIT = 5
# Number of characters of chunk/document text shown in previews.
SNIPPET_LENGTH = 200

# Initialize FastAPI app.
# The interactive API docs are moved off "/docs": FastAPI registers its own
# "/docs" route when the app is constructed, which would otherwise be matched
# before (and therefore hide) the legacy "/docs" document-listing endpoint
# defined further down in this file.
app = FastAPI(
    title="MCP Documentation Server",
    version="1.0.0",
    docs_url="/api-docs",
)

# Add CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    # Browsers only let scripts read custom response headers that are listed
    # here; the session ID issued by the /mcp initialize call is such a header.
    expose_headers=["Mcp-Session-Id"],
)

# Global data storage (populated by load_data at startup)
chunks_data: Optional[List[Dict[str, Any]]] = None
docs_data: Optional[List[Dict[str, Any]]] = None

# Session management for MCP: session ID -> per-session state
sessions: Dict[str, Dict[str, Any]] = {}


def load_data():
    """Load the documentation chunks and metadata into the module globals.

    Both files are read before either global is assigned, so a failure on the
    second file does not leave the module half-initialised.

    Raises:
        Exception: whatever opening or parsing either JSON file raised; the
            error is logged and re-raised.
    """
    global chunks_data, docs_data
    try:
        # Load chunks data
        with open('mcp_docs/index/chunks_md.json', 'r', encoding='utf-8') as f:
            loaded_chunks = json.load(f)

        # Load docs data
        with open('mcp_docs/index/docs_md.json', 'r', encoding='utf-8') as f:
            loaded_docs = json.load(f)
    except Exception as e:
        logger.error("Error loading data: %s", e)
        raise

    chunks_data = loaded_chunks
    docs_data = loaded_docs
    logger.info(
        "Loaded %d chunks and %d documents", len(chunks_data), len(docs_data)
    )


# Pydantic models
class SearchRequest(BaseModel):
    query: str
    limit: int = 5


class SearchResponse(BaseModel):
    results: List[Dict[str, Any]]
    total: int


class ToolCallRequest(BaseModel):
    name: str
    arguments: Dict[str, Any]


class ToolCallResponse(BaseModel):
    content: List[Dict[str, str]]


# ---------------------------------------------------------------------------
# Private helpers shared by the REST endpoints and the /mcp JSON-RPC handler
# ---------------------------------------------------------------------------

def _text(value: Any) -> str:
    """Return *value* as a string, mapping None to the empty string."""
    return "" if value is None else str(value)


def _coerce_limit(raw: Any, default: int = DEFAULT_SEARCH_LIMIT) -> int:
    """Turn a caller-supplied limit into a non-negative int.

    Values that cannot be converted to int fall back to *default*; negative
    values are clamped to 0 so they cannot slice from the end of the results.
    """
    try:
        limit = int(raw)
    except (TypeError, ValueError):
        return default
    return max(limit, 0)


def _search_chunks(query: Any, *, include_position: bool = False) -> List[Dict[str, Any]]:
    """Score every loaded chunk against *query* and return the matches.

    Matching is a case-insensitive substring count: each occurrence in the
    chunk text is worth 2 points and each occurrence in the title 5 points.

    Args:
        query: Search string. A blank query yields no results (an empty
            substring would otherwise "match" every chunk).
        include_position: Also copy ``chunk_index`` and ``total_chunks`` into
            each result.

    Returns:
        All matching chunks, highest score first (ties keep file order).
    """
    needle = _text(query).lower()
    if not needle.strip():
        return []

    matches = []
    for chunk in chunks_data or []:
        body = _text(chunk.get('text')).lower()
        heading = _text(chunk.get('title')).lower()

        # Title occurrences are weighted higher than body occurrences.
        score = body.count(needle) * 2 + heading.count(needle) * 5
        if score <= 0:
            continue

        hit = {
            "chunk_id": chunk.get('chunk_id'),
            "title": chunk.get('title'),
            "text": chunk.get('text'),
            "url": chunk.get('url'),
            "filename": chunk.get('filename'),
        }
        if include_position:
            hit["chunk_index"] = chunk.get('chunk_index')
            hit["total_chunks"] = chunk.get('total_chunks')
        hit["score"] = score
        matches.append(hit)

    # list.sort is stable, so equally scored chunks stay in file order.
    matches.sort(key=lambda hit: hit["score"], reverse=True)
    return matches


def _find_chunk(chunk_id: Any) -> Optional[Dict[str, Any]]:
    """Return the first loaded chunk whose ``chunk_id`` equals *chunk_id*, or None."""
    for chunk in chunks_data or []:
        if chunk.get('chunk_id') == chunk_id:
            return chunk
    return None


def _format_search_results(query: str, hits: List[Dict[str, Any]]) -> str:
    """Render search hits as the text block returned by the ``search_docs`` tool."""
    if not hits:
        return f"No results found for '{query}'"

    parts = [f"Found {len(hits)} results for '{query}':\n\n"]
    for position, hit in enumerate(hits, 1):
        parts.append(f"{position}. **{hit['title']}** (Score: {hit['score']})\n")
        parts.append(f" {_text(hit['text'])[:SNIPPET_LENGTH]}...\n")
        parts.append(f" Source: {hit['filename']}\n\n")
    return "".join(parts)


def _format_chunk(chunk: Dict[str, Any]) -> str:
    """Render one chunk as the text block returned by the ``get_chunk`` tool."""
    return (
        f"**{chunk.get('title', 'Untitled')}**\n\n"
        f"{chunk.get('text', '')}\n\n"
        f"Source: {chunk.get('filename', 'Unknown')}\n"
        f"URL: {chunk.get('url', 'N/A')}"
    )


def _format_doc_list() -> str:
    """Render the loaded documents as the text block returned by ``list_docs``."""
    if not docs_data:
        return "No documents available"

    parts = ["Available documents:\n\n"]
    for doc in docs_data:
        parts.append(f"- **{doc.get('title', 'Untitled')}**\n")
        parts.append(f" ID: {doc.get('id', 'Unknown')}\n")
        parts.append(f" URL: {doc.get('url', 'N/A')}\n\n")
    return "".join(parts)


def _run_tool(name: Any, arguments: Dict[str, Any]) -> Optional[str]:
    """Execute the named tool and return its text output.

    Returns:
        The tool's text output, or None when *name* is not a known tool so
        each caller can report that in its own response format.
    """
    if name == "search_docs":
        query = _text(arguments.get("query"))
        limit = _coerce_limit(arguments.get("limit", DEFAULT_SEARCH_LIMIT))
        return _format_search_results(query, _search_chunks(query)[:limit])

    if name == "get_chunk":
        chunk_id = arguments.get("chunk_id", "")
        chunk = _find_chunk(chunk_id)
        if chunk is None:
            return f"Chunk {chunk_id} not found"
        return _format_chunk(chunk)

    if name == "list_docs":
        return _format_doc_list()

    return None


def _tool_definitions(*, with_titles: bool, query_description: str) -> List[Dict[str, Any]]:
    """Build the tool catalogue advertised to clients.

    Args:
        with_titles: Include the human-readable ``title`` of each tool.
        query_description: Description used for ``search_docs``'s ``query``
            property (the two listing endpoints word it differently).
    """
    tools = [
        {
            "name": "search_docs",
            "title": "Search Documentation",
            "description": "Search through MCP documentation chunks",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": query_description,
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of results",
                        "default": 5,
                    },
                },
                "required": ["query"],
            },
        },
        {
            "name": "get_chunk",
            "title": "Get Documentation Chunk",
            "description": "Get a specific documentation chunk by ID",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "chunk_id": {
                        "type": "string",
                        "description": "Chunk ID to retrieve",
                    },
                },
                "required": ["chunk_id"],
            },
        },
        {
            "name": "list_docs",
            "title": "List Documents",
            "description": "List all available documents",
            "inputSchema": {
                "type": "object",
                "properties": {},
            },
        },
    ]
    if not with_titles:
        for tool in tools:
            del tool["title"]
    return tools


def _jsonrpc_result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _text_content(text: str) -> List[Dict[str, str]]:
    """Wrap *text* in the single-item content list used by tool results."""
    return [{"type": "text", "text": text}]


# ---------------------------------------------------------------------------
# HTTP endpoints
# ---------------------------------------------------------------------------

@app.on_event("startup")
async def startup_event():
    """Load data on startup"""
    load_data()


@app.get("/")
async def root():
    """Health check endpoint"""
    return {
        "message": "MCP Documentation Server",
        "status": "running",
        "chunks_loaded": len(chunks_data) if chunks_data else 0,
        "docs_loaded": len(docs_data) if docs_data else 0,
        "mcp_server": True
    }


@app.get("/mcp/info")
async def mcp_info():
    """MCP server information"""
    return {
        "name": "mcp-docs-server",
        "version": "1.0.0",
        "capabilities": {
            "tools": True,
            "resources": True
        }
    }


@app.get("/mcp/tools")
async def list_tools():
    """List available MCP tools"""
    return _tool_definitions(with_titles=False, query_description="Search query")


@app.post("/mcp/tools/call")
async def call_tool(request: ToolCallRequest):
    """Call an MCP tool and return its text output.

    Raises:
        HTTPException: 500 when the documentation data has not been loaded.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    output = _run_tool(request.name, request.arguments)
    if output is None:
        output = f"Unknown tool: {request.name}"
    return ToolCallResponse(content=_text_content(output))


@app.get("/mcp/resources")
async def list_resources():
    """List available MCP resources (one per loaded document)"""
    if not docs_data:
        return []

    resources = []
    for doc in docs_data:
        content = _text(doc.get('content'))
        # Preview only: long content is cut and marked with an ellipsis.
        if len(content) > SNIPPET_LENGTH:
            description = content[:SNIPPET_LENGTH] + "..."
        else:
            description = content
        resources.append({
            "uri": f"mcp://docs/{doc.get('id', 'unknown')}",
            "name": doc.get('title', 'Untitled'),
            "description": description,
            "mimeType": "text/plain"
        })
    return resources


@app.get("/mcp/resources/{resource_id}")
async def read_resource(resource_id: str):
    """Read a specific MCP resource: the joined text of a document's chunks.

    Raises:
        HTTPException: 500 when the data has not been loaded, 404 when no
            chunk belongs to *resource_id* (same convention as the
            ``/chunks/{chunk_id}`` endpoint).
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    # Find chunks for this document
    doc_texts = [
        _text(chunk.get('text'))
        for chunk in chunks_data
        if chunk.get('doc_id') == resource_id
    ]
    if not doc_texts:
        raise HTTPException(
            status_code=404, detail=f"Document {resource_id} not found"
        )

    # Combine all chunks for the document
    return "\n\n".join(doc_texts)


# Legacy REST API endpoints for backward compatibility
@app.post("/search", response_model=SearchResponse)
async def search_docs(request: SearchRequest):
    """Search through documentation chunks using text matching.

    ``total`` is the number of matching chunks before ``limit`` is applied.

    Raises:
        HTTPException: 500 when the data has not been loaded or the search
            itself fails.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    try:
        hits = _search_chunks(request.query, include_position=True)
        return SearchResponse(
            results=hits[:_coerce_limit(request.limit)],
            total=len(hits)
        )
    except Exception as e:
        logger.exception("Search failed")
        raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")


@app.get("/chunks/{chunk_id}")
async def get_chunk(chunk_id: str):
    """Get a specific chunk by ID.

    Raises:
        HTTPException: 500 when the data has not been loaded, 404 when no
            chunk has that ID.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    chunk = _find_chunk(chunk_id)
    if chunk is None:
        raise HTTPException(status_code=404, detail="Chunk not found")
    return chunk


@app.get("/docs")
async def list_docs():
    """List all available documents"""
    if not docs_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    return {"documents": docs_data}


def is_initialize_request(body: dict) -> bool:
    """Check if request is an MCP initialize request"""
    return (body.get("jsonrpc") == "2.0" and
            body.get("method") == "initialize" and
            "id" in body)


def create_mcp_server():
    """Create MCP server instance (a plain descriptor dict stored per session)"""
    return {
        "name": "mcp-docs-server",
        "version": "1.0.0",
        "capabilities": {
            "tools": {"listChanged": True},
            "resources": {}
        }
    }


def _handle_mcp_message(body: Dict[str, Any], session_id: Optional[str]):
    """Route one parsed JSON-RPC message and build the HTTP response for it.

    Args:
        body: The decoded JSON-RPC object.
        session_id: Value of the ``mcp-session-id`` request header, if any.

    Returns:
        A JSON-RPC response dict, or a Response object when headers or a
        non-200 status are needed.
    """
    request_id = body.get("id")

    # Handle initialization request
    if is_initialize_request(body):
        # Create new session
        new_session_id = str(uuid.uuid4())
        sessions[new_session_id] = {
            "server": create_mcp_server(),
            "initialized": True
        }
        payload = _jsonrpc_result(request_id, {
            "protocolVersion": "2025-06-18",
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {}
            },
            "serverInfo": {
                "name": "mcp-docs-server",
                "version": "1.0.0"
            }
        })
        # The session ID must reach the client, otherwise the session stored
        # above can never be referenced again.
        return JSONResponse(
            content=payload,
            headers={"Mcp-Session-Id": new_session_id},
        )

    # Handle other requests - allow without session for MCP compatibility
    if session_id and session_id not in sessions:
        return _jsonrpc_error(
            request_id, -32000, "Bad Request: Invalid session ID provided"
        )

    # A message without an "id" is a notification (for example
    # "notifications/initialized"): JSON-RPC forbids replying to it, so it is
    # acknowledged with an empty 202 instead of an error body.
    if "id" not in body:
        return Response(status_code=202)

    method = body.get("method")
    params = body.get("params") or {}
    if not isinstance(params, dict):
        return _jsonrpc_error(
            request_id, -32602, "Invalid params: expected an object"
        )

    # Handle tools/list
    if method == "tools/list":
        return _jsonrpc_result(request_id, {
            "tools": _tool_definitions(
                with_titles=True,
                query_description="Search query for MCP documentation",
            )
        })

    # Handle tools/call
    if method == "tools/call":
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}
        if not isinstance(arguments, dict):
            return _jsonrpc_error(
                request_id, -32602, "Invalid params: arguments must be an object"
            )

        output = _run_tool(tool_name, arguments)
        if output is None:
            return _jsonrpc_error(request_id, -32601, f"Unknown tool: {tool_name}")
        return _jsonrpc_result(request_id, {"content": _text_content(output)})

    return _jsonrpc_error(request_id, -32601, f"Unknown method: {method}")


@app.post("/mcp")
async def mcp_post_handler(request: Request):
    """Handle MCP JSON-RPC requests with session management.

    Supports ``initialize``, ``tools/list`` and ``tools/call``. Protocol-level
    problems are reported as JSON-RPC error objects rather than HTTP errors.
    """
    try:
        body = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes.
        return _jsonrpc_error(
            None, -32700, "Parse error: request body is not valid JSON"
        )

    # Only single JSON-RPC objects are handled; anything else (arrays,
    # scalars) has no "id" to read and is rejected up front.
    if not isinstance(body, dict):
        return _jsonrpc_error(
            None, -32600, "Invalid Request: expected a JSON-RPC object"
        )

    try:
        return _handle_mcp_message(body, request.headers.get("mcp-session-id"))
    except Exception as e:
        logger.exception("Unhandled error while processing MCP request")
        return _jsonrpc_error(body.get("id"), -32603, f"Internal error: {str(e)}")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)