| |
| import asyncio |
| from typing import List, Dict, Optional, Generator, AsyncGenerator |
| from fastmcp import Client |
| from fastmcp.client.transports import SSETransport |
| from llama_index.core.workflow import Context |
| from llama_index.tools.mcp import McpToolSpec |
| from llama_index.core.agent.workflow import FunctionAgent, ToolCall, ToolCallResult |
| from mcp.client.sse import sse_client as original_sse_client |
| from mcp.client.session import ClientSession |
| from datetime import timedelta |
| from contextlib import asynccontextmanager |
|
|
class CustomMCPClient:
    """Minimal MCP client that talks to a server over SSE.

    Every public call opens its own SSE connection, initializes an MCP
    session on top of it, performs one request, and tears everything down
    again when the request completes.
    """

    def __init__(self, url: str, headers: dict = None, timeout: int = 30):
        """Store the connection settings; no network traffic happens here.

        Args:
            url: SSE endpoint of the MCP server.
            headers: Extra HTTP headers sent when connecting. A falsy value
                is replaced with an empty dict.
            timeout: Number of seconds used both for the SSE client timeout
                and for the session read timeout.
        """
        self.url = url
        self.timeout = timeout
        self.headers = headers if headers else {}

    @asynccontextmanager
    async def _create_session(self):
        """Yield an initialized ``ClientSession`` bound to a new SSE connection."""
        sse_connection = original_sse_client(
            url=self.url,
            headers=self.headers,
            timeout=self.timeout,
        )
        async with sse_connection as channels:
            # The session is built from whatever streams the SSE client yields.
            read_limit = timedelta(seconds=self.timeout)
            pending_session = ClientSession(
                *channels,
                read_timeout_seconds=read_limit,
            )
            async with pending_session as active_session:
                await active_session.initialize()
                yield active_session

    async def list_tools(self):
        """Return the server's response to a ``list_tools`` request."""
        async with self._create_session() as active_session:
            listing = await active_session.list_tools()
            return listing

    async def call_tool(self, tool_name: str, arguments: dict):
        """Invoke ``tool_name`` with ``arguments`` and return the server's result."""
        async with self._create_session() as active_session:
            outcome = await active_session.call_tool(tool_name, arguments)
            return outcome
|
|
class MCPAgent:
    """Tool-calling agent whose tools are loaded from an MCP server over SSE.

    The agent is built lazily: ``process_message`` and ``stream_response``
    call ``initialize`` on first use if it has not been called explicitly.
    A single ``Context`` is created at initialization and passed to every
    run.
    """

    def __init__(self, sse_url: str, hf_token: str, llm):
        """Store configuration; the agent itself is created by ``initialize``.

        Args:
            sse_url: SSE endpoint of the MCP server.
            hf_token: Token sent as a ``Bearer`` Authorization header. When
                falsy, no Authorization header is sent.
            llm: LLM instance handed to ``FunctionAgent``.
        """
        self.sse_url = sse_url
        self.hf_token = hf_token
        self.llm = llm
        self.agent = None
        self.agent_context = None

        # Runtime prompt (Spanish). The accented characters and emoji were
        # previously mis-decoded (mojibake); they are restored here.
        self.SYSTEM_PROMPT = """
        Eres un **Asistente de IA especializado en llamadas a herramientas (Tool Calling)**.
        Tu función es responder EXCLUSIVAMENTE usando las herramientas asignadas, sin usar conocimientos previos.

        ### 🔹 Herramientas Disponibles:
        1. **query_space**: Consulta información técnica en espacios especializados (PRINCIPAL/PREFERENTE)
        2. **list_spaces_names**: Lista espacios disponibles. (Contiene los 'space_name', útil para usar query_space)
        3. **search_tavily**: Recuperador de información y contenido web que usa el motor de búsqueda tavily

        ### ⚠️ Reglas Estrictas:
        1. **OBLIGATORIO**: Usar `query_space` para cualquier consulta técnica.
        2. **PROHIBIDO**: Inventar respuestas o usar conocimientos no verificados.
        3. **PRIORIDAD**: Siempre validar información con herramientas antes de responder.
        4. **PRECISIÓN**: Si la herramienta falla, NO intentes adivinar la respuesta.
        """

    async def initialize(self):
        """Create the MCP client, load its tools and build the agent.

        Returns:
            The result of ``list_tools()`` on the MCP client.
        """
        # Avoid sending a meaningless "Bearer None" header when no token is set.
        headers = (
            {"Authorization": f"Bearer {self.hf_token}"} if self.hf_token else {}
        )
        mcp_client = CustomMCPClient(self.sse_url, headers=headers)
        mcp_tool = McpToolSpec(client=mcp_client)

        tools = await mcp_tool.to_tool_list_async()
        agent = FunctionAgent(
            name="MCP_Agent",
            description="Agente que utiliza herramientas MCP para responder.",
            tools=tools,
            llm=self.llm,
            system_prompt=self.SYSTEM_PROMPT,
        )
        agent_context = Context(agent)

        # Publish both attributes only once both exist, so a failure above
        # cannot leave the instance with an agent but no context.
        self.agent = agent
        self.agent_context = agent_context

        return await mcp_client.list_tools()

    async def _ensure_initialized(self) -> None:
        """Run ``initialize`` if the agent has not been built yet."""
        if self.agent is None:
            await self.initialize()

    async def process_message(self, message: str) -> str:
        """Run the agent on ``message`` and return its final answer as text."""
        await self._ensure_initialized()

        handler = self.agent.run(message, ctx=self.agent_context)
        return str(await handler)

    async def stream_response(self, message: str) -> AsyncGenerator[Dict, None]:
        """Run the agent on ``message`` and yield progress events as dicts.

        Yields:
            ``{"type": "tool_call", "tool_name", "arguments"}`` for each
            ``ToolCall`` event, ``{"type": "tool_result", "tool_name",
            "result"}`` for each ``ToolCallResult`` event, and finally one
            ``{"type": "final_response", "content"}`` with the stringified
            run result. Other event types are ignored.
        """
        await self._ensure_initialized()

        handler = self.agent.run(message, ctx=self.agent_context)
        async for event in handler.stream_events():
            if isinstance(event, ToolCall):
                yield {
                    "type": "tool_call",
                    "tool_name": event.tool_name,
                    "arguments": event.tool_kwargs,
                }
            elif isinstance(event, ToolCallResult):
                yield {
                    "type": "tool_result",
                    "tool_name": event.tool_name,
                    "result": event.tool_output,
                }

        # The event stream is exhausted; awaiting the handler gives the result.
        yield {
            "type": "final_response",
            "content": str(await handler),
        }