Spaces:
Sleeping
Sleeping
| # mcp_agent.py | |
| import asyncio | |
| from typing import List, Dict, Optional, Generator, AsyncGenerator | |
| from fastmcp import Client | |
| from fastmcp.client.transports import SSETransport | |
| from llama_index.core.workflow import Context | |
| from llama_index.tools.mcp import McpToolSpec | |
| from llama_index.core.agent.workflow import FunctionAgent, ToolCall, ToolCallResult | |
| from mcp.client.sse import sse_client as original_sse_client | |
| from mcp.client.session import ClientSession | |
| from datetime import timedelta | |
| from contextlib import asynccontextmanager | |
| class CustomMCPClient: | |
| def __init__(self, url: str, headers: dict = None, timeout: int = 30): | |
| self.url = url | |
| self.headers = headers or {} | |
| self.timeout = timeout | |
| async def call_tool(self, tool_name: str, arguments: dict): | |
| async with self._create_session() as session: | |
| return await session.call_tool(tool_name, arguments) | |
| async def list_tools(self): | |
| async with self._create_session() as session: | |
| return await session.list_tools() | |
| async def _create_session(self): | |
| async with original_sse_client( | |
| url=self.url, | |
| headers=self.headers, | |
| timeout=self.timeout | |
| ) as streams: | |
| async with ClientSession( | |
| *streams, | |
| read_timeout_seconds=timedelta(seconds=self.timeout) | |
| ) as session: | |
| await session.initialize() | |
| yield session | |
| class MCPAgent: | |
| def __init__(self, sse_url: str, hf_token: str, llm): | |
| self.sse_url = sse_url | |
| self.hf_token = hf_token | |
| self.llm = llm | |
| self.agent = None | |
| self.agent_context = None | |
| self.SYSTEM_PROMPT = """ | |
| Eres un **Asistente de IA especializado en llamadas a herramientas (Tool Calling)**. | |
| Tu funci贸n es responder EXCLUSIVAMENTE usando las herramientas asignadas, sin usar conocimientos previos. | |
| ### 馃敼 Herramientas Disponibles: | |
| 1. **query_space**: Consulta informaci贸n t茅cnica en espacios especializados (PRINCIPAL/PREFERENTE) | |
| 2. **list_spaces_names**: Lista espacios disponibles. (Contiene los 'space_name', 煤til para usar query_space) | |
| 3. **search_tavily**: Recuperador de informaci贸n y contenido web que usa el motor de b煤squeda tavily | |
| ### 鈿狅笍 Reglas Estrictas: | |
| 1. **OBLIGATORIO**: Usar `query_space` para cualquier consulta t茅cnica. | |
| 2. **PROHIBIDO**: Inventar respuestas o usar conocimientos no verificados. | |
| 3. **PRIORIDAD**: Siempre validar informaci贸n con herramientas antes de responder. | |
| 4. **PRECISI脫N**: Si la herramienta falla, NO intentes adivinar la respuesta. | |
| """ | |
| async def initialize(self): | |
| """Inicializa el cliente MCP y el agente""" | |
| mcp_client = CustomMCPClient( | |
| self.sse_url, | |
| headers={"Authorization": f"Bearer {self.hf_token}"} | |
| ) | |
| mcp_tool = McpToolSpec(client=mcp_client) | |
| tools = await mcp_tool.to_tool_list_async() | |
| self.agent = FunctionAgent( | |
| name="MCP_Agent", | |
| description="Agente que utiliza herramientas MCP para responder.", | |
| tools=tools, | |
| llm=self.llm, | |
| system_prompt=self.SYSTEM_PROMPT, | |
| ) | |
| self.agent_context = Context(self.agent) | |
| return await mcp_client.list_tools() | |
| async def process_message(self, message: str) -> str: | |
| """Procesa un mensaje del usuario y devuelve la respuesta del agente""" | |
| if not self.agent: | |
| await self.initialize() | |
| handler = self.agent.run(message, ctx=self.agent_context) | |
| return str(await handler) | |
| async def stream_response(self, message: str) -> AsyncGenerator[Dict, None]: | |
| """Genera la respuesta del agente en formato de streaming""" | |
| if not self.agent: | |
| await self.initialize() | |
| handler = self.agent.run(message, ctx=self.agent_context) | |
| async for event in handler.stream_events(): | |
| if isinstance(event, ToolCall): | |
| yield { | |
| "type": "tool_call", | |
| "tool_name": event.tool_name, | |
| "arguments": event.tool_kwargs | |
| } | |
| elif isinstance(event, ToolCallResult): | |
| yield { | |
| "type": "tool_result", | |
| "tool_name": event.tool_name, | |
| "result": event.tool_output | |
| } | |
| yield { | |
| "type": "final_response", | |
| "content": str(await handler) | |
| } |