Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| MCP Server for MCP Documentation | |
| Hosted on Hugging Face Spaces with HTTP transport | |
| """ | |
| import json | |
| import asyncio | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
| from mcp.server import Server | |
| from mcp.server.models import InitializationOptions | |
| from mcp.server.sse import sse_server | |
| from mcp.types import ( | |
| Resource, | |
| Tool, | |
| TextContent, | |
| LoggingLevel | |
| ) | |
| # Configure logging: root level is INFO; code in this module logs through `logger` | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Global data storage: both names stay None until load_data() assigns the parsed JSON | |
| chunks_data = None | |
| docs_data = None | |
def load_data():
    """Load the documentation chunks and document metadata into module globals.

    Reads ``mcp_docs/index/chunks_md.json`` and ``mcp_docs/index/docs_md.json``
    (relative to the current working directory) and stores the parsed JSON in
    ``chunks_data`` and ``docs_data``.

    Both globals are assigned only after both files have been parsed, so a
    failure on either file leaves the previous values untouched rather than
    a half-loaded index.

    Raises:
        Exception: any error raised while opening, decoding or measuring the
            data is logged and re-raised unchanged.
    """
    global chunks_data, docs_data
    try:
        with open('mcp_docs/index/chunks_md.json', 'r', encoding='utf-8') as f:
            loaded_chunks = json.load(f)
        with open('mcp_docs/index/docs_md.json', 'r', encoding='utf-8') as f:
            loaded_docs = json.load(f)
        # Built inside the try so a payload without len() is reported like
        # any other load failure.
        summary = f"Loaded {len(loaded_chunks)} chunks and {len(loaded_docs)} documents"
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

    # Commit both halves together now that nothing can fail any more.
    chunks_data, docs_data = loaded_chunks, loaded_docs
    logger.info(summary)
| # Initialize the MCP server: module-level instance that main() runs over the SSE streams | |
| server = Server("mcp-docs-server") | |
async def list_resources() -> List[Resource]:
    """Expose every loaded document as an MCP resource.

    Returns:
        One ``Resource`` per entry of ``docs_data`` with an
        ``mcp://docs/<id>`` URI, the document title as its name and a
        preview of the content as its description. An empty list is
        returned while ``docs_data`` is unset or empty.
    """
    if not docs_data:
        return []

    def preview(doc):
        # Content longer than 200 characters is cut and marked with "...".
        body = doc.get('content', '')
        return body[:200] + "..." if len(body) > 200 else body

    return [
        Resource(
            uri=f"mcp://docs/{doc.get('id', 'unknown')}",
            name=doc.get('title', 'Untitled'),
            description=preview(doc),
            mimeType="text/plain",
        )
        for doc in docs_data
    ]
async def read_resource(uri: str) -> str:
    """Return the full text of the document addressed by ``uri``.

    Args:
        uri: Resource URI of the form ``mcp://docs/<doc_id>``. Anything
            whose ``str()`` yields such a URI is accepted, so a URL object
            handed over by the caller works as well as a plain string.

    Returns:
        The ``text`` of every chunk whose ``doc_id`` equals ``<doc_id>``,
        joined by blank lines, or one of the plain-text messages
        ``"Data not loaded"``, ``"Invalid URI"`` or
        ``"Document <doc_id> not found"``.
    """
    if not chunks_data:
        return "Data not loaded"

    scheme_prefix = "mcp://docs/"
    # NOTE(review): the MCP SDK may pass a URL object rather than a str;
    # confirm against the installed version. str() makes both cases work.
    uri_text = str(uri)
    if not uri_text.startswith(scheme_prefix):
        return "Invalid URI"

    # Strip only the leading prefix; a document id that happens to contain
    # the same substring must stay intact.
    doc_id = uri_text[len(scheme_prefix):]

    texts = [
        chunk.get('text', '')
        for chunk in chunks_data
        if chunk.get('doc_id') == doc_id
    ]
    if not texts:
        return f"Document {doc_id} not found"
    return "\n\n".join(texts)
async def list_tools() -> List[Tool]:
    """Describe the tools this server offers.

    Returns:
        Three ``Tool`` definitions, in this order: ``search_docs``
        (required ``query``, optional ``limit``), ``get_chunk`` (required
        ``chunk_id``) and ``list_docs`` (no arguments). Each carries a
        JSON Schema for its arguments.
    """
    search_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query",
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of results",
                "default": 5,
            },
        },
        "required": ["query"],
    }
    chunk_schema = {
        "type": "object",
        "properties": {
            "chunk_id": {
                "type": "string",
                "description": "Chunk ID to retrieve",
            },
        },
        "required": ["chunk_id"],
    }
    no_argument_schema = {"type": "object", "properties": {}}

    search_tool = Tool(
        name="search_docs",
        description="Search through MCP documentation chunks",
        inputSchema=search_schema,
    )
    chunk_tool = Tool(
        name="get_chunk",
        description="Get a specific documentation chunk by ID",
        inputSchema=chunk_schema,
    )
    listing_tool = Tool(
        name="list_docs",
        description="List all available documents",
        inputSchema=no_argument_schema,
    )
    return [search_tool, chunk_tool, listing_tool]
def _search_docs(arguments: Dict[str, Any]) -> str:
    """Score every chunk against the query and format the best hits as text."""
    raw_query = arguments.get("query", "")
    needle = str(raw_query or "").strip().lower()

    # Fall back to the schema default (5) when the limit is missing,
    # not an integer, or below 1; a negative value would otherwise slice
    # results off the wrong end.
    try:
        limit = int(arguments.get("limit", 5))
    except (TypeError, ValueError):
        limit = 5
    if limit < 1:
        limit = 5

    hits = []
    # An empty needle is contained in every string, so only scan when
    # there is something to look for.
    if needle:
        for chunk in chunks_data:
            text = chunk.get('text') or ''
            title = chunk.get('title') or ''
            # Title matches weigh more than body matches.
            score = text.lower().count(needle) * 2 + title.lower().count(needle) * 5
            if score > 0:
                hits.append((score, chunk, text))

    if not hits:
        return f"No results found for '{raw_query}'"

    # list.sort is stable, so equally scored chunks keep their index order.
    hits.sort(key=lambda hit: hit[0], reverse=True)
    hits = hits[:limit]

    parts = [f"Found {len(hits)} results for '{raw_query}':\n\n"]
    for position, (score, chunk, text) in enumerate(hits, 1):
        # Mark the snippet with "..." only when it was actually cut.
        snippet = text[:200] + "..." if len(text) > 200 else text
        parts.append(f"{position}. **{chunk.get('title') or 'Untitled'}** (Score: {score})\n")
        parts.append(f" {snippet}\n")
        parts.append(f" Source: {chunk.get('filename') or 'Unknown'}\n\n")
    return "".join(parts)


def _get_chunk(arguments: Dict[str, Any]) -> str:
    """Format the first chunk whose ``chunk_id`` equals the requested one."""
    chunk_id = arguments.get("chunk_id", "")
    found = next(
        (chunk for chunk in chunks_data if chunk.get('chunk_id') == chunk_id),
        None,
    )
    if found is None:
        return f"Chunk {chunk_id} not found"
    return (
        f"**{found.get('title', 'Untitled')}**\n\n"
        f"{found.get('text', '')}\n\n"
        f"Source: {found.get('filename', 'Unknown')}\n"
        f"URL: {found.get('url', 'N/A')}"
    )


def _list_docs() -> str:
    """Format title, id and URL of every entry in ``docs_data``."""
    if not docs_data:
        return "No documents available"
    parts = ["Available documents:\n\n"]
    for doc in docs_data:
        parts.append(f"- **{doc.get('title', 'Untitled')}**\n")
        parts.append(f" ID: {doc.get('id', 'Unknown')}\n")
        parts.append(f" URL: {doc.get('url', 'N/A')}\n\n")
    return "".join(parts)


async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch a tool call and wrap its textual answer.

    Args:
        name: Tool to run: ``"search_docs"``, ``"get_chunk"`` or
            ``"list_docs"``.
        arguments: Tool arguments as described by the schemas from
            ``list_tools``; ``None`` is treated as an empty mapping.

    Returns:
        A single-element list holding one text ``TextContent``. Problems
        (data not loaded, unknown tool, nothing found) are reported as
        text in that element rather than raised.
    """
    if not chunks_data:
        return [TextContent(type="text", text="Data not loaded")]

    arguments = arguments or {}
    if name == "search_docs":
        answer = _search_docs(arguments)
    elif name == "get_chunk":
        answer = _get_chunk(arguments)
    elif name == "list_docs":
        answer = _list_docs()
    else:
        answer = f"Unknown tool: {name}"
    return [TextContent(type="text", text=answer)]
async def main():
    """Load the documentation index, attach the handlers and serve over SSE.

    Steps:
        1. ``load_data()`` fills the module-level index; its exceptions
           propagate and abort start-up.
        2. The handler coroutines of this module are registered on
           ``server``. They carry no registration decorators, so without
           this step the server would not know about them.
        3. ``server.run`` is driven with the streams yielded by
           ``sse_server()`` until the connection ends.
    """
    # Imported here because NotificationOptions is not among the names
    # imported at the top of the file.
    from mcp.server import NotificationOptions

    load_data()

    # Each registration method returns a decorator; applying it by hand
    # registers the function and leaves the module-level names unchanged.
    server.list_resources()(list_resources)
    server.read_resource()(read_resource)
    server.list_tools()(list_tools)
    server.call_tool()(call_tool)

    # NOTE(review): sse_server comes from the `mcp.server.sse` import at
    # the top of the file; confirm the installed SDK version exports it.
    async with sse_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="mcp-docs-server",
                server_version="1.0.0",
                # An options object (not None) follows the SDK's documented
                # usage of get_capabilities. NOTE(review): confirm against
                # the installed SDK version.
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
| # Script entry point: run main() on a fresh event loop only when executed directly, not on import | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |