root_agent / agents /root_agent.py
airsltd's picture
update
8abdadf
from typing import Dict, Any, List, Optional
import asyncio
import os
import json
import httpx
from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口
class RootAgent:
def __init__(self):
AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent
async def initialize(self):
"""
RootAgent 启动时的初始化逻辑,通过 Agent Manager 发现 Agent。
"""
await self._refresh_active_agents()
print("RootAgent initialized.")
async def shutdown(self):
"""
RootAgent 关闭时的清理逻辑。
"""
print("RootAgent shutdown.")
async def _refresh_active_agents(self):
"""
从 Agent Manager 加载所有活跃的 Agent 到内存缓存。
"""
try:
agents = await self.agent_manager_client.list_agents()
self.active_agents = {agent.id: agent for agent in agents}
print(f"RootAgent refreshed active agents. Found {len(self.active_agents)} agents.")
except httpx.HTTPStatusError as e:
print(f"Failed to refresh agents from Agent Manager: {e.response.text}")
# 可以在这里选择抛出异常或以降级模式运行
except Exception as e:
print(f"An unexpected error occurred while refreshing agents: {e}")
async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
"""
尝试从缓存中获取一个可用 Agent,如果不存在则请求 Agent Manager 创建。
"""
# 1. 检查内存缓存中是否有可用 Agent
for agent_id, agent_info in self.active_agents.items():
if agent_info.agent_type == agent_type and agent_info.status == "running":
print(f"Found existing agent {agent_id} of type {agent_type}.")
return agent_info
# 2. 如果没有,请求 Agent Manager 创建新 Agent
print(f"No active agent of type {agent_type} found. Requesting Agent Manager to create one.")
create_request = CreateAgentRequest(
agent_type=agent_type,
image_name=image_name,
env_vars={}, # 移除 Redis 相关的环境变量,因为 RootAgent 不再直接使用 Redis
config={"some_initial_config": "value"}
)
try:
new_agent_info = await self.agent_manager_client.create_agent(create_request)
self.active_agents[new_agent_info.id] = new_agent_info # 添加到缓存
print(f"Agent Manager created new agent: {new_agent_info.id} ({new_agent_info.agent_type})")
return new_agent_info
except httpx.HTTPStatusError as e:
print(f"Failed to create agent via Agent Manager: {e.response.text}")
raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
except Exception as e:
print(f"An unexpected error occurred while creating agent: {e}")
raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
"""
RootAgent 的核心任务处理逻辑:任务拆分、调度、结果聚合。
"""
print(f"RootAgent received task: {task_data}")
# 示例:简单任务拆分和调度
# 假设 task_data 包含 "subtasks" 列表,每个子任务有 "agent_type" 和 "payload"
subtasks_definitions = task_data.get("subtasks", [])
if not subtasks_definitions:
return {"status": "failed", "error": "No subtasks defined in the task."}
results = []
for subtask_def in subtasks_definitions:
agent_type = subtask_def.get("agent_type")
payload = subtask_def.get("payload")
if not agent_type or not payload:
results.append({"status": "failed", "error": "Invalid subtask definition."})
continue
try:
# 获取或创建 Agent
agent_info = await self._get_or_create_agent(agent_type)
# 调用 Agent 的 MCP 服务
async with httpx.AsyncClient() as client:
response = await client.post(
f"{agent_info.mcp_endpoint}/process_subtask",
json=payload,
timeout=30.0 # 设定超时
)
response.raise_for_status()
subtask_result = response.json()
results.append({"agent_type": agent_type, "result": subtask_result})
except Exception as e:
results.append({"agent_type": agent_type, "status": "failed", "error": str(e)})
print(f"Error processing subtask with agent {agent_type}: {e}")
# 聚合结果
final_output = {"status": "completed", "aggregated_results": results}
print(f"RootAgent finished processing task. Results: {final_output}")
return final_output