File size: 5,352 Bytes
d14cb3f
c202f7c
d14cb3f
 
 
 
 
 
 
c202f7c
 
d14cb3f
 
 
 
 
c202f7c
8abdadf
c202f7c
8abdadf
d14cb3f
 
 
c202f7c
d14cb3f
c202f7c
d14cb3f
 
8abdadf
c202f7c
8abdadf
c202f7c
8abdadf
 
 
 
 
 
 
 
 
d14cb3f
 
 
c202f7c
d14cb3f
c202f7c
d14cb3f
 
 
 
 
 
 
 
 
 
 
8abdadf
d14cb3f
 
 
 
 
 
 
 
 
 
 
 
 
c202f7c
d14cb3f
c202f7c
d14cb3f
c202f7c
d14cb3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import Dict, Any, List, Optional
import asyncio
import os
import json
import httpx

from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口

class RootAgent:
    def __init__(self):
        AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
        self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
        self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent

    async def initialize(self):
        """
        RootAgent 启动时的初始化逻辑,通过 Agent Manager 发现 Agent。
        """
        await self._refresh_active_agents()
        print("RootAgent initialized.")

    async def shutdown(self):
        """
        RootAgent 关闭时的清理逻辑。
        """
        print("RootAgent shutdown.")

    async def _refresh_active_agents(self):
        """
        从 Agent Manager 加载所有活跃的 Agent 到内存缓存。
        """
        try:
            agents = await self.agent_manager_client.list_agents()
            self.active_agents = {agent.id: agent for agent in agents}
            print(f"RootAgent refreshed active agents. Found {len(self.active_agents)} agents.")
        except httpx.HTTPStatusError as e:
            print(f"Failed to refresh agents from Agent Manager: {e.response.text}")
            # 可以在这里选择抛出异常或以降级模式运行
        except Exception as e:
            print(f"An unexpected error occurred while refreshing agents: {e}")


    async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
        """
        尝试从缓存中获取一个可用 Agent,如果不存在则请求 Agent Manager 创建。
        """
        # 1. 检查内存缓存中是否有可用 Agent
        for agent_id, agent_info in self.active_agents.items():
            if agent_info.agent_type == agent_type and agent_info.status == "running":
                print(f"Found existing agent {agent_id} of type {agent_type}.")
                return agent_info

        # 2. 如果没有,请求 Agent Manager 创建新 Agent
        print(f"No active agent of type {agent_type} found. Requesting Agent Manager to create one.")
        create_request = CreateAgentRequest(
            agent_type=agent_type,
            image_name=image_name,
            env_vars={}, # 移除 Redis 相关的环境变量,因为 RootAgent 不再直接使用 Redis
            config={"some_initial_config": "value"}
        )
        try:
            new_agent_info = await self.agent_manager_client.create_agent(create_request)
            self.active_agents[new_agent_info.id] = new_agent_info # 添加到缓存
            print(f"Agent Manager created new agent: {new_agent_info.id} ({new_agent_info.agent_type})")
            return new_agent_info
        except httpx.HTTPStatusError as e:
            print(f"Failed to create agent via Agent Manager: {e.response.text}")
            raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
        except Exception as e:
            print(f"An unexpected error occurred while creating agent: {e}")
            raise RuntimeError(f"Failed to create agent {agent_type}: {e}")

    async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        RootAgent 的核心任务处理逻辑:任务拆分、调度、结果聚合。
        """
        print(f"RootAgent received task: {task_data}")

        # 示例:简单任务拆分和调度
        # 假设 task_data 包含 "subtasks" 列表,每个子任务有 "agent_type" 和 "payload"
        subtasks_definitions = task_data.get("subtasks", [])
        if not subtasks_definitions:
            return {"status": "failed", "error": "No subtasks defined in the task."}

        results = []
        for subtask_def in subtasks_definitions:
            agent_type = subtask_def.get("agent_type")
            payload = subtask_def.get("payload")

            if not agent_type or not payload:
                results.append({"status": "failed", "error": "Invalid subtask definition."})
                continue

            try:
                # 获取或创建 Agent
                agent_info = await self._get_or_create_agent(agent_type)
                
                # 调用 Agent 的 MCP 服务
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        f"{agent_info.mcp_endpoint}/process_subtask",
                        json=payload,
                        timeout=30.0 # 设定超时
                    )
                    response.raise_for_status()
                    subtask_result = response.json()
                    results.append({"agent_type": agent_type, "result": subtask_result})
            except Exception as e:
                results.append({"agent_type": agent_type, "status": "failed", "error": str(e)})
                print(f"Error processing subtask with agent {agent_type}: {e}")

        # 聚合结果
        final_output = {"status": "completed", "aggregated_results": results}
        print(f"RootAgent finished processing task. Results: {final_output}")
        return final_output