Spaces:
Sleeping
Sleeping
File size: 5,352 Bytes
d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c 8abdadf c202f7c 8abdadf d14cb3f c202f7c d14cb3f c202f7c d14cb3f 8abdadf c202f7c 8abdadf c202f7c 8abdadf d14cb3f c202f7c d14cb3f c202f7c d14cb3f 8abdadf d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
from typing import Dict, Any, List, Optional
import asyncio
import os
import json
import httpx
from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口
class RootAgent:
def __init__(self):
AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent
async def initialize(self):
"""
RootAgent 启动时的初始化逻辑,通过 Agent Manager 发现 Agent。
"""
await self._refresh_active_agents()
print("RootAgent initialized.")
async def shutdown(self):
"""
RootAgent 关闭时的清理逻辑。
"""
print("RootAgent shutdown.")
async def _refresh_active_agents(self):
"""
从 Agent Manager 加载所有活跃的 Agent 到内存缓存。
"""
try:
agents = await self.agent_manager_client.list_agents()
self.active_agents = {agent.id: agent for agent in agents}
print(f"RootAgent refreshed active agents. Found {len(self.active_agents)} agents.")
except httpx.HTTPStatusError as e:
print(f"Failed to refresh agents from Agent Manager: {e.response.text}")
# 可以在这里选择抛出异常或以降级模式运行
except Exception as e:
print(f"An unexpected error occurred while refreshing agents: {e}")
async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
"""
尝试从缓存中获取一个可用 Agent,如果不存在则请求 Agent Manager 创建。
"""
# 1. 检查内存缓存中是否有可用 Agent
for agent_id, agent_info in self.active_agents.items():
if agent_info.agent_type == agent_type and agent_info.status == "running":
print(f"Found existing agent {agent_id} of type {agent_type}.")
return agent_info
# 2. 如果没有,请求 Agent Manager 创建新 Agent
print(f"No active agent of type {agent_type} found. Requesting Agent Manager to create one.")
create_request = CreateAgentRequest(
agent_type=agent_type,
image_name=image_name,
env_vars={}, # 移除 Redis 相关的环境变量,因为 RootAgent 不再直接使用 Redis
config={"some_initial_config": "value"}
)
try:
new_agent_info = await self.agent_manager_client.create_agent(create_request)
self.active_agents[new_agent_info.id] = new_agent_info # 添加到缓存
print(f"Agent Manager created new agent: {new_agent_info.id} ({new_agent_info.agent_type})")
return new_agent_info
except httpx.HTTPStatusError as e:
print(f"Failed to create agent via Agent Manager: {e.response.text}")
raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
except Exception as e:
print(f"An unexpected error occurred while creating agent: {e}")
raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
"""
RootAgent 的核心任务处理逻辑:任务拆分、调度、结果聚合。
"""
print(f"RootAgent received task: {task_data}")
# 示例:简单任务拆分和调度
# 假设 task_data 包含 "subtasks" 列表,每个子任务有 "agent_type" 和 "payload"
subtasks_definitions = task_data.get("subtasks", [])
if not subtasks_definitions:
return {"status": "failed", "error": "No subtasks defined in the task."}
results = []
for subtask_def in subtasks_definitions:
agent_type = subtask_def.get("agent_type")
payload = subtask_def.get("payload")
if not agent_type or not payload:
results.append({"status": "failed", "error": "Invalid subtask definition."})
continue
try:
# 获取或创建 Agent
agent_info = await self._get_or_create_agent(agent_type)
# 调用 Agent 的 MCP 服务
async with httpx.AsyncClient() as client:
response = await client.post(
f"{agent_info.mcp_endpoint}/process_subtask",
json=payload,
timeout=30.0 # 设定超时
)
response.raise_for_status()
subtask_result = response.json()
results.append({"agent_type": agent_type, "result": subtask_result})
except Exception as e:
results.append({"agent_type": agent_type, "status": "failed", "error": str(e)})
print(f"Error processing subtask with agent {agent_type}: {e}")
# 聚合结果
final_output = {"status": "completed", "aggregated_results": results}
print(f"RootAgent finished processing task. Results: {final_output}")
return final_output
|