# NOTE(review): stray Hugging Face Spaces UI text ("Spaces: Running") removed —
# it was scrape residue from the hosting page, not part of the program.
#!/usr/bin/env python3
"""
MCP Server for MCP Documentation
Hosted on Hugging Face Spaces with HTTP transport
"""
import json
import asyncio
import logging
import uuid
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="MCP Documentation Server", version="1.0.0")

# Add CORS middleware
# NOTE(review): browsers reject `allow_origins=["*"]` combined with
# `allow_credentials=True` per the CORS spec — confirm whether credentialed
# requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global data storage — populated by load_data(); None until then, which the
# endpoints treat as "Data not loaded".
chunks_data = None
docs_data = None

# Session management for MCP: maps session-id (str) -> session state dict.
sessions = {}
def load_data():
    """Populate the module-level chunk/document caches from the JSON index files.

    Reads `mcp_docs/index/chunks_md.json` and `mcp_docs/index/docs_md.json`
    into the `chunks_data` / `docs_data` globals. Logs and re-raises any
    failure so startup aborts loudly instead of serving empty data.
    """
    global chunks_data, docs_data

    def _read_json(path):
        # Local helper: parse one UTF-8 JSON file.
        with open(path, 'r', encoding='utf-8') as handle:
            return json.load(handle)

    try:
        chunks_data = _read_json('mcp_docs/index/chunks_md.json')
        docs_data = _read_json('mcp_docs/index/docs_md.json')
        logger.info(f"Loaded {len(chunks_data)} chunks and {len(docs_data)} documents")
    except Exception as exc:
        logger.error(f"Error loading data: {exc}")
        raise
# Pydantic models
class SearchRequest(BaseModel):
    """Request body for the legacy REST search endpoint."""
    # query: free-text search string; limit: maximum results to return.
    query: str
    limit: int = 5
class SearchResponse(BaseModel):
    """Response for the legacy REST search endpoint."""
    # results: scored chunk dicts (top `limit`); total: match count before limiting.
    results: List[Dict[str, Any]]
    total: int
class ToolCallRequest(BaseModel):
    """Body for invoking an MCP tool by name with a free-form argument dict."""
    name: str
    arguments: Dict[str, Any]
class ToolCallResponse(BaseModel):
    """MCP-style tool result: a list of content items, each {"type", "text"}."""
    content: List[Dict[str, str]]
@app.on_event("startup")
async def startup_event():
    """Load the documentation indexes once, when the app starts.

    Fix: in the reviewed source this coroutine was defined but never
    registered with FastAPI, so load_data() never ran and chunks_data /
    docs_data stayed None ("Data not loaded" from every endpoint). The
    `@app.on_event("startup")` hook restores loading — presumably the
    decorator was lost in the paste; confirm against the deployed file.
    """
    load_data()
@app.get("/")
async def root():
    """Health check endpoint.

    Reports server status plus how many chunks/documents are loaded
    (0 while load_data() has not populated the globals).
    Fix: no route decorator was present in the reviewed source, so this
    handler was unreachable; registered at "/" — the conventional health
    path (presumably the decorator was lost in the paste; confirm).
    """
    return {
        "message": "MCP Documentation Server",
        "status": "running",
        "chunks_loaded": len(chunks_data) if chunks_data else 0,
        "docs_loaded": len(docs_data) if docs_data else 0,
        "mcp_server": True
    }
async def mcp_info():
    """Return the MCP server's identity and advertised capabilities.

    NOTE(review): no route decorator is visible here — presumably this is
    meant to be exposed via `@app.get(...)`; confirm the intended path.
    """
    capabilities = {"tools": True, "resources": True}
    return {
        "name": "mcp-docs-server",
        "version": "1.0.0",
        "capabilities": capabilities,
    }
async def list_tools():
    """Describe the three MCP tools this server exposes.

    NOTE(review): no route decorator is visible here — presumably exposed
    via `@app.get(...)`; confirm the intended path.
    """
    # JSON Schemas hoisted out of the literal for readability.
    search_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query",
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of results",
                "default": 5,
            },
        },
        "required": ["query"],
    }
    chunk_schema = {
        "type": "object",
        "properties": {
            "chunk_id": {
                "type": "string",
                "description": "Chunk ID to retrieve",
            },
        },
        "required": ["chunk_id"],
    }
    return [
        {
            "name": "search_docs",
            "description": "Search through MCP documentation chunks",
            "inputSchema": search_schema,
        },
        {
            "name": "get_chunk",
            "description": "Get a specific documentation chunk by ID",
            "inputSchema": chunk_schema,
        },
        {
            "name": "list_docs",
            "description": "List all available documents",
            "inputSchema": {"type": "object", "properties": {}},
        },
    ]
async def call_tool(request: ToolCallRequest):
    """Execute one of the MCP tools advertised by list_tools().

    Dispatches on request.name ("search_docs", "get_chunk", "list_docs");
    unknown names yield a text result saying so. Raises HTTP 500 while the
    chunk index has not been loaded.

    NOTE(review): no route decorator is visible here — presumably exposed
    via `@app.post(...)`; confirm the intended path.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    name = request.name
    arguments = request.arguments

    def _text(text):
        # Wrap plain text in the MCP tool-result content shape.
        return ToolCallResponse(content=[{"type": "text", "text": text}])

    if name == "search_docs":
        needle = arguments.get("query", "").lower()
        limit = arguments.get("limit", 5)
        hits = []
        for chunk in chunks_data:
            body = chunk.get('text', '').lower()
            heading = chunk.get('title', '').lower()
            # Simple substring scoring: title matches outweigh body matches.
            score = 0
            if needle in body:
                score += body.count(needle) * 2
            if needle in heading:
                score += heading.count(needle) * 5
            if score > 0:
                hits.append({
                    "chunk_id": chunk.get('chunk_id'),
                    "title": chunk.get('title'),
                    "text": chunk.get('text'),
                    "url": chunk.get('url'),
                    "filename": chunk.get('filename'),
                    "score": score,
                })
        top = sorted(hits, key=lambda h: h['score'], reverse=True)[:limit]
        original_query = arguments.get('query', '')
        if top:
            parts = [f"Found {len(top)} results for '{original_query}':\n\n"]
            for i, hit in enumerate(top, 1):
                parts.append(f"{i}. **{hit['title']}** (Score: {hit['score']})\n")
                parts.append(f" {hit['text'][:200]}...\n")
                parts.append(f" Source: {hit['filename']}\n\n")
            message = "".join(parts)
        else:
            message = f"No results found for '{original_query}'"
        return _text(message)

    elif name == "get_chunk":
        chunk_id = arguments.get("chunk_id", "")
        found = next(
            (c for c in chunks_data if c.get('chunk_id') == chunk_id), None
        )
        if found is not None:
            message = (
                f"**{found.get('title', 'Untitled')}**\n\n"
                f"{found.get('text', '')}\n\n"
                f"Source: {found.get('filename', 'Unknown')}\n"
                f"URL: {found.get('url', 'N/A')}"
            )
            return _text(message)
        return _text(f"Chunk {chunk_id} not found")

    elif name == "list_docs":
        if not docs_data:
            return _text("No documents available")
        parts = ["Available documents:\n\n"]
        for doc in docs_data:
            parts.append(f"- **{doc.get('title', 'Untitled')}**\n")
            parts.append(f" ID: {doc.get('id', 'Unknown')}\n")
            parts.append(f" URL: {doc.get('url', 'N/A')}\n\n")
        return _text("".join(parts))

    else:
        return _text(f"Unknown tool: {name}")
async def list_resources():
    """List MCP resources: one `mcp://docs/<id>` entry per loaded document.

    Returns an empty list while docs_data has not been loaded.
    NOTE(review): no route decorator is visible here — confirm how this is
    meant to be exposed.
    """
    if not docs_data:
        return []
    entries = []
    for doc in docs_data:
        # Truncate long document content to a 200-char preview.
        preview = doc.get('content', '')
        if len(preview) > 200:
            preview = preview[:200] + "..."
        entries.append({
            "uri": f"mcp://docs/{doc.get('id', 'unknown')}",
            "name": doc.get('title', 'Untitled'),
            "description": preview,
            "mimeType": "text/plain",
        })
    return entries
async def read_resource(resource_id: str):
    """Return the full text of one document by joining all of its chunks.

    Matches chunks on their `doc_id` field; returns an explanatory string
    (not an error) when the id is unknown or data is not loaded.
    """
    if not chunks_data:
        return "Data not loaded"
    texts = [
        chunk.get('text', '')
        for chunk in chunks_data
        if chunk.get('doc_id') == resource_id
    ]
    if not texts:
        return f"Document {resource_id} not found"
    return "\n\n".join(texts)
# Legacy REST API endpoints for backward compatibility
async def search_docs(request: SearchRequest):
    """Legacy REST search: substring scoring over the documentation chunks.

    Scoring: each body occurrence of the lower-cased query counts 2, each
    title occurrence counts 5; zero-score chunks are dropped. `total` is
    the match count before applying `request.limit`.
    NOTE(review): no route decorator is visible here — presumably exposed
    via `@app.post(...)`; confirm the intended path.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")
    try:
        needle = request.query.lower()
        scored = []
        for chunk in chunks_data:
            body = chunk.get('text', '').lower()
            heading = chunk.get('title', '').lower()
            score = 0
            if needle in body:
                score += body.count(needle) * 2    # body matches
            if needle in heading:
                score += heading.count(needle) * 5  # title matches count most
            if score > 0:
                scored.append({
                    "chunk_id": chunk.get('chunk_id'),
                    "title": chunk.get('title'),
                    "text": chunk.get('text'),
                    "url": chunk.get('url'),
                    "filename": chunk.get('filename'),
                    "chunk_index": chunk.get('chunk_index'),
                    "total_chunks": chunk.get('total_chunks'),
                    "score": score,
                })
        # Stable sort by relevance, best first.
        scored.sort(key=lambda item: item['score'], reverse=True)
        return SearchResponse(results=scored[:request.limit], total=len(scored))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Search error: {str(exc)}")
async def get_chunk(chunk_id: str):
    """Fetch a single chunk record by chunk_id (legacy REST).

    Raises HTTP 500 while data is not loaded, HTTP 404 when no chunk
    carries the requested id.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")
    found = next(
        (chunk for chunk in chunks_data if chunk.get('chunk_id') == chunk_id),
        None,
    )
    if found is None:
        raise HTTPException(status_code=404, detail="Chunk not found")
    return found
async def list_docs():
    """Return every loaded document record (legacy REST)."""
    if docs_data:
        return {"documents": docs_data}
    raise HTTPException(status_code=500, detail="Data not loaded")
def is_initialize_request(body: dict) -> bool:
    """True when *body* is a JSON-RPC 2.0 MCP `initialize` request.

    Requires all three markers: jsonrpc == "2.0", method == "initialize",
    and a present `id` (notifications carry no id and must not match).
    """
    if body.get("jsonrpc") != "2.0":
        return False
    return body.get("method") == "initialize" and "id" in body
def create_mcp_server():
    """Build the per-session MCP server descriptor stored in `sessions`."""
    capabilities = {
        "tools": {"listChanged": True},
        "resources": {},
    }
    return {
        "name": "mcp-docs-server",
        "version": "1.0.0",
        "capabilities": capabilities,
    }
def _jsonrpc_result(request_id, result):
    """Wrap *result* in a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id, code, message):
    """Wrap an error in a JSON-RPC 2.0 error envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _text_result(request_id, text):
    """JSON-RPC success envelope holding one MCP text content item."""
    return _jsonrpc_result(request_id, {"content": [{"type": "text", "text": text}]})


# Static tools/list payload (mirrors the tools implemented below).
_MCP_TOOL_DESCRIPTORS = [
    {
        "name": "search_docs",
        "title": "Search Documentation",
        "description": "Search through MCP documentation chunks",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query for MCP documentation",
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results",
                    "default": 5,
                },
            },
            "required": ["query"],
        },
    },
    {
        "name": "get_chunk",
        "title": "Get Documentation Chunk",
        "description": "Get a specific documentation chunk by ID",
        "inputSchema": {
            "type": "object",
            "properties": {
                "chunk_id": {
                    "type": "string",
                    "description": "Chunk ID to retrieve",
                },
            },
            "required": ["chunk_id"],
        },
    },
    {
        "name": "list_docs",
        "title": "List Documents",
        "description": "List all available documents",
        "inputSchema": {"type": "object", "properties": {}},
    },
]


def _mcp_search_docs(arguments):
    """tools/call body for search_docs: substring scoring over chunks_data."""
    query = arguments.get("query", "").lower()
    limit = arguments.get("limit", 5)
    hits = []
    for chunk in chunks_data or []:
        body = chunk.get('text', '').lower()
        heading = chunk.get('title', '').lower()
        # Title matches (x5) outweigh body matches (x2).
        score = 0
        if query in body:
            score += body.count(query) * 2
        if query in heading:
            score += heading.count(query) * 5
        if score > 0:
            hits.append({
                "chunk_id": chunk.get('chunk_id'),
                "title": chunk.get('title'),
                "text": chunk.get('text'),
                "url": chunk.get('url'),
                "filename": chunk.get('filename'),
                "score": score,
            })
    hits = sorted(hits, key=lambda h: h['score'], reverse=True)[:limit]
    if not hits:
        return f"No results found for '{query}'"
    out = f"Found {len(hits)} results for '{query}':\n\n"
    for i, hit in enumerate(hits, 1):
        out += f"{i}. **{hit['title']}** (Score: {hit['score']})\n"
        out += f" {hit['text'][:200]}...\n"
        out += f" Source: {hit['filename']}\n\n"
    return out


def _mcp_get_chunk(arguments):
    """tools/call body for get_chunk: render one chunk, or a not-found message."""
    chunk_id = arguments.get("chunk_id", "")
    for chunk in chunks_data or []:
        if chunk.get('chunk_id') == chunk_id:
            return (
                f"**{chunk.get('title', 'Untitled')}**\n\n"
                f"{chunk.get('text', '')}\n\n"
                f"Source: {chunk.get('filename', 'Unknown')}\n"
                f"URL: {chunk.get('url', 'N/A')}"
            )
    return f"Chunk {chunk_id} not found"


def _mcp_list_docs():
    """tools/call body for list_docs: render the loaded document index."""
    if not docs_data:
        return "No documents available"
    out = "Available documents:\n\n"
    for doc in docs_data:
        out += f"- **{doc.get('title', 'Untitled')}**\n"
        out += f" ID: {doc.get('id', 'Unknown')}\n"
        out += f" URL: {doc.get('url', 'N/A')}\n\n"
    return out


@app.post("/mcp")
async def mcp_post_handler(request: Request):
    """Single MCP endpoint: JSON-RPC 2.0 over HTTP with loose session handling.

    Handles `initialize` (creates a session), `notifications/*`, `ping`,
    `tools/list`, and `tools/call`; anything else gets a -32601 error.
    Requests without a session header are allowed for client compatibility;
    an *unknown* session id is rejected with -32000.

    Fixes vs. reviewed source:
    - `notifications/initialized` (sent by every compliant client right
      after initialize) and `ping` previously fell through to the
      "Unknown method" error, breaking the MCP handshake. Notifications now
      get 202 with no body (JSON-RPC: notifications receive no response);
      ping gets an empty result, per the MCP spec.
    - Registered at POST /mcp — path assumed from the MCP streamable-HTTP
      convention; presumably the decorator was lost in the paste (confirm).
    - Removed the unused `session` local.
    """
    try:
        body = await request.json()
        session_id = request.headers.get("mcp-session-id")

        # Initialization: mint a session and advertise capabilities.
        if is_initialize_request(body):
            new_session_id = str(uuid.uuid4())
            sessions[new_session_id] = {
                "server": create_mcp_server(),
                "initialized": True,
            }
            return _jsonrpc_result(body.get("id"), {
                "protocolVersion": "2025-06-18",
                "capabilities": {
                    "tools": {"listChanged": True},
                    "resources": {},
                },
                "serverInfo": {
                    "name": "mcp-docs-server",
                    "version": "1.0.0",
                },
            })

        # A session header, when present, must reference a known session.
        if session_id and session_id not in sessions:
            return _jsonrpc_error(
                body.get("id"), -32000,
                "Bad Request: Invalid session ID provided",
            )

        method = body.get("method")
        params = body.get("params", {})
        request_id = body.get("id")

        # Notifications (e.g. notifications/initialized) carry no id and
        # must not receive a JSON-RPC response: acknowledge with 202.
        if isinstance(method, str) and method.startswith("notifications/"):
            from fastapi import Response  # local import keeps this fix self-contained
            return Response(status_code=202)

        # Protocol-level liveness check.
        if method == "ping":
            return _jsonrpc_result(request_id, {})

        if method == "tools/list":
            return _jsonrpc_result(request_id, {"tools": _MCP_TOOL_DESCRIPTORS})

        if method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
            if tool_name == "search_docs":
                return _text_result(request_id, _mcp_search_docs(arguments))
            if tool_name == "get_chunk":
                return _text_result(request_id, _mcp_get_chunk(arguments))
            if tool_name == "list_docs":
                return _text_result(request_id, _mcp_list_docs())
            return _jsonrpc_error(request_id, -32601, f"Unknown tool: {tool_name}")

        return _jsonrpc_error(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        # `body` may not exist if request.json() itself failed.
        return {
            "jsonrpc": "2.0",
            "id": body.get("id") if 'body' in locals() else None,
            "error": {
                "code": -32603,
                "message": f"Internal error: {str(e)}",
            },
        }
if __name__ == "__main__":
    # Port 7860 is the port Hugging Face Spaces exposes for web apps;
    # 0.0.0.0 so the container's mapped interface can reach it.
    uvicorn.run(app, host="0.0.0.0", port=7860)