airsltd commited on
Commit
d14cb3f
·
1 Parent(s): d776cdf
agents/Dockerfile.echo_agent ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
2
+ # you will also find guides on how best to write your Dockerfile
3
+
4
+ FROM python:3.12.11
5
+
6
+ RUN useradd -m -u 1000 user
7
+ USER user
8
+ ENV PATH="/home/user/.local/bin:$PATH"
9
+
10
+ WORKDIR /app
11
+
12
+ COPY --chown=user ./requirements.txt requirements.txt
13
+ RUN pip install --no-cache-dir --upgrade -r requirements.txt
14
+
15
+ COPY --chown=user . /app
16
+ CMD ["uvicorn", "echo_agent:app", "--host", "0.0.0.0", "--port", "8000"]
agents/agent_manager_client.py ADDED
@@ -0,0 +1,73 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, List, Optional
2
+ import httpx
3
+ import os
4
+
5
+ # 假设 agent_manager 的 models.py 可以在这里被导入,或者我们重新定义需要的模型
6
+ # 为了避免循环依赖,这里直接定义需要的模型,或者从一个共享的包中导入
7
+ from pydantic import BaseModel
8
+
9
+ class AgentInfo(BaseModel):
10
+ id: str
11
+ agent_type: str
12
+ mcp_endpoint: str
13
+ status: str
14
+ created_at: str
15
+ last_heartbeat: str
16
+ metadata: Dict = {}
17
+
18
+ class CreateAgentRequest(BaseModel):
19
+ agent_type: str
20
+ image_name: str
21
+ env_vars: Dict[str, str] = {}
22
+ resource_limits: Dict = {}
23
+ config: Dict = {}
24
+
25
+ class AgentManagerClient:
26
+ def __init__(self, base_url: str):
27
+ self.base_url = base_url
28
+ self.client = httpx.AsyncClient()
29
+
30
+ async def create_agent(self, request: CreateAgentRequest) -> AgentInfo:
31
+ """
32
+ 向 Agent Manager 请求创建一个新的 Agent 实例。
33
+ """
34
+ response = await self.client.post(f"{self.base_url}/agents", json=request.dict())
35
+ response.raise_for_status()
36
+ return AgentInfo(**response.json())
37
+
38
+ async def get_agent_info(self, agent_id: str) -> AgentInfo:
39
+ """
40
+ 从 Agent Manager 获取指定 Agent 实例的详细信息。
41
+ """
42
+ response = await self.client.get(f"{self.base_url}/agents/{agent_id}")
43
+ response.raise_for_status()
44
+ return AgentInfo(**response.json())
45
+
46
+ async def list_agents(self, agent_type: Optional[str] = None) -> List[AgentInfo]:
47
+ """
48
+ 从 Agent Manager 列出所有活跃的 Agent 实例。
49
+ """
50
+ params = {"agent_type": agent_type} if agent_type else {}
51
+ response = await self.client.get(f"{self.base_url}/agents", params=params)
52
+ response.raise_for_status()
53
+ return [AgentInfo(**agent_data) for agent_data in response.json()]
54
+
55
+ async def destroy_agent(self, agent_id: str) -> bool:
56
+ """
57
+ 向 Agent Manager 请求销毁一个 Agent 实例。
58
+ """
59
+ response = await self.client.delete(f"{self.base_url}/agents/{agent_id}")
60
+ response.raise_for_status()
61
+ return response.status_code == 204 # No Content
62
+
63
+ async def stop_agent(self, agent_id: str) -> bool:
64
+ """
65
+ 向 Agent Manager 请求停止一个 Agent 实例。
66
+ """
67
+ response = await self.client.post(f"{self.base_url}/agents/{agent_id}/stop")
68
+ response.raise_for_status()
69
+ return response.status_code == 200 # OK
70
+
71
+ # 示例用法 (在实际 RootAgent 中使用)
72
+ # AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
73
+ # agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
agents/echo_agent.py ADDED
@@ -0,0 +1,134 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any
2
+ from datetime import datetime
3
+ import asyncio
4
+ import json
5
+ import os
6
+ import redis
7
+ import uvicorn
8
+ from fastapi import FastAPI, BackgroundTasks
9
+
10
+ from .mcp_agent_interface import MCPAgent
11
+
12
+ # Redis 配置
13
+ REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
14
+ REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
15
+ REDIS_DB = int(os.getenv("REDIS_DB", 0))
16
+ AGENT_CHANNEL = "agent_discovery_channel"
17
+ AGENT_KEY_PREFIX = "agent:"
18
+ HEARTBEAT_INTERVAL = 5 # seconds
19
+
20
+ class EchoAgent(MCPAgent):
21
+ def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
22
+ super().__init__(agent_id, agent_type, config)
23
+ self.redis_client: Optional[redis.Redis] = None
24
+ self.heartbeat_task: Optional[asyncio.Task] = None
25
+ self.mcp_endpoint = os.getenv("MCP_ENDPOINT", f"http://localhost:8000") # Agent 自身的 MCP 服务地址
26
+
27
+ async def initialize(self):
28
+ """
29
+ Agent 启动时的初始化逻辑,连接 Redis 并注册。
30
+ """
31
+ self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
32
+ try:
33
+ self.redis_client.ping()
34
+ print(f"Agent {self.id} connected to Redis successfully!")
35
+ except redis.exceptions.ConnectionError as e:
36
+ print(f"Agent {self.id} could not connect to Redis: {e}")
37
+ # 可以在这里选择退出或以降级模式运行
38
+
39
+ await self._register_agent()
40
+ self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
41
+ print(f"EchoAgent {self.id} initialized and registered.")
42
+
43
+ async def shutdown(self):
44
+ """
45
+ Agent 关闭时的清理逻辑,发送下线消息并停止心跳。
46
+ """
47
+ if self.heartbeat_task:
48
+ self.heartbeat_task.cancel()
49
+ try:
50
+ await self.heartbeat_task
51
+ except asyncio.CancelledError:
52
+ pass
53
+ await self._unregister_agent()
54
+ if self.redis_client:
55
+ self.redis_client.close()
56
+ print(f"EchoAgent {self.id} shutdown.")
57
+
58
+ async def _register_agent(self):
59
+ """
60
+ 向 Redis 注册 Agent 信息。
61
+ """
62
+ agent_info = {
63
+ "id": self.id,
64
+ "agent_type": self.agent_type,
65
+ "mcp_endpoint": self.mcp_endpoint,
66
+ "status": "running",
67
+ "created_at": datetime.now().isoformat(),
68
+ "last_heartbeat": datetime.now().isoformat(),
69
+ "metadata": json.dumps(self.config) # 将配置作为元数据存储
70
+ }
71
+ self.redis_client.hmset(f"{AGENT_KEY_PREFIX}{self.id}", agent_info)
72
+ self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "ONLINE"}))
73
+ print(f"Agent {self.id} registered to Redis.")
74
+
75
+ async def _unregister_agent(self):
76
+ """
77
+ 向 Redis 发送下线消息。
78
+ """
79
+ self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "OFFLINE"}))
80
+ # Agent Manager 会负责从 Redis 中删除键
81
+ print(f"Agent {self.id} sent OFFLINE message to Redis.")
82
+
83
+ async def _send_heartbeat(self):
84
+ """
85
+ 定期向 Redis 发送心跳。
86
+ """
87
+ while True:
88
+ await asyncio.sleep(HEARTBEAT_INTERVAL)
89
+ try:
90
+ self.redis_client.hset(f"{AGENT_KEY_PREFIX}{self.id}", "last_heartbeat", datetime.now().isoformat())
91
+ self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "HEARTBEAT"}))
92
+ # print(f"Agent {self.id} sent HEARTBEAT.")
93
+ except Exception as e:
94
+ print(f"Error sending heartbeat for Agent {self.id}: {e}")
95
+
96
+ async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
97
+ """
98
+ EchoAgent 的核心逻辑:接收消息并返回。
99
+ """
100
+ print(f"EchoAgent {self.id} received subtask: {subtask_data}")
101
+ message = subtask_data.get("message", "No message provided.")
102
+ return {"status": "completed", "output": f"Echo: {message}", "agent_id": self.id}
103
+
104
+ # FastAPI 应用实例
105
+ app = FastAPI()
106
+ echo_agent: Optional[EchoAgent] = None
107
+
108
+ @app.on_event("startup")
109
+ async def startup_event():
110
+ global echo_agent
111
+ agent_id = os.getenv("AGENT_ID", str(uuid.uuid4()))
112
+ agent_type = os.getenv("AGENT_TYPE", "EchoAgent")
113
+ config = json.loads(os.getenv("AGENT_CONFIG", "{}"))
114
+
115
+ echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
116
+ await echo_agent.initialize()
117
+
118
+ @app.on_event("shutdown")
119
+ async def shutdown_event():
120
+ if echo_agent:
121
+ await echo_agent.shutdown()
122
+
123
+ @app.post("/process_subtask")
124
+ async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
125
+ if not echo_agent:
126
+ raise HTTPException(status_code=500, detail="Agent not initialized.")
127
+ return await echo_agent.process_subtask(subtask_data)
128
+
129
+ @app.get("/health")
130
+ async def health_check():
131
+ return {"status": "ok", "agent_id": echo_agent.id if echo_agent else "N/A"}
132
+
133
+ if __name__ == "__main__":
134
+ uvicorn.run(app, host="0.0.0.0", port=8000)
agents/mcp_agent_interface.py CHANGED
@@ -1,32 +1,39 @@
1
  from abc import ABC, abstractmethod
2
- from typing import Any, Dict
3
 
4
- class MCPAgentService(ABC):
5
  """
6
- MCP Agent 服务的抽象基类。
7
- 所有具体的 MCP Agent 服务都应继承此接口并实现其抽象方法。
8
  """
 
 
 
 
9
 
10
  @abstractmethod
11
- async def execute(self, subtask: Dict[str, Any]) -> Dict[str, Any]:
12
  """
13
- 执行子任务的方法。
14
 
15
  Args:
16
- subtask: 包含子任务描述和所需参数的字典。
17
 
18
  Returns:
19
- 包含子任务执行结果的字典。
20
  """
21
  pass
22
 
23
  @abstractmethod
24
- def get_capabilities(self) -> Dict[str, Any]:
25
  """
26
- 返回 Agent 服务的能力描述。
27
- 这将帮助 RootAgent 智能地调度任务。
28
-
29
- Returns:
30
- 包含 Agent 能力描述的字典。
 
 
 
31
  """
32
  pass
 
1
  from abc import ABC, abstractmethod
2
+ from typing import Dict, Any
3
 
4
+ class MCPAgent(ABC):
5
  """
6
+ 抽象基类,定义 MCP Agent 的通用接口。
7
+ 所有具体的 Agent 都应继承此接口并实现其方法。
8
  """
9
+ def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
10
+ self.id = agent_id
11
+ self.agent_type = agent_type
12
+ self.config = config
13
 
14
  @abstractmethod
15
+ async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
16
  """
17
+ 处理一个子任务。
18
 
19
  Args:
20
+ subtask_data: 包含子任务详细信息的字典。
21
 
22
  Returns:
23
+ 包含子任务处理结果的字典。
24
  """
25
  pass
26
 
27
  @abstractmethod
28
+ async def initialize(self):
29
  """
30
+ Agent 启动时的初始化逻辑,例如加载长期记忆、连接外部服务等。
31
+ """
32
+ pass
33
+
34
+ @abstractmethod
35
+ async def shutdown(self):
36
+ """
37
+ Agent 关闭时的清理逻辑,例如保存短期记忆、断开连接等。
38
  """
39
  pass
agents/requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ fastapi
2
+ uvicorn[standard]
3
+ redis
agents/root_agent.py CHANGED
@@ -1,241 +1,181 @@
1
- # codes/agents/root_agent.py
2
  import asyncio
3
- import json # 导入 json 模块
4
- import os # 导入 os 模块
5
- from qwen_agent.agents import Assistant # 导入 Qwen-Agent 的 Assistant 类
6
- from qwen_agent.tools.base import BaseTool # 如果需要自定义工具,可以导入
 
 
 
 
 
 
 
 
 
 
7
 
8
  class RootAgent:
9
  def __init__(self):
 
 
 
 
 
 
 
 
10
  """
11
- RootAgent 的初始化方法。
12
- 负责设置 Agent 的基本配置,如记忆模块、规划模块等。
13
  """
14
- print("RootAgent 初始化...")
15
- # TODO: 初始化记忆模块
16
- # TODO: 初始化反思模块
17
- # TODO: 初始化 Agent 注册表
18
-
19
- # 初始化 Qwen-Agent Assistant 作为规划 Agent
20
- # 配置 LLM 模型为与 OpenAI 兼容的 Gemini 模型
21
- # 从 .env 文件加载环境变量
22
- from dotenv import load_dotenv
23
- load_dotenv(dotenv_path='.env')
24
-
25
- # 从环境变量中获取 OpenAI 兼容模型的 API 密钥和基础 URL
26
- openai_api_key = os.getenv("OPENAI_API_KEY")
27
- openai_base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") # 默认使用 OpenAI 官方 API 地址
28
- model = os.getenv("MODEL", "")
29
-
30
- if not openai_api_key:
31
- print("警告: 未设置 OPENAI_API_KEY 环境变量。OpenAI 兼容模型可能无法正常工作。")
32
-
33
- self.planner_agent = Assistant(llm={
34
- 'model': model, # OpenAI 兼容模型
35
- 'api_key': openai_api_key,
36
- 'base_url': openai_base_url,
37
- 'model_type': 'oai' # 更正为 'oai' 以兼容 qwen-agent
38
- })
39
- print("Qwen-Agent Planner Assistant (OpenAI 兼容) 初始化完成。")
40
-
41
- async def process_task(self, task_description: str) -> dict:
42
  """
43
- 处理高层级任务的入口方法。
44
-
45
- Args:
46
- task_description: 任务的描述字符串。
47
-
48
- Returns:
49
- 包含任务处理结果的字典。
50
  """
51
- print(f"RootAgent 接收到任务: {task_description}")
52
-
53
- # 1. 任务接收与拆分 (规划)
54
- subtasks = await self._plan_task(task_description)
55
- print(f"任务已拆分为子任务: {subtasks}")
56
-
57
- # 2. 任务调度与编排
58
- # 简单的依赖管理:按顺序执行,假设依赖关系已在 _plan_task 中处理为线性顺序
59
- # 实际中需要更复杂的 DAG 调度器
60
- task_results = {}
61
- for subtask in subtasks:
62
- # 检查并等待依赖任务完成
63
- for dep_id in subtask.get("dependencies", []):
64
- if dep_id not in task_results or task_results[dep_id]["status"] != "completed":
65
- # 实际中这里需要更复杂的等待机制或错误处理
66
- print(f"等待子任务 {dep_id} 完成才能执行 {subtask['id']}")
67
- # 暂时跳过,实际应阻塞或重新调度
68
- task_results[subtask["id"]] = {"subtask": subtask, "status": "skipped", "reason": f"Dependency {dep_id} not completed"}
69
- break
70
- else:
71
- print(f"开始调度子任务: {subtask['name']} (ID: {subtask['id']})")
72
- agent_service = await self._dispatch_subtask(subtask)
73
- if agent_service:
74
- # 假设 agent_service.execute 返回一个包含结果和状态的字典
75
- subtask_result = await agent_service.execute(subtask)
76
- task_results[subtask["id"]] = subtask_result
77
- else:
78
- task_results[subtask["id"]] = {"subtask": subtask, "status": "failed", "reason": "No suitable agent found"}
79
-
80
- print(f"子任务 {subtask['id']} 结果: {task_results.get(subtask['id'], 'N/A')}")
81
-
82
- # 3. 结果聚合
83
- results_list = list(task_results.values())
84
- final_result = await self._aggregate_results(results_list)
85
-
86
- # 4. 反思与学习 (可选)
87
- await self._reflect_on_execution(task_description, subtasks, results_list, final_result)
88
-
89
- return final_result
90
-
91
- async def _plan_task(self, task_description: str) -> list[dict]:
92
  """
93
- 根据任务描述进行规划,拆分为子任务,并建立依赖关系。
94
-
95
- Args:
96
- task_description: 任务的描述字符串。
97
-
98
- Returns:
99
- 一个包含子任务及其依赖关系的列表。每个子任务是一个字典,
100
- 包含 'id', 'name', 'description', 'dependencies' 等字段。
101
  """
102
- print(f"RootAgent 正在使用 Qwen-Agent 规划任务: {task_description}")
103
-
104
- # 构造 Qwen-Agent 的输入提示
105
- # Qwen-Agent 的 run 方法通常期望一个消息列表作为输入
106
- messages = [
107
- {
108
- 'role': 'user',
109
- 'content': f"""请将以下高层级任务分解为一系列可执行的子任务,并以 JSON 数组的格式返回。
110
- 每个子任务对象应包含以下字段:
111
- - 'id': 唯一的子任务ID (字符串)
112
- - 'name': 子任务的名称 (字符串)
113
- - 'description': 子任务的详细描述 (字符串)
114
- - 'dependencies': 该子任务依赖的其他子任务ID列表 (字符串数组,如果无依赖则为空数组)
115
- - 'required_agent_type': 执行该子任务所需的 Agent 类型 (字符串,例如:DataCollectorAgent, DataAnalyzerAgent, ReportGeneratorAgent)
116
-
117
- 任务描述:{task_description}
118
-
119
- 请确保输出是有效的 JSON 格式,并且只包含 JSON 数组,不包含任何额外的文本或解释。
120
- """
121
- }
122
- ]
123
-
124
- # 调用 Qwen-Agent 进行规划
125
- # Qwen-Agent 的 run 方法通常返回一个包含 Agent 思考过程和最终结果的字典
126
- # 我们需要从结果中提取规划的子任务
127
- # 封装同步生成器迭代,并使用 asyncio.to_thread 包装
128
- def run_sync_planner():
129
- last_item = None
130
- # 迭代生成器,获取最终结果
131
- # 将 messages 传递给 run 方法
132
- for item in self.planner_agent.run(messages):
133
- last_item = item
134
- return last_item
135
-
136
- response = await asyncio.to_thread(run_sync_planner)
137
-
138
- subtasks = []
139
- try:
140
- # 检查 response 是否为列表,并且包含至少一个字典
141
- if isinstance(response, list) and len(response) > 0 and isinstance(response[0], dict):
142
- # 假设实际的 JSON 字符串在第一个字典的 'content' 键中
143
- content_str = response[0].get('content', '')
144
-
145
- # 提取 Markdown 代码块中的 JSON 字符串
146
- # 查找 '```json\n' 和 '\n```' 之间的内容
147
- json_start = content_str.find('```json\n')
148
- json_end = content_str.rfind('\n```')
149
-
150
- if json_start != -1 and json_end != -1 and json_start < json_end:
151
- json_str = content_str[json_start + len('```json\n'):json_end]
152
- parsed_output = json.loads(json_str)
153
- if isinstance(parsed_output, list):
154
- subtasks = parsed_output
155
- else:
156
- print(f"Qwen-Agent 提取的 JSON 不是列表格式: {parsed_output}")
157
- else:
158
- print(f"Qwen-Agent 返回的 content 字段不包含预期的 JSON 代码块: {content_str}")
159
- else:
160
- print(f"Qwen-Agent 返回非预期格式的响应: {response}")
161
- except json.JSONDecodeError as e:
162
- print(f"解析 Qwen-Agent 规划结果时发生 JSON ��误: {e}")
163
- print(f"Qwen-Agent 原始响应: {response}")
164
- except Exception as e:
165
- print(f"解析 Qwen-Agent 规划结果时发生未知错误: {e}")
166
- print(f"Qwen-Agent 原始响应: {response}")
167
-
168
- # 如果 Qwen-Agent 未能生成有效规划,返回一个默认的模拟规划
169
- if not subtasks:
170
- print("Qwen-Agent 未能生成有效规划,返回模拟子任务列表。")
171
- subtasks = [
172
- {
173
- "id": "task_001",
174
- "name": "市场数据收集",
175
- "description": f"收集关于 {task_description} 的最新市场数据",
176
- "dependencies": [],
177
- "required_agent_type": "DataCollectorAgent"
178
- },
179
- {
180
- "id": "task_002",
181
- "name": "数据分析",
182
- "description": "对收集到的市场数据进行分析,识别关键趋势",
183
- "dependencies": ["task_001"],
184
- "required_agent_type": "DataAnalyzerAgent"
185
- },
186
- {
187
- "id": "task_003",
188
- "name": "报告生成",
189
- "description": "根据数据分析结果生成一份简要报告",
190
- "dependencies": ["task_002"],
191
- "required_agent_type": "ReportGeneratorAgent"
192
- }
193
- ]
194
-
195
- return subtasks
196
-
197
- async def _dispatch_subtask(self, subtask: dict):
198
  """
199
- 调度子任务到合适的 MCP Agent 服务。
200
  """
201
- # 这是一个占位符实现,未来将从注册表选择 Agent 或请求 Agent Manager 创建
202
- print(f"调度子任务: {subtask['name']}")
203
- # TODO: 实现 Agent 发现和调度逻辑
204
- # TODO: 如果没有合适的 Agent,请求 Agent Manager 创建
205
-
206
- # 模拟一个 Agent 服务调用
207
- class MockAgentService:
208
- async def execute(self, subtask_data: dict) -> dict:
209
- print(f"MockAgentService 正在执行 {subtask_data['name']}")
210
- await asyncio.sleep(1) # 模拟耗时操作
211
- return {"subtask": subtask_data, "status": "completed", "output": f"已完成 {subtask_data['name']}"}
212
-
213
- # 暂时返回一个 Mock Agent 服务实例
214
- return MockAgentService()
215
-
216
- async def _aggregate_results(self, results: list) -> dict:
217
  """
218
- 聚合所有子任务的结果。
219
  """
220
- print(f"聚合子任务结果: {results}")
221
- # 这是一个占位符实现
222
- return {"status": "completed", "final_output": "所有子任务已处理", "details": results}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
223
 
224
- async def _reflect_on_execution(self, original_task: str, subtasks: list, results: list, final_result: dict):
225
  """
226
- 对任务执行过程进行反思和学习。
227
  """
228
- print("RootAgent 正在反思执行过程...")
229
- # TODO: 实现反思逻辑,例如评估成功率、识别瓶颈、更新长期记忆
230
- pass
231
-
232
- # 示例用法 (仅用于测试 RootAgent 模块本身,实际由 codes/app.py 调用)
233
- if __name__ == "__main__":
234
- import asyncio
235
-
236
- async def main():
237
- root_agent = RootAgent()
238
- result = await root_agent.process_task("分析市场数据并生成报告")
239
- print(f"\n最终任务结果: {result}")
240
-
241
- asyncio.run(main())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any, List, Optional
2
  import asyncio
3
+ import os
4
+ import json
5
+ import httpx
6
+ import redis
7
+
8
+ from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
9
+ from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口
10
+
11
+ # Redis 配置
12
+ REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
13
+ REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
14
+ REDIS_DB = int(os.getenv("REDIS_DB", 0))
15
+ AGENT_CHANNEL = "agent_discovery_channel"
16
+ AGENT_KEY_PREFIX = "agent:"
17
 
18
  class RootAgent:
19
  def __init__(self):
20
+ AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
21
+ self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
22
+ self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent
23
+ self.redis_client: Optional[redis.Redis] = None
24
+ self.redis_pubsub: Optional[redis.client.PubSub] = None
25
+ self.discovery_task: Optional[asyncio.Task] = None
26
+
27
+ async def initialize(self):
28
  """
29
+ RootAgent 启动时的初始化逻辑,连接 Redis 并启动 Agent 发现。
 
30
  """
31
+ self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
32
+ try:
33
+ self.redis_client.ping()
34
+ print("RootAgent connected to Redis successfully!")
35
+ except redis.exceptions.ConnectionError as e:
36
+ print(f"RootAgent could not connect to Redis: {e}")
37
+ # 可以在这里选择退出或以降级模式运行
38
+
39
+ self.redis_pubsub = self.redis_client.pubsub()
40
+ self.redis_pubsub.subscribe(AGENT_CHANNEL)
41
+ self.discovery_task = asyncio.create_task(self._agent_discovery_listener())
42
+
43
+ # Redis 加载所有已知的 Agent
44
+ await self._load_existing_agents()
45
+ print("RootAgent initialized.")
46
+
47
+ async def shutdown(self):
 
 
 
 
 
 
 
 
 
 
 
48
  """
49
+ RootAgent 关闭时的清理逻辑。
 
 
 
 
 
 
50
  """
51
+ if self.discovery_task:
52
+ self.discovery_task.cancel()
53
+ try:
54
+ await self.discovery_task
55
+ except asyncio.CancelledError:
56
+ pass
57
+ if self.redis_pubsub:
58
+ self.redis_pubsub.unsubscribe(AGENT_CHANNEL)
59
+ if self.redis_client:
60
+ self.redis_client.close()
61
+ print("RootAgent shutdown.")
62
+
63
+ async def _agent_discovery_listener(self):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  """
65
+ 监听 Redis 频道,实时更新活跃 Agent 列表。
 
 
 
 
 
 
 
66
  """
67
+ if not self.redis_pubsub:
68
+ return
69
+
70
+ while True:
71
+ message = self.redis_pubsub.get_message(ignore_subscribe_messages=True)
72
+ if message:
73
+ try:
74
+ data = json.loads(message['data'].decode('utf-8'))
75
+ agent_id = data.get("id")
76
+ event_type = data.get("event_type")
77
+
78
+ if agent_id:
79
+ if event_type == "HEARTBEAT" or event_type == "ONLINE":
80
+ # Redis 获取最新的 Agent 信息
81
+ agent_data = self.redis_client.hgetall(f"{AGENT_KEY_PREFIX}{agent_id}")
82
+ if agent_data:
83
+ agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()})
84
+ self.active_agents[agent_id] = agent_info
85
+ # print(f"RootAgent discovered/updated agent: {agent_id} ({agent_info.agent_type})")
86
+ else:
87
+ print(f"RootAgent received {event_type} for {agent_id}, but no data in Redis.")
88
+ elif event_type == "OFFLINE":
89
+ if agent_id in self.active_agents:
90
+ del self.active_agents[agent_id]
91
+ print(f"RootAgent removed offline agent: {agent_id}")
92
+ except json.JSONDecodeError:
93
+ print(f"RootAgent received invalid JSON message from Redis: {message['data']}")
94
+ except Exception as e:
95
+ print(f"RootAgent error processing Redis message: {e}")
96
+ await asyncio.sleep(0.1)
97
+
98
+ async def _load_existing_agents(self):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
  """
100
+ Redis 加载所有已知的 Agent 到内存缓存。
101
  """
102
+ for key in self.redis_client.keys(f"{AGENT_KEY_PREFIX}*"):
103
+ agent_data = self.redis_client.hgetall(key)
104
+ if agent_data:
105
+ agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()})
106
+ self.active_agents[agent_info.id] = agent_info
107
+ print(f"RootAgent loaded existing agent: {agent_info.id} ({agent_info.agent_type})")
108
+
109
+ async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
 
 
 
 
 
 
 
 
110
  """
111
+ 尝试从缓存中获取一个可用 Agent,如果不存在则请求 Agent Manager 创建。
112
  """
113
+ # 1. 检查内存缓存中是否有可用 Agent
114
+ for agent_id, agent_info in self.active_agents.items():
115
+ if agent_info.agent_type == agent_type and agent_info.status == "running":
116
+ print(f"Found existing agent {agent_id} of type {agent_type}.")
117
+ return agent_info
118
+
119
+ # 2. 如果没有,请求 Agent Manager 创建新 Agent
120
+ print(f"No active agent of type {agent_type} found. Requesting Agent Manager to create one.")
121
+ create_request = CreateAgentRequest(
122
+ agent_type=agent_type,
123
+ image_name=image_name,
124
+ env_vars={"REDIS_HOST": REDIS_HOST, "REDIS_PORT": str(REDIS_PORT), "REDIS_DB": str(REDIS_DB)},
125
+ config={"some_initial_config": "value"}
126
+ )
127
+ try:
128
+ new_agent_info = await self.agent_manager_client.create_agent(create_request)
129
+ self.active_agents[new_agent_info.id] = new_agent_info # 添加到缓存
130
+ print(f"Agent Manager created new agent: {new_agent_info.id} ({new_agent_info.agent_type})")
131
+ return new_agent_info
132
+ except httpx.HTTPStatusError as e:
133
+ print(f"Failed to create agent via Agent Manager: {e.response.text}")
134
+ raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
135
+ except Exception as e:
136
+ print(f"An unexpected error occurred while creating agent: {e}")
137
+ raise RuntimeError(f"Failed to create agent {agent_type}: {e}")
138
 
139
+ async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
140
  """
141
+ RootAgent 的核心任务处理逻辑:任务拆分、调度、结果聚合。
142
  """
143
+ print(f"RootAgent received task: {task_data}")
144
+
145
+ # 示例:简单任务拆分和调度
146
+ # 假设 task_data 包含 "subtasks" 列表,每个子任务有 "agent_type" 和 "payload"
147
+ subtasks_definitions = task_data.get("subtasks", [])
148
+ if not subtasks_definitions:
149
+ return {"status": "failed", "error": "No subtasks defined in the task."}
150
+
151
+ results = []
152
+ for subtask_def in subtasks_definitions:
153
+ agent_type = subtask_def.get("agent_type")
154
+ payload = subtask_def.get("payload")
155
+
156
+ if not agent_type or not payload:
157
+ results.append({"status": "failed", "error": "Invalid subtask definition."})
158
+ continue
159
+
160
+ try:
161
+ # 获取或创建 Agent
162
+ agent_info = await self._get_or_create_agent(agent_type)
163
+
164
+ # 调用 Agent 的 MCP 服务
165
+ async with httpx.AsyncClient() as client:
166
+ response = await client.post(
167
+ f"{agent_info.mcp_endpoint}/process_subtask",
168
+ json=payload,
169
+ timeout=30.0 # 设定超时
170
+ )
171
+ response.raise_for_status()
172
+ subtask_result = response.json()
173
+ results.append({"agent_type": agent_type, "result": subtask_result})
174
+ except Exception as e:
175
+ results.append({"agent_type": agent_type, "status": "failed", "error": str(e)})
176
+ print(f"Error processing subtask with agent {agent_type}: {e}")
177
+
178
+ # 聚合结果
179
+ final_output = {"status": "completed", "aggregated_results": results}
180
+ print(f"RootAgent finished processing task. Results: {final_output}")
181
+ return final_output
app.py CHANGED
@@ -1,28 +1,38 @@
1
- import sys
 
 
2
  import os
3
 
4
- # 将当前文件所在的目录(即 root_agent 目录)添加到 Python 路径
5
- sys.path.append(os.path.dirname(__file__))
6
-
7
- from fastapi import FastAPI
8
- from agents.root_agent import RootAgent
9
 
10
  app = FastAPI()
11
- root_agent = RootAgent()
12
 
13
- @app.get("/")
14
- async def read_root():
15
- return {"Hello": "World!"}
 
 
16
 
17
- from pydantic import BaseModel
 
 
 
18
 
19
- class TaskRequest(BaseModel):
20
- task_description: str
 
21
 
22
  @app.post("/process_task")
23
- async def process_task_endpoint(request: TaskRequest):
24
  """
25
- 接收并处理高层级任务的 API 端点。
26
  """
27
- result = await root_agent.process_task(request.task_description)
28
- return result
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException, status
2
+ from typing import Dict, Any
3
+ import asyncio
4
  import os
5
 
6
+ from root_agent.agents.root_agent import RootAgent
 
 
 
 
7
 
8
  app = FastAPI()
9
+ root_agent_instance: Optional[RootAgent] = None
10
 
11
+ @app.on_event("startup")
12
+ async def startup_event():
13
+ global root_agent_instance
14
+ root_agent_instance = RootAgent()
15
+ await root_agent_instance.initialize()
16
 
17
+ @app.on_event("shutdown")
18
+ async def shutdown_event():
19
+ if root_agent_instance:
20
+ await root_agent_instance.shutdown()
21
 
22
+ @app.get("/")
23
+ async def read_root():
24
+ return {"message": "Worker service with RootAgent is running!"}
25
 
26
  @app.post("/process_task")
27
+ async def process_task_endpoint(task_data: Dict[str, Any]):
28
  """
29
+ 接收外部任务请求,并交由 RootAgent 处理。
30
  """
31
+ if not root_agent_instance:
32
+ raise HTTPException(status_code=500, detail="RootAgent not initialized.")
33
+
34
+ try:
35
+ result = await root_agent_instance.process_task(task_data)
36
+ return result
37
+ except Exception as e:
38
+ raise HTTPException(status_code=500, detail=f"Error processing task: {e}")