mcp-docs-server / app.py
galcan's picture
Fix MCP session handling - allow requests without session ID
3a81b52
#!/usr/bin/env python3
"""
MCP Server for MCP Documentation
Hosted on Hugging Face Spaces with HTTP transport
"""
import json
import asyncio
import logging
import uuid
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="MCP Documentation Server", version="1.0.0")

# Add CORS middleware.
# Every origin, method and header is accepted because the server is meant to
# be called from arbitrary clients. Credentials are deliberately NOT allowed:
# combining wildcard origins with allow_credentials=True is rejected by the
# CORS rules (FastAPI's docs require explicit origins in that case) and would
# let any website issue credentialed requests. No handler in this file reads
# cookies, so nothing here relies on credentialed cross-origin calls.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global data storage, filled in by load_data(). The handlers iterate these
# and call .get() on each element, i.e. they are used as lists of dicts.
chunks_data: Optional[List[Dict[str, Any]]] = None
docs_data: Optional[List[Dict[str, Any]]] = None

# Session management for MCP: session id -> per-session state
sessions: Dict[str, Dict[str, Any]] = {}
def load_data():
    """Load the documentation chunks and document metadata from disk.

    Reads ``mcp_docs/index/chunks_md.json`` and ``mcp_docs/index/docs_md.json``
    (both relative to the current working directory) and stores the parsed
    JSON in the module-level ``chunks_data`` and ``docs_data``.

    Both files are parsed before either global is assigned, so a failure on
    the second file cannot leave the module with fresh chunks but stale or
    missing documents.

    Raises:
        Exception: whatever opening or parsing either file raised; the error
            is logged with its traceback and then re-raised unchanged.
    """
    global chunks_data, docs_data
    try:
        # Load chunks data
        with open('mcp_docs/index/chunks_md.json', 'r', encoding='utf-8') as f:
            loaded_chunks = json.load(f)
        # Load docs data
        with open('mcp_docs/index/docs_md.json', 'r', encoding='utf-8') as f:
            loaded_docs = json.load(f)
        logger.info(
            "Loaded %d chunks and %d documents",
            len(loaded_chunks),
            len(loaded_docs),
        )
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception("Error loading data: %s", e)
        raise
    # Publish only once everything has loaded successfully.
    chunks_data, docs_data = loaded_chunks, loaded_docs
# Pydantic models
# Request body of POST /search.
class SearchRequest(BaseModel):
    # Text to look for; the search handler lower-cases it before matching.
    query: str
    # Maximum number of hits returned (used to slice the sorted result list).
    limit: int = 5
# Response body of POST /search.
class SearchResponse(BaseModel):
    # Matching chunks, best score first, already truncated to the limit.
    results: List[Dict[str, Any]]
    # Number of matching chunks before truncation.
    total: int
# Request body of POST /mcp/tools/call.
class ToolCallRequest(BaseModel):
    # Name of the tool to run (search_docs, get_chunk or list_docs).
    name: str
    # Tool arguments. Optional so that argument-less tools such as list_docs
    # can be called with just {"name": "list_docs"}, matching the /mcp
    # endpoint, which also treats missing arguments as {}. Pydantic copies
    # field defaults per instance, so the literal {} is not shared state.
    arguments: Dict[str, Any] = {}
# Response body of POST /mcp/tools/call.
class ToolCallResponse(BaseModel):
    # Content items; the handler always emits {"type": "text", "text": ...}.
    content: List[Dict[str, str]]
@app.on_event("startup")
async def startup_event():
    """Load data on startup.

    Registered as the app's "startup" event handler. It only delegates to
    load_data(), which re-raises any load error, so a missing or malformed
    index file propagates out of this handler.
    """
    load_data()
@app.get("/")
async def root():
    """Health check: report that the server is up and how much data is loaded."""
    # A missing or empty data set counts as zero entries.
    chunk_count = len(chunks_data or [])
    doc_count = len(docs_data or [])
    status = {"message": "MCP Documentation Server", "status": "running"}
    status["chunks_loaded"] = chunk_count
    status["docs_loaded"] = doc_count
    status["mcp_server"] = True
    return status
@app.get("/mcp/info")
async def mcp_info():
    """Describe this MCP server: its name, version and capability flags."""
    capabilities = {"tools": True, "resources": True}
    return {
        "name": "mcp-docs-server",
        "version": "1.0.0",
        "capabilities": capabilities,
    }
@app.get("/mcp/tools")
async def list_tools():
    """Return the descriptors (name, description, input schema) of all tools."""

    def describe(name, description, properties, required=None):
        # Every tool takes a JSON object; "required" is only emitted for
        # tools that actually have mandatory arguments.
        schema = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return {"name": name, "description": description, "inputSchema": schema}

    search_properties = {
        "query": {"type": "string", "description": "Search query"},
        "limit": {
            "type": "integer",
            "description": "Maximum number of results",
            "default": 5,
        },
    }
    chunk_properties = {
        "chunk_id": {"type": "string", "description": "Chunk ID to retrieve"},
    }

    return [
        describe(
            "search_docs",
            "Search through MCP documentation chunks",
            search_properties,
            ["query"],
        ),
        describe(
            "get_chunk",
            "Get a specific documentation chunk by ID",
            chunk_properties,
            ["chunk_id"],
        ),
        describe("list_docs", "List all available documents", {}),
    ]
@app.post("/mcp/tools/call")
async def call_tool(request: ToolCallRequest):
    """Run one documentation tool and return its output as a text item.

    Supported tools:

    * ``search_docs`` -- case-insensitive substring search over chunk text
      and titles. Each occurrence in the text scores 2 and each occurrence
      in the title scores 5; the best ``limit`` hits are listed.
    * ``get_chunk`` -- return the chunk whose ``chunk_id`` matches.
    * ``list_docs`` -- list title, id and URL of every document.

    A blank query yields "No results" instead of matching every chunk (the
    empty string is a substring of everything). ``limit`` falls back to 5
    when it is not a number and is clamped to zero when negative, so it can
    no longer mis-slice the result list. Chunks whose text or title is
    missing or ``None`` are treated as empty rather than crashing.

    Args:
        request: tool name plus its arguments.

    Returns:
        ToolCallResponse with exactly one ``{"type": "text", ...}`` item.
        Unknown tools and unknown chunk ids are reported in that text.

    Raises:
        HTTPException: 500 when no chunk data has been loaded.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    def reply(text):
        return ToolCallResponse(content=[{"type": "text", "text": text}])

    name = request.name
    arguments = request.arguments

    if name == "search_docs":
        raw_query = arguments.get("query") or ""
        if not isinstance(raw_query, str):
            raw_query = str(raw_query)
        needle = raw_query.lower()

        try:
            limit = max(int(arguments.get("limit", 5)), 0)
        except (TypeError, ValueError, OverflowError):
            limit = 5

        hits = []
        # "" is a substring of every string, so a blank query must not be scored.
        if needle.strip():
            for chunk in chunks_data:
                text = chunk.get('text') or ''
                title = chunk.get('title') or ''
                score = text.lower().count(needle) * 2 + title.lower().count(needle) * 5
                if score > 0:
                    hits.append({
                        "title": chunk.get('title'),
                        "text": text,
                        "filename": chunk.get('filename'),
                        "score": score,
                    })

        # Sort by score and limit results
        hits = sorted(hits, key=lambda hit: hit['score'], reverse=True)[:limit]
        if not hits:
            return reply(f"No results found for '{raw_query}'")

        parts = [f"Found {len(hits)} results for '{raw_query}':\n\n"]
        for i, hit in enumerate(hits, 1):
            parts.append(f"{i}. **{hit['title']}** (Score: {hit['score']})\n")
            parts.append(f" {hit['text'][:200]}...\n")
            parts.append(f" Source: {hit['filename']}\n\n")
        return reply("".join(parts))

    if name == "get_chunk":
        chunk_id = arguments.get("chunk_id", "")
        for chunk in chunks_data:
            if chunk.get('chunk_id') == chunk_id:
                return reply(
                    f"**{chunk.get('title', 'Untitled')}**\n\n"
                    f"{chunk.get('text', '')}\n\n"
                    f"Source: {chunk.get('filename', 'Unknown')}\n"
                    f"URL: {chunk.get('url', 'N/A')}"
                )
        return reply(f"Chunk {chunk_id} not found")

    if name == "list_docs":
        if not docs_data:
            return reply("No documents available")
        entries = [
            f"- **{doc.get('title', 'Untitled')}**\n"
            f" ID: {doc.get('id', 'Unknown')}\n"
            f" URL: {doc.get('url', 'N/A')}\n\n"
            for doc in docs_data
        ]
        return reply("Available documents:\n\n" + "".join(entries))

    return reply(f"Unknown tool: {name}")
@app.get("/mcp/resources")
async def list_resources():
    """Expose every loaded document as an MCP resource descriptor.

    Returns an empty list when no documents are loaded. Descriptions are the
    document content, cut to 200 characters plus "..." when longer.
    """
    if not docs_data:
        return []

    def preview(content):
        if len(content) > 200:
            return content[:200] + "..."
        return content

    return [
        {
            "uri": f"mcp://docs/{doc.get('id', 'unknown')}",
            "name": doc.get('title', 'Untitled'),
            "description": preview(doc.get('content', '')),
            "mimeType": "text/plain",
        }
        for doc in docs_data
    ]
@app.get("/mcp/resources/{resource_id}")
async def read_resource(resource_id: str):
    """Return the full text of one document, assembled from its chunks.

    The chunks whose ``doc_id`` equals ``resource_id`` are joined with blank
    lines in their stored order. Missing data and unknown ids are reported
    as plain strings rather than HTTP errors.
    """
    if not chunks_data:
        return "Data not loaded"

    # Collect the text of every chunk that belongs to this document.
    texts = [
        chunk.get('text', '')
        for chunk in chunks_data
        if chunk.get('doc_id') == resource_id
    ]
    if not texts:
        return f"Document {resource_id} not found"
    return "\n\n".join(texts)
# Legacy REST API endpoints for backward compatibility
@app.post("/search", response_model=SearchResponse)
async def search_docs(request: SearchRequest):
    """Search the documentation chunks with case-insensitive substring matching.

    Each occurrence of the query in a chunk's text scores 2 and each
    occurrence in its title scores 5. Chunks with a positive score are
    returned best-first.

    A blank query returns no results; previously it matched every chunk,
    because the empty string is a substring of any text. A negative
    ``limit`` is treated as zero instead of slicing from the end of the
    list. Chunks whose text or title is missing or ``None`` count as empty.

    Args:
        request: the query string and the maximum number of hits to return.

    Returns:
        SearchResponse with the top ``limit`` hits and the total hit count.

    Raises:
        HTTPException: 500 when no data is loaded or the search itself fails.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")
    try:
        query_lower = request.query.lower()
        if not query_lower.strip():
            return SearchResponse(results=[], total=0)

        results = []
        for chunk in chunks_data:
            text = chunk.get('text') or ''
            title = chunk.get('title') or ''
            # Title occurrences weigh more (5) than body occurrences (2).
            score = (
                text.lower().count(query_lower) * 2
                + title.lower().count(query_lower) * 5
            )
            if score > 0:
                results.append({
                    "chunk_id": chunk.get('chunk_id'),
                    "title": chunk.get('title'),
                    "text": chunk.get('text'),
                    "url": chunk.get('url'),
                    "filename": chunk.get('filename'),
                    "chunk_index": chunk.get('chunk_index'),
                    "total_chunks": chunk.get('total_chunks'),
                    "score": score,
                })

        # Sort by relevance score (stable, so ties keep their stored order).
        results.sort(key=lambda hit: hit['score'], reverse=True)
        limit = max(request.limit, 0)
        return SearchResponse(results=results[:limit], total=len(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Search error: {str(e)}") from e
@app.get("/chunks/{chunk_id}")
async def get_chunk(chunk_id: str):
    """Look up a single chunk by its ``chunk_id`` and return it unchanged.

    Responds with 500 when no data is loaded and 404 when no chunk matches.
    """
    if not chunks_data:
        raise HTTPException(status_code=500, detail="Data not loaded")

    # First chunk with a matching id, or None when there is none.
    match = next(
        (chunk for chunk in chunks_data if chunk.get('chunk_id') == chunk_id),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="Chunk not found")
    return match
# NOTE: FastAPI serves its interactive API documentation at "/docs" by default
# and registers that route when the app object is constructed, i.e. before
# this handler. Routes are matched in registration order, so unless the app
# is created with a different docs_url, GET /docs shows the Swagger UI and
# never reaches this function. "/documents" is an additional, unshadowed path
# to the same handler; the "/docs" registration is kept for compatibility.
@app.get("/documents")
@app.get("/docs")
async def list_docs():
    """List all available documents.

    Returns:
        ``{"documents": [...]}`` holding the loaded document metadata.

    Raises:
        HTTPException: 500 when no document metadata has been loaded.
    """
    if not docs_data:
        raise HTTPException(status_code=500, detail="Data not loaded")
    return {"documents": docs_data}
def is_initialize_request(body: dict) -> bool:
    """Check if request is an MCP initialize request.

    A body qualifies when it is a JSON object with ``"jsonrpc": "2.0"``,
    ``"method": "initialize"`` and an ``"id"`` member.

    Args:
        body: the decoded JSON payload. Anything that is not a dict (for
            example a JSON array or a bare string) is simply not an
            initialize request; it no longer raises ``AttributeError``.

    Returns:
        True for an initialize request, False otherwise.
    """
    if not isinstance(body, dict):
        return False
    return (body.get("jsonrpc") == "2.0" and
            body.get("method") == "initialize" and
            "id" in body)
def create_mcp_server():
    """Build a fresh descriptor (name, version, capabilities) for this server.

    A new dict is created on every call, so callers may keep or mutate the
    result without affecting each other.
    """
    capabilities = dict(tools={"listChanged": True}, resources={})
    return dict(
        name="mcp-docs-server",
        version="1.0.0",
        capabilities=capabilities,
    )
# Protocol revision reported in the initialize result.
_MCP_PROTOCOL_VERSION = "2025-06-18"
# Upper bound on remembered sessions; the oldest one is dropped first so the
# in-memory table cannot grow without limit.
_MCP_MAX_SESSIONS = 1000


def _jsonrpc_result(request_id, result):
    """Wrap *result* in a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _mcp_text_result(request_id, text):
    """Wrap plain text as a tool result holding a single text content item."""
    return _jsonrpc_result(request_id, {"content": [{"type": "text", "text": text}]})


def _mcp_tool_definitions():
    """Return the tool descriptors advertised by ``tools/list``."""
    return [
        {
            "name": "search_docs",
            "title": "Search Documentation",
            "description": "Search through MCP documentation chunks",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query for MCP documentation",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of results",
                        "default": 5,
                    },
                },
                "required": ["query"],
            },
        },
        {
            "name": "get_chunk",
            "title": "Get Documentation Chunk",
            "description": "Get a specific documentation chunk by ID",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "chunk_id": {
                        "type": "string",
                        "description": "Chunk ID to retrieve",
                    },
                },
                "required": ["chunk_id"],
            },
        },
        {
            "name": "list_docs",
            "title": "List Documents",
            "description": "List all available documents",
            "inputSchema": {"type": "object", "properties": {}},
        },
    ]


def _mcp_search_docs_text(arguments):
    """Run the ``search_docs`` tool and return its report as text."""
    raw_query = arguments.get("query") or ""
    if not isinstance(raw_query, str):
        raw_query = str(raw_query)
    needle = raw_query.lower()

    try:
        limit = max(int(arguments.get("limit", 5)), 0)
    except (TypeError, ValueError, OverflowError):
        limit = 5

    hits = []
    # "" is a substring of every string, so a blank query must not be scored.
    if needle.strip():
        for chunk in chunks_data or []:
            text = chunk.get('text') or ''
            title = chunk.get('title') or ''
            score = text.lower().count(needle) * 2 + title.lower().count(needle) * 5
            if score > 0:
                hits.append((score, chunk.get('title'), text, chunk.get('filename')))

    # Stable sort: equally scored chunks keep their stored order.
    hits.sort(key=lambda hit: hit[0], reverse=True)
    hits = hits[:limit]
    if not hits:
        return f"No results found for '{raw_query}'"

    parts = [f"Found {len(hits)} results for '{raw_query}':\n\n"]
    for i, (score, title, text, filename) in enumerate(hits, 1):
        parts.append(f"{i}. **{title}** (Score: {score})\n")
        parts.append(f" {text[:200]}...\n")
        parts.append(f" Source: {filename}\n\n")
    return "".join(parts)


def _mcp_get_chunk_text(arguments):
    """Run the ``get_chunk`` tool and return the chunk (or a miss) as text."""
    chunk_id = arguments.get("chunk_id", "")
    for chunk in chunks_data or []:
        if chunk.get('chunk_id') == chunk_id:
            return (
                f"**{chunk.get('title', 'Untitled')}**\n\n"
                f"{chunk.get('text', '')}\n\n"
                f"Source: {chunk.get('filename', 'Unknown')}\n"
                f"URL: {chunk.get('url', 'N/A')}"
            )
    return f"Chunk {chunk_id} not found"


def _mcp_list_docs_text():
    """Run the ``list_docs`` tool and return the document listing as text."""
    if not docs_data:
        return "No documents available"
    entries = [
        f"- **{doc.get('title', 'Untitled')}**\n"
        f" ID: {doc.get('id', 'Unknown')}\n"
        f" URL: {doc.get('url', 'N/A')}\n\n"
        for doc in docs_data
    ]
    return "Available documents:\n\n" + "".join(entries)


@app.post("/mcp")
async def mcp_post_handler(request: Request):
    """Handle one MCP JSON-RPC message sent over HTTP POST.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call``; anything else is answered with error -32601.

    Behaviour worth knowing:

    * ``initialize`` creates a session and returns its id in the
      ``Mcp-Session-Id`` response header. Without that header the client
      could never learn the id that is stored in ``sessions``.
    * Requests without a session id are still accepted. A request carrying
      an *unknown* session id gets HTTP 404 with a JSON-RPC error body, which
      tells the client to initialize again (for example after a restart or
      after its session was evicted).
    * Messages without an ``id`` are notifications and are acknowledged with
      an empty 202 response; JSON-RPC forbids replying to them.
    * Invalid JSON yields -32700, a non-object payload -32600, and any
      unexpected failure -32603. None of these paths can raise.

    NOTE(review): the advertised capabilities include ``resources``, but no
    ``resources/*`` method is implemented in this handler.

    Args:
        request: the incoming HTTP request; its body must be a single
            JSON-RPC 2.0 object.

    Returns:
        A JSON-RPC response (dict or JSONResponse), or an empty 202
        response for notifications.
    """
    # Local import: only StreamingResponse is imported at module level.
    from fastapi.responses import JSONResponse, Response

    try:
        body = await request.json()
    except Exception:
        return _jsonrpc_error(None, -32700, "Parse error: request body is not valid JSON")

    if not isinstance(body, dict):
        # Arrays (batches) and scalars are not supported by this endpoint.
        return _jsonrpc_error(None, -32600, "Invalid Request: expected a single JSON-RPC object")

    request_id = body.get("id")
    try:
        # Handle initialization request
        if is_initialize_request(body):
            # Keep the session table bounded; dicts preserve insertion order,
            # so the first key is the oldest session.
            while len(sessions) >= _MCP_MAX_SESSIONS:
                sessions.pop(next(iter(sessions)))

            server = create_mcp_server()
            new_session_id = str(uuid.uuid4())
            sessions[new_session_id] = {"server": server, "initialized": True}

            payload = _jsonrpc_result(request_id, {
                "protocolVersion": _MCP_PROTOCOL_VERSION,
                "capabilities": server["capabilities"],
                "serverInfo": {
                    "name": server["name"],
                    "version": server["version"],
                },
            })
            return JSONResponse(
                content=payload,
                headers={"Mcp-Session-Id": new_session_id},
            )

        # Handle other requests - allow without session for MCP compatibility
        session_id = request.headers.get("mcp-session-id")
        if session_id and session_id not in sessions:
            return JSONResponse(
                status_code=404,
                content=_jsonrpc_error(
                    request_id,
                    -32000,
                    "Bad Request: Invalid session ID provided",
                ),
            )

        # Notifications (e.g. "notifications/initialized") carry no id and
        # must not be answered with a JSON-RPC response.
        if "id" not in body:
            return Response(status_code=202)

        method = body.get("method")

        if method == "ping":
            return _jsonrpc_result(request_id, {})

        # Handle tools/list
        if method == "tools/list":
            return _jsonrpc_result(request_id, {"tools": _mcp_tool_definitions()})

        # Handle tools/call
        if method == "tools/call":
            params = body.get("params") or {}
            if not isinstance(params, dict):
                return _jsonrpc_error(request_id, -32602, "Invalid params: expected an object")
            arguments = params.get("arguments") or {}
            if not isinstance(arguments, dict):
                return _jsonrpc_error(
                    request_id, -32602, "Invalid params: 'arguments' must be an object"
                )

            tool_name = params.get("name")
            if tool_name == "search_docs":
                text = _mcp_search_docs_text(arguments)
            elif tool_name == "get_chunk":
                text = _mcp_get_chunk_text(arguments)
            elif tool_name == "list_docs":
                text = _mcp_list_docs_text()
            else:
                return _jsonrpc_error(request_id, -32601, f"Unknown tool: {tool_name}")
            return _mcp_text_result(request_id, text)

        return _jsonrpc_error(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        logger.exception("Unhandled error while processing MCP request")
        return _jsonrpc_error(request_id, -32603, f"Internal error: {str(e)}")
# Script entry point: when run directly, serve the app with uvicorn on all
# network interfaces, port 7860.
# NOTE(review): 7860 is presumably the port the hosting Space expects - confirm.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)