FastAPI-Backend-Models / routes /mcp_routes.py
malek-messaoudii
Enhance MCP tool performance by implementing asynchronous handling in tool calls
997d57e
raw
history blame
18.7 kB
"""Routes pour exposer MCP via FastAPI pour Swagger UI"""
from fastapi import APIRouter, HTTPException
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field
import logging
import json
from services.mcp_service import mcp_server
from models.mcp_models import ToolListResponse, ToolInfo, ToolCallRequest, ToolCallResponse
router = APIRouter(prefix="/api/v1/mcp", tags=["MCP"])
logger = logging.getLogger(__name__)
# ===== Models pour chaque outil MCP =====
class DetectStanceRequest(BaseModel):
"""Request pour détecter la stance d'un argument"""
topic: str = Field(..., description="Le sujet du débat")
argument: str = Field(..., description="L'argument à analyser")
class Config:
json_schema_extra = {
"example": {
"topic": "Climate change is real",
"argument": "Rising global temperatures prove it"
}
}
class MatchKeypointRequest(BaseModel):
"""Request pour matcher un argument avec un keypoint"""
argument: str = Field(..., description="L'argument à évaluer")
key_point: str = Field(..., description="Le keypoint de référence")
class Config:
json_schema_extra = {
"example": {
"argument": "Renewable energy reduces emissions",
"key_point": "Environmental benefits"
}
}
class TranscribeAudioRequest(BaseModel):
"""Request pour transcrire un audio"""
audio_path: str = Field(..., description="Chemin vers le fichier audio")
class Config:
json_schema_extra = {
"example": {
"audio_path": "/path/to/audio.wav"
}
}
class GenerateSpeechRequest(BaseModel):
"""Request pour générer de la parole"""
text: str = Field(..., description="Texte à convertir en parole")
voice: str = Field(default="Aaliyah-PlayAI", description="Voix à utiliser")
format: str = Field(default="wav", description="Format audio (wav, mp3, etc.)")
class Config:
json_schema_extra = {
"example": {
"text": "Hello, this is a test",
"voice": "Aaliyah-PlayAI",
"format": "wav"
}
}
class GenerateArgumentRequest(BaseModel):
"""Request pour générer un argument"""
user_input: str = Field(..., description="Input utilisateur pour générer l'argument")
conversation_id: Optional[str] = Field(default=None, description="ID de conversation (optionnel)")
class Config:
json_schema_extra = {
"example": {
"user_input": "Generate an argument about climate change",
"conversation_id": "conv_123"
}
}
# ===== Routes MCP =====
@router.get("/health", summary="Health Check MCP")
async def mcp_health():
"""Health check pour le serveur MCP"""
try:
# Liste hardcodée des outils disponibles (plus fiable)
tool_names = [
"detect_stance",
"match_keypoint_argument",
"transcribe_audio",
"generate_speech",
"generate_argument",
"health_check"
]
return {
"status": "healthy",
"tools": tool_names,
"tool_count": len(tool_names)
}
except Exception as e:
logger.error(f"MCP health check error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/tools", response_model=ToolListResponse, summary="Liste des outils MCP")
async def list_mcp_tools():
"""Liste tous les outils MCP disponibles"""
try:
# Définir manuellement les outils avec leurs schémas
tool_list = [
ToolInfo(
name="detect_stance",
description="Détecte si un argument est PRO ou CON pour un topic donné",
input_schema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "Le sujet du débat"},
"argument": {"type": "string", "description": "L'argument à analyser"}
},
"required": ["topic", "argument"]
}
),
ToolInfo(
name="match_keypoint_argument",
description="Détermine si un argument correspond à un keypoint",
input_schema={
"type": "object",
"properties": {
"argument": {"type": "string", "description": "L'argument à évaluer"},
"key_point": {"type": "string", "description": "Le keypoint de référence"}
},
"required": ["argument", "key_point"]
}
),
ToolInfo(
name="transcribe_audio",
description="Convertit un fichier audio en texte",
input_schema={
"type": "object",
"properties": {
"audio_path": {"type": "string", "description": "Chemin vers le fichier audio"}
},
"required": ["audio_path"]
}
),
ToolInfo(
name="generate_speech",
description="Convertit du texte en fichier audio",
input_schema={
"type": "object",
"properties": {
"text": {"type": "string", "description": "Texte à convertir en parole"},
"voice": {"type": "string", "description": "Voix à utiliser", "default": "Aaliyah-PlayAI"},
"format": {"type": "string", "description": "Format audio", "default": "wav"}
},
"required": ["text"]
}
),
ToolInfo(
name="generate_argument",
description="Génère un argument de débat à partir d'un input utilisateur",
input_schema={
"type": "object",
"properties": {
"user_input": {"type": "string", "description": "Input utilisateur pour générer l'argument"},
"conversation_id": {"type": "string", "description": "ID de conversation (optionnel)"}
},
"required": ["user_input"]
}
),
ToolInfo(
name="health_check",
description="Health check pour le serveur MCP",
input_schema={
"type": "object",
"properties": {},
"required": []
}
)
]
return ToolListResponse(tools=tool_list, count=len(tool_list))
except Exception as e:
logger.error(f"Error listing MCP tools: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/tools/call", response_model=ToolCallResponse, summary="Appeler un outil MCP")
async def call_mcp_tool(request: ToolCallRequest):
"""Appelle un outil MCP par son nom avec des arguments"""
try:
result = await mcp_server.call_tool(request.tool_name, request.arguments)
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text') and content_block.text:
try:
final_result = json.loads(content_block.text)
except json.JSONDecodeError:
final_result = {"text": content_block.text}
else:
final_result = result
else:
final_result = result
elif isinstance(result, (list, tuple)) and len(result) > 0:
# Si c'est une liste de ContentBlock, extraire le contenu
if hasattr(result[0], 'text') and result[0].text:
try:
final_result = json.loads(result[0].text)
except json.JSONDecodeError:
final_result = {"text": result[0].text}
else:
final_result = {"result": result[0] if result else {}}
else:
final_result = {"result": result}
return ToolCallResponse(
success=True,
result=final_result,
tool_name=request.tool_name
)
except Exception as e:
logger.error(f"Error calling MCP tool {request.tool_name}: {e}")
return ToolCallResponse(
success=False,
error=str(e),
tool_name=request.tool_name
)
# ===== Routes individuelles pour chaque outil (pour Swagger) =====
@router.post("/tools/detect-stance", summary="Détecter la stance d'un argument")
async def mcp_detect_stance(request: DetectStanceRequest) -> Dict[str, Any]:
"""Détecte si un argument est PRO ou CON pour un topic donné"""
try:
# Appeler directement via call_tool (async)
result = await mcp_server.call_tool("detect_stance", {
"topic": request.topic,
"argument": request.argument
})
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text') and content_block.text:
try:
return json.loads(content_block.text)
except json.JSONDecodeError:
return {"text": content_block.text}
return result
elif isinstance(result, (list, tuple)) and len(result) > 0:
# Si c'est une liste de ContentBlock, extraire le contenu
if hasattr(result[0], 'text') and result[0].text:
try:
return json.loads(result[0].text)
except json.JSONDecodeError:
return {"text": result[0].text}
return {"result": result[0] if result else {}}
return {"result": result}
except Exception as e:
logger.error(f"Error in detect_stance: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool detect_stance: {e}")
@router.post("/tools/match-keypoint", summary="Matcher un argument avec un keypoint")
async def mcp_match_keypoint(request: MatchKeypointRequest) -> Dict[str, Any]:
"""Détermine si un argument correspond à un keypoint"""
try:
result = await mcp_server.call_tool("match_keypoint_argument", {
"argument": request.argument,
"key_point": request.key_point
})
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text') and content_block.text:
try:
return json.loads(content_block.text)
except json.JSONDecodeError:
return {"text": content_block.text}
return result
elif isinstance(result, (list, tuple)) and len(result) > 0:
if hasattr(result[0], 'text') and result[0].text:
try:
return json.loads(result[0].text)
except json.JSONDecodeError:
return {"text": result[0].text}
return {"result": result[0] if result else {}}
return {"result": result}
except Exception as e:
logger.error(f"Error in match_keypoint_argument: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool match_keypoint_argument: {e}")
@router.post("/tools/transcribe-audio", summary="Transcrire un audio en texte")
async def mcp_transcribe_audio(request: TranscribeAudioRequest) -> Dict[str, str]:
"""Convertit un fichier audio en texte"""
try:
result = await mcp_server.call_tool("transcribe_audio", {
"audio_path": request.audio_path
})
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text'):
return {"text": content_block.text}
# Si c'est un dict simple, essayer d'extraire le texte
if "text" in result:
return {"text": result["text"]}
return {"text": str(result)}
elif isinstance(result, str):
return {"text": result}
elif isinstance(result, (list, tuple)) and len(result) > 0:
if hasattr(result[0], 'text'):
return {"text": result[0].text}
return {"text": str(result[0])}
return {"text": str(result)}
except FileNotFoundError as e:
logger.error(f"File not found in transcribe_audio: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool transcribe_audio: {e}")
except Exception as e:
logger.error(f"Error in transcribe_audio: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool transcribe_audio: {e}")
@router.post("/tools/generate-speech", summary="Générer de la parole à partir de texte")
async def mcp_generate_speech(request: GenerateSpeechRequest) -> Dict[str, str]:
"""Convertit du texte en fichier audio"""
try:
result = await mcp_server.call_tool("generate_speech", {
"text": request.text,
"voice": request.voice,
"format": request.format
})
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text'):
return {"audio_path": content_block.text}
# Si c'est un dict simple, essayer d'extraire le chemin
if "audio_path" in result:
return {"audio_path": result["audio_path"]}
return {"audio_path": str(result)}
elif isinstance(result, str):
return {"audio_path": result}
elif isinstance(result, (list, tuple)) and len(result) > 0:
if hasattr(result[0], 'text'):
return {"audio_path": result[0].text}
return {"audio_path": str(result[0])}
return {"audio_path": str(result)}
except Exception as e:
logger.error(f"Error in generate_speech: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool generate_speech: {e}")
@router.post("/tools/generate-argument", summary="Générer un argument de débat")
async def mcp_generate_argument(request: GenerateArgumentRequest) -> Dict[str, str]:
"""Génère un argument de débat à partir d'un input utilisateur"""
try:
result = await mcp_server.call_tool("generate_argument", {
"user_input": request.user_input,
"conversation_id": request.conversation_id
})
# Gérer différents types de retours MCP
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text'):
return {"argument": content_block.text}
# Si c'est un dict simple, essayer d'extraire l'argument
if "argument" in result:
return {"argument": result["argument"]}
return {"argument": str(result)}
elif isinstance(result, str):
return {"argument": result}
elif isinstance(result, (list, tuple)) and len(result) > 0:
if hasattr(result[0], 'text'):
return {"argument": result[0].text}
return {"argument": str(result[0])}
return {"argument": str(result)}
except Exception as e:
logger.error(f"Error in generate_argument: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool generate_argument: {e}")
@router.get("/tools/health-check", summary="Health check MCP (outil)")
async def mcp_tool_health_check() -> Dict[str, Any]:
"""Health check via l'outil MCP"""
try:
result = await mcp_server.call_tool("health_check", {})
# Gérer différents types de retours MCP
import json
if isinstance(result, dict):
# Si le résultat contient une clé "result" avec une liste de ContentBlock
if "result" in result and isinstance(result["result"], list) and len(result["result"]) > 0:
content_block = result["result"][0]
if hasattr(content_block, 'text') and content_block.text:
try:
return json.loads(content_block.text)
except json.JSONDecodeError:
return {"text": content_block.text}
return result
elif isinstance(result, (list, tuple)) and len(result) > 0:
if hasattr(result[0], 'text') and result[0].text:
try:
return json.loads(result[0].text)
except json.JSONDecodeError:
return {"text": result[0].text}
return {"result": result[0] if result else {}}
return {"result": result}
except Exception as e:
logger.error(f"Error in health_check tool: {e}")
raise HTTPException(status_code=500, detail=f"Error executing tool health_check: {e}")