Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, List, Optional | |
| import asyncio | |
| import os | |
| import json | |
| import httpx | |
| from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest | |
| from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口 | |
class RootAgent:
    """Orchestrator that splits a task into subtasks and dispatches them to agents.

    Agents are discovered and created through an ``AgentManagerClient``. The
    agents seen so far are kept in ``active_agents``, an in-memory cache keyed
    by agent id.
    """

    def __init__(self):
        """Create the Agent Manager client and start with an empty agent cache.

        The manager base URL is read from the ``AGENT_MANAGER_BASE_URL``
        environment variable and falls back to ``http://localhost:7860``.
        """
        base_url = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
        self.agent_manager_client = AgentManagerClient(base_url=base_url)
        # In-memory cache of known agents, keyed by agent id.
        self.active_agents: Dict[str, AgentInfo] = {}

    async def initialize(self):
        """Run startup logic: populate the agent cache from the Agent Manager."""
        await self._refresh_active_agents()
        print("RootAgent initialized.")

    async def shutdown(self):
        """Run shutdown logic (currently only logs a message)."""
        print("RootAgent shutdown.")

    async def _refresh_active_agents(self):
        """Reload the in-memory agent cache from the Agent Manager.

        Failures are logged and swallowed on purpose so that the RootAgent can
        keep running with whatever cache it already has.
        """
        try:
            agents = await self.agent_manager_client.list_agents()
            self.active_agents = {agent.id: agent for agent in agents}
            print(f"RootAgent refreshed active agents. Found {len(self.active_agents)} agents.")
        except httpx.HTTPStatusError as e:
            print(f"Failed to refresh agents from Agent Manager: {e.response.text}")
            # Best effort: an exception could be raised here instead of
            # continuing in degraded mode.
        except Exception as e:
            print(f"An unexpected error occurred while refreshing agents: {e}")

    async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> "AgentInfo":
        """Return a running agent of ``agent_type``, creating one if none is cached.

        Args:
            agent_type: Type of agent required for the subtask.
            image_name: Image to request from the Agent Manager when a new
                agent has to be created.

        Returns:
            The cached or newly created agent description.

        Raises:
            RuntimeError: If the Agent Manager fails to create the agent. The
                underlying error is attached as ``__cause__``.
        """
        # 1. Look for a usable agent in the in-memory cache.
        for agent_id, agent_info in self.active_agents.items():
            if agent_info.agent_type == agent_type and agent_info.status == "running":
                print(f"Found existing agent {agent_id} of type {agent_type}.")
                return agent_info

        # 2. Nothing cached: ask the Agent Manager to create a new agent.
        print(f"No active agent of type {agent_type} found. Requesting Agent Manager to create one.")
        create_request = CreateAgentRequest(
            agent_type=agent_type,
            image_name=image_name,
            # No Redis-related variables: RootAgent no longer talks to Redis directly.
            env_vars={},
            config={"some_initial_config": "value"},
        )
        try:
            new_agent_info = await self.agent_manager_client.create_agent(create_request)
        except httpx.HTTPStatusError as e:
            print(f"Failed to create agent via Agent Manager: {e.response.text}")
            raise RuntimeError(f"Failed to create agent {agent_type}: {e}") from e
        except Exception as e:
            print(f"An unexpected error occurred while creating agent: {e}")
            raise RuntimeError(f"Failed to create agent {agent_type}: {e}") from e

        # Remember the new agent so later subtasks can reuse it.
        self.active_agents[new_agent_info.id] = new_agent_info
        print(f"Agent Manager created new agent: {new_agent_info.id} ({new_agent_info.agent_type})")
        return new_agent_info

    async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
        """Split a task into subtasks, dispatch each to an agent, and aggregate results.

        ``task_data`` is expected to carry a ``"subtasks"`` list whose entries
        are dicts with an ``"agent_type"`` and a ``"payload"``.

        Args:
            task_data: Task description containing the ``"subtasks"`` list.

        Returns:
            ``{"status": "failed", "error": ...}`` when ``"subtasks"`` is
            missing, empty, or not a list; otherwise
            ``{"status": "completed", "aggregated_results": [...]}`` with one
            entry per subtask. Per-subtask failures are reported inside the
            entries and do not change the overall status.
        """
        print(f"RootAgent received task: {task_data}")

        subtasks_definitions = task_data.get("subtasks", [])
        if not subtasks_definitions:
            return {"status": "failed", "error": "No subtasks defined in the task."}
        if not isinstance(subtasks_definitions, (list, tuple)):
            # A truthy non-list value (e.g. a string or number) cannot be
            # iterated as subtask definitions; reject it instead of crashing.
            return {"status": "failed", "error": "'subtasks' must be a list of subtask definitions."}

        results = []
        for subtask_def in subtasks_definitions:
            # A malformed entry must only fail that entry, not the whole task.
            if not isinstance(subtask_def, dict):
                results.append({"status": "failed", "error": "Invalid subtask definition."})
                continue

            agent_type = subtask_def.get("agent_type")
            payload = subtask_def.get("payload")
            if not agent_type or not payload:
                results.append({"status": "failed", "error": "Invalid subtask definition."})
                continue

            try:
                # Get a cached agent or have one created.
                agent_info = await self._get_or_create_agent(agent_type)
                # Call the agent's MCP service.
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        f"{agent_info.mcp_endpoint}/process_subtask",
                        json=payload,
                        timeout=30.0,  # Bound the wait for a single subtask.
                    )
                    response.raise_for_status()
                    subtask_result = response.json()
                results.append({"agent_type": agent_type, "result": subtask_result})
            except Exception as e:
                # Boundary handler: record the failure and move on to the next subtask.
                results.append({"agent_type": agent_type, "status": "failed", "error": str(e)})
                print(f"Error processing subtask with agent {agent_type}: {e}")

        # Aggregate the per-subtask results.
        final_output = {"status": "completed", "aggregated_results": results}
        print(f"RootAgent finished processing task. Results: {final_output}")
        return final_output