# services/mcp_server.py
"""
Model Context Protocol (MCP) server for MasterLLM.
Exposes CrewAI tools via standardized MCP protocol for external integration.
"""
import json
import os
from typing import Any, Dict, List, Optional

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool

# Import CrewAI tools
from services.agent_crewai import (
    ClassifyTextTool,
    DescribeImagesTool,
    ExtractEntitesTool,
    ExtractTablesTool,
    ExtractTextTool,
    SignatureVerificationTool,
    StampDetectionTool,
    SummarizeTextTool,
    TranslateTextTool,
    get_master_tools,
    run_agent,
)
# ========================
# MCP SERVER SETUP
# ========================
class MasterLLMMCPServer:
    """MCP server for MasterLLM document-processing tools.

    Wraps the CrewAI tools returned by ``get_master_tools()`` and exposes
    them over the Model Context Protocol: tools (list/call), resources
    (workflow templates and history), and prompt templates.
    """

    def __init__(self, name: str = "masterllm-orchestrator"):
        """Create the underlying MCP ``Server`` and register all handlers.

        Args:
            name: Server name advertised to MCP clients during initialization.
        """
        self.server = Server(name)
        self.tools = get_master_tools()
        self._setup_handlers()

    def _setup_handlers(self):
        """Register MCP protocol handlers on ``self.server``.

        BUG FIX: previously these handlers were defined as plain local
        functions and never attached to the server, so the server would
        answer no requests at all.  They are now registered through the
        MCP SDK's decorator API (``@server.list_tools()`` etc.).
        """

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List all available tools exposed via MCP."""
            mcp_tools = []
            for tool in self.tools:
                # Convert the CrewAI tool metadata to the MCP Tool format.
                mcp_tools.append(
                    Tool(
                        name=tool.name,
                        description=tool.description,
                        inputSchema={
                            "type": "object",
                            "properties": self._get_tool_schema(tool.name),
                            "required": self._get_required_fields(tool.name),
                        },
                    )
                )
            return mcp_tools

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> List[TextContent]:
            """Execute a tool by name and return its result as JSON text."""
            matching_tool = next((t for t in self.tools if t.name == name), None)
            if matching_tool is None:
                return [TextContent(
                    type="text",
                    text=json.dumps({"error": f"Tool '{name}' not found"})
                )]
            try:
                # Execute the CrewAI tool.
                result = matching_tool._run(**arguments)
                # Tools may return JSON strings; decode so the payload
                # nests cleanly instead of being double-encoded.
                if isinstance(result, str):
                    try:
                        result = json.loads(result)
                    except json.JSONDecodeError:
                        pass
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            except Exception as e:
                # Report the failure to the client instead of letting the
                # exception tear down the protocol session.
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "error": str(e),
                        "tool": name,
                        "arguments": arguments,
                    })
                )]

        # NOTE(review): the resource/prompt handlers below return plain
        # dicts; recent MCP SDK versions may expect typed models
        # (Resource, Prompt, GetPromptResult) — confirm against the
        # installed mcp version.

        @self.server.list_resources()
        async def list_resources() -> List[Any]:
            """List available resources (workflow templates, history)."""
            # Can be extended to expose MongoDB records, S3 files, etc.
            return [
                {
                    "uri": "workflow://templates",
                    "name": "Workflow Templates",
                    "description": "Pre-configured document processing workflows",
                    "mimeType": "application/json",
                },
                {
                    "uri": "workflow://history",
                    "name": "Execution History",
                    "description": "Recent workflow execution history",
                    "mimeType": "application/json",
                },
            ]

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            """Read a specific resource by URI; JSON error text if unknown."""
            if uri == "workflow://templates":
                templates = {
                    "document_analysis": {
                        "pipeline": "text-table-summarize",
                        "description": "Extract text and tables, then summarize",
                    },
                    "multilingual_processing": {
                        "pipeline": "text-translate-summarize",
                        "description": "Extract, translate, and summarize document",
                    },
                    "verification": {
                        "pipeline": "signature_verification-stamp_detection",
                        "description": "Verify signatures and detect stamps",
                    },
                }
                return json.dumps(templates, indent=2)
            elif uri == "workflow://history":
                # This could query MongoDB for recent executions.
                # For now, return placeholder.
                return json.dumps({
                    "message": "Connect to MongoDB to view execution history",
                    "recent_workflows": [],
                }, indent=2)
            return json.dumps({"error": f"Resource not found: {uri}"})

        @self.server.list_prompts()
        async def list_prompts() -> List[Any]:
            """List available prompt templates."""
            return [
                {
                    "name": "analyze_document",
                    "description": "Comprehensive document analysis workflow",
                    "arguments": [
                        {
                            "name": "file_path",
                            "description": "Path to the document file",
                            "required": True,
                        },
                        {
                            "name": "analysis_depth",
                            "description": "Level of analysis: basic, standard, or comprehensive",
                            "required": False,
                        },
                    ],
                },
                {
                    "name": "extract_and_summarize",
                    "description": "Extract content and generate summary",
                    "arguments": [
                        {
                            "name": "file_path",
                            "description": "Path to the document file",
                            "required": True,
                        },
                        {
                            "name": "include_tables",
                            "description": "Whether to include tables in summary",
                            "required": False,
                        },
                    ],
                },
            ]

        @self.server.get_prompt()
        async def get_prompt(name: str, arguments: dict) -> Any:
            """Get a specific prompt with filled arguments."""
            if name == "analyze_document":
                file_path = arguments.get("file_path", "")
                depth = arguments.get("analysis_depth", "standard")
                if depth == "comprehensive":
                    instruction = f"Perform comprehensive analysis on {file_path}: extract text, tables, describe images, classify content, extract entities, verify signatures, and detect stamps. Then provide a detailed summary."
                elif depth == "basic":
                    instruction = f"Perform basic analysis on {file_path}: extract text and provide a brief summary."
                else:  # standard
                    instruction = f"Analyze {file_path}: extract text and tables, then provide a summary of the content."
                return {
                    "messages": [
                        {
                            "role": "user",
                            "content": {
                                "type": "text",
                                "text": instruction,
                            },
                        }
                    ]
                }
            elif name == "extract_and_summarize":
                file_path = arguments.get("file_path", "")
                # BUG FIX: arguments may arrive as a real bool (True/False),
                # on which .lower() would raise AttributeError; normalize
                # through str() first (str(True).lower() == "true").
                include_tables = str(arguments.get("include_tables", "true")).lower() == "true"
                if include_tables:
                    instruction = f"Extract text and tables from {file_path}, then create a comprehensive summary including the table data."
                else:
                    instruction = f"Extract text from {file_path} and create a summary."
                return {
                    "messages": [
                        {
                            "role": "user",
                            "content": {
                                "type": "text",
                                "text": instruction,
                            },
                        }
                    ]
                }
            return {"error": f"Prompt not found: {name}"}

    def _get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
        """Return the JSON-schema ``properties`` map for a tool's parameters.

        Unknown tool names yield an empty dict.
        """
        base_file_schema = {
            "file_path": {
                "type": "string",
                "description": "Absolute or relative path to the file",
            },
            "start_page": {
                "type": "integer",
                "description": "Start page (1-indexed)",
                "default": 1,
            },
            "end_page": {
                "type": "integer",
                "description": "End page (inclusive, 1-indexed)",
                "default": 1,
            },
        }
        text_or_file_schema = {
            "text": {
                "type": "string",
                "description": "Raw text to process (alternative to file_path)",
            },
            "file_path": {
                "type": "string",
                "description": "Path to document file (alternative to text)",
            },
            "start_page": {
                "type": "integer",
                "description": "Start page for file processing",
                "default": 1,
            },
            "end_page": {
                "type": "integer",
                "description": "End page for file processing",
                "default": 1,
            },
        }
        schemas = {
            "extract_text": base_file_schema,
            "extract_tables": base_file_schema,
            "describe_images": base_file_schema,
            "summarize_text": text_or_file_schema,
            "classify_text": text_or_file_schema,
            "extract_entities": text_or_file_schema,
            "translate_text": {
                **text_or_file_schema,
                "target_lang": {
                    "type": "string",
                    "description": "Target language code (e.g., 'es', 'fr', 'de') or name (e.g., 'Spanish')",
                },
            },
            "signature_verification": base_file_schema,
            "stamp_detection": base_file_schema,
        }
        return schemas.get(tool_name, {})

    def _get_required_fields(self, tool_name: str) -> List[str]:
        """Return the list of required argument names for a tool."""
        file_based_tools = [
            "extract_text",
            "extract_tables",
            "describe_images",
            "signature_verification",
            "stamp_detection",
        ]
        if tool_name in file_based_tools:
            return ["file_path"]
        elif tool_name == "translate_text":
            return ["target_lang"]
        else:
            return []  # text or file_path required, but either is acceptable

    async def run(self):
        """Run the MCP server using the stdio transport (blocks until EOF)."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )
# ========================
# FASTAPI INTEGRATION
# ========================
def create_mcp_fastapi_routes(app):
    """
    Attach MCP endpoints to a FastAPI app so MCP clients can connect
    over HTTP instead of stdio.

    Registers:
      * ``GET  /mcp/sse``       - SSE transport for full MCP clients.
      * ``POST /mcp/messages/`` - message sink used by the SSE transport.
      * ``POST /mcp``           - minimal JSON-RPC endpoint supporting
                                  ``tools/list`` and ``tools/call``.

    BUG FIX: the previous version imported a non-existent
    ``mcp.server.sse.sse_transport`` helper, called private handler
    attributes (``_tool_list_handler`` / ``_tool_call_handler``) that the
    MCP ``Server`` does not expose, and never attached any route to
    ``app``.  This version uses the SDK's documented ``SseServerTransport``
    and serves the JSON-RPC endpoint directly from the tool registry.
    """
    from fastapi import Request
    from mcp.server.sse import SseServerTransport
    from starlette.routing import Mount

    mcp_server = MasterLLMMCPServer()

    # SSE transport per the MCP SDK pattern: clients open the SSE stream
    # and POST follow-up messages to the companion endpoint.
    sse = SseServerTransport("/mcp/messages/")

    @app.get("/mcp/sse")
    async def mcp_sse_endpoint(request: Request):
        """SSE endpoint for the MCP protocol."""
        # NOTE(review): request._send is the documented MCP SDK usage for
        # Starlette apps, but it is a private attribute — confirm it is
        # still supported by the installed starlette/mcp versions.
        async with sse.connect_sse(
            request.scope, request.receive, request._send
        ) as (read_stream, write_stream):
            await mcp_server.server.run(
                read_stream,
                write_stream,
                mcp_server.server.create_initialization_options(),
            )

    # Mount the POST message sink that pairs with the SSE stream.
    app.router.routes.append(Mount("/mcp/messages", app=sse.handle_post_message))

    @app.post("/mcp")
    async def mcp_post_endpoint(request: Request):
        """Minimal JSON-RPC endpoint (alternative to SSE)."""
        data = await request.json()
        method = data.get("method")
        params = data.get("params", {})
        request_id = data.get("id")

        if method == "tools/list":
            tools = [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": {
                        "type": "object",
                        "properties": mcp_server._get_tool_schema(tool.name),
                        "required": mcp_server._get_required_fields(tool.name),
                    },
                }
                for tool in mcp_server.tools
            ]
            return {"jsonrpc": "2.0", "result": {"tools": tools}, "id": request_id}

        if method == "tools/call":
            name = params.get("name")
            arguments = params.get("arguments", {})
            tool = next((t for t in mcp_server.tools if t.name == name), None)
            if tool is None:
                return {
                    "jsonrpc": "2.0",
                    "error": {"code": -32602, "message": f"Tool '{name}' not found"},
                    "id": request_id,
                }
            try:
                result = tool._run(**arguments)
                return {"jsonrpc": "2.0", "result": result, "id": request_id}
            except Exception as e:
                # Tool failures become JSON-RPC errors, not HTTP 500s.
                return {
                    "jsonrpc": "2.0",
                    "error": {"code": -32000, "message": str(e)},
                    "id": request_id,
                }

        return {
            "jsonrpc": "2.0",
            "error": {"code": -32601, "message": "Method not found"},
            "id": request_id,
        }
# ========================
# STANDALONE SERVER
# ========================
async def main():
    """Entry point for standalone mode: serve MCP over stdio until EOF."""
    await MasterLLMMCPServer().run()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())