Demo_MCP_Server_Spotify / mcp_agent.py
geronimo-pericoli's picture
Update mcp_agent.py
b14749d verified
raw
history blame
4.69 kB
# mcp_agent.py
import asyncio
from typing import List, Dict, Optional, Generator, AsyncGenerator
from fastmcp import Client
from fastmcp.client.transports import SSETransport
from llama_index.core.workflow import Context
from llama_index.tools.mcp import McpToolSpec
from llama_index.core.agent.workflow import FunctionAgent, ToolCall, ToolCallResult
from mcp.client.sse import sse_client as original_sse_client
from mcp.client.session import ClientSession
from datetime import timedelta
from contextlib import asynccontextmanager
class CustomMCPClient:
def __init__(self, url: str, headers: dict = None, timeout: int = 30):
self.url = url
self.headers = headers or {}
self.timeout = timeout
async def call_tool(self, tool_name: str, arguments: dict):
async with self._create_session() as session:
return await session.call_tool(tool_name, arguments)
async def list_tools(self):
async with self._create_session() as session:
return await session.list_tools()
@asynccontextmanager
async def _create_session(self):
async with original_sse_client(
url=self.url,
headers=self.headers,
timeout=self.timeout
) as streams:
async with ClientSession(
*streams,
read_timeout_seconds=timedelta(seconds=self.timeout)
) as session:
await session.initialize()
yield session
class MCPAgent:
def __init__(self, sse_url: str, hf_token: str, llm):
self.sse_url = sse_url
self.hf_token = hf_token
self.llm = llm
self.agent = None
self.agent_context = None
self.SYSTEM_PROMPT = """
Eres un **Asistente de IA especializado en llamadas a herramientas (Tool Calling)**.
Tu funci贸n es responder EXCLUSIVAMENTE usando las herramientas asignadas, sin usar conocimientos previos.
### 馃敼 Herramientas Disponibles:
1. **query_space**: Consulta informaci贸n t茅cnica en espacios especializados (PRINCIPAL/PREFERENTE)
2. **list_spaces_names**: Lista espacios disponibles. (Contiene los 'space_name', 煤til para usar query_space)
3. **search_tavily**: Recuperador de informaci贸n y contenido web que usa el motor de b煤squeda tavily
### 鈿狅笍 Reglas Estrictas:
1. **OBLIGATORIO**: Usar `query_space` para cualquier consulta t茅cnica.
2. **PROHIBIDO**: Inventar respuestas o usar conocimientos no verificados.
3. **PRIORIDAD**: Siempre validar informaci贸n con herramientas antes de responder.
4. **PRECISI脫N**: Si la herramienta falla, NO intentes adivinar la respuesta.
"""
async def initialize(self):
"""Inicializa el cliente MCP y el agente"""
mcp_client = CustomMCPClient(
self.sse_url,
headers={"Authorization": f"Bearer {self.hf_token}"}
)
mcp_tool = McpToolSpec(client=mcp_client)
tools = await mcp_tool.to_tool_list_async()
self.agent = FunctionAgent(
name="MCP_Agent",
description="Agente que utiliza herramientas MCP para responder.",
tools=tools,
llm=self.llm,
system_prompt=self.SYSTEM_PROMPT,
)
self.agent_context = Context(self.agent)
return await mcp_client.list_tools()
async def process_message(self, message: str) -> str:
"""Procesa un mensaje del usuario y devuelve la respuesta del agente"""
if not self.agent:
await self.initialize()
handler = self.agent.run(message, ctx=self.agent_context)
return str(await handler)
async def stream_response(self, message: str) -> AsyncGenerator[Dict, None]:
"""Genera la respuesta del agente en formato de streaming"""
if not self.agent:
await self.initialize()
handler = self.agent.run(message, ctx=self.agent_context)
async for event in handler.stream_events():
if isinstance(event, ToolCall):
yield {
"type": "tool_call",
"tool_name": event.tool_name,
"arguments": event.tool_kwargs
}
elif isinstance(event, ToolCallResult):
yield {
"type": "tool_result",
"tool_name": event.tool_name,
"result": event.tool_output
}
yield {
"type": "final_response",
"content": str(await handler)
}