import json
import logging
from typing import Any, List, Optional

from fastapi import FastAPI, HTTPException
from mcp import Server, Resource, Tool
from mcp.types import TextContent, ImageContent

from .resources import get_resources

logger = logging.getLogger(__name__)


class MCPServer:
    """Wrap an MCP ``Server`` and expose its resources and tools.

    On construction, every resource returned by ``get_resources()`` and every
    tool returned by ``get_tools()`` is registered on the underlying server.
    """

    def __init__(self, app: FastAPI) -> None:
        """Create the MCP server and register resources and tools.

        Args:
            app: The FastAPI application this server is attached to. It is
                only stored on the instance here.
        """
        self.app = app
        self.server = Server()
        self._setup_resources()
        self._setup_tools()

    def _setup_resources(self) -> None:
        """Register the resources exposed via MCP."""
        for resource in get_resources():
            self.server.add_resource(resource)

    def _setup_tools(self) -> None:
        """Register the tools exposed via MCP."""
        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import with .tools -- confirm.
        from .tools import get_tools

        for tool in get_tools():
            self.server.add_tool(tool)

    async def list_resources(self) -> List[dict]:
        """List all available resources.

        Returns:
            One dict per registered resource with the keys ``uri``, ``name``,
            ``description`` and ``mime_type``.
        """
        return [
            {
                "uri": resource.uri,
                "name": resource.name,
                "description": resource.description,
                "mime_type": resource.mime_type,
            }
            for resource in self.server.resources.values()
        ]

    async def list_tools(self) -> List[dict]:
        """List all available tools.

        Returns:
            One dict per registered tool with the keys ``name``,
            ``description`` and ``input_schema``.
        """
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.input_schema,
            }
            for tool in self.server.tools.values()
        ]

    async def call_tool(self, tool_name: str, arguments: dict) -> Any:
        """Call an MCP tool by name.

        Args:
            tool_name: Name under which the tool is registered on the server.
            arguments: Arguments passed as-is to the tool's ``execute``.

        Returns:
            Whatever the tool's ``execute`` coroutine returns.

        Raises:
            HTTPException: 404 when no tool is registered under ``tool_name``;
                500 when the tool raises an unexpected error. An
                ``HTTPException`` raised by the tool itself is propagated
                unchanged.
        """
        if tool_name not in self.server.tools:
            raise HTTPException(
                status_code=404, detail=f"Tool {tool_name} not found"
            )

        tool = self.server.tools[tool_name]
        try:
            return await tool.execute(arguments)
        except HTTPException:
            # A tool that deliberately raised an HTTP error should keep its
            # own status code instead of being rewritten as a 500.
            raise
        except Exception as e:
            # logger.exception records the traceback alongside the message.
            logger.exception("Tool execution error: %s", e)
            raise HTTPException(
                status_code=500, detail=f"Tool execution failed: {str(e)}"
            ) from e