airsltd commited on
Commit
c202f7c
·
1 Parent(s): 98ca81a
.clinerules/基础指令.md CHANGED
@@ -11,6 +11,7 @@
11
  4. **避免不必要的确认:** 除非操作具有潜在影响(例如,删除文件、修改关键配置)或用户明确要求确认,否则避免不必要的确认步骤。
12
 
13
  # 参考链接
 
14
 
15
  # Python 环境管理
16
  使用 `conda` 作为环境管理工具。
 
11
  4. **避免不必要的确认:** 除非操作具有潜在影响(例如,删除文件、修改关键配置)或用户明确要求确认,否则避免不必要的确认步骤。
12
 
13
  # 参考链接
14
+ 1. [Qwen-Agent](https://qwen.readthedocs.io/zh-cn/latest/framework/qwen_agent.html)
15
 
16
  # Python 环境管理
17
  使用 `conda` 作为环境管理工具。
.gitignore CHANGED
@@ -1 +1,2 @@
1
  __pycache__/
 
 
1
  __pycache__/
2
+ *.env
agent_manager/app.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI
2
+ from pydantic import BaseModel
3
+ import uuid
4
+
5
+ app = FastAPI()
6
+
7
+ class CreateAgentRequest(BaseModel):
8
+ agent_type: str
9
+ config: dict = {}
10
+
11
+ class AgentInfo(BaseModel):
12
+ agent_id: str
13
+ agent_type: str
14
+ status: str
15
+ endpoint: str
16
+
17
+ # 模拟 Agent 注册表
18
+ active_agents = {}
19
+
20
+ @app.get("/")
21
+ async def read_root():
22
+ return {"message": "Agent Manager is running"}
23
+
24
+ @app.post("/create_agent", response_model=AgentInfo)
25
+ async def create_agent(request: CreateAgentRequest):
26
+ """
27
+ 动态创建 Agent 实例的 MCP 接口。
28
+ """
29
+ agent_id = str(uuid.uuid4())
30
+ # 模拟 Agent 启动逻辑
31
+ # 实际中会与 Docker/Kubernetes 等基础设施交互
32
+ endpoint = f"http://localhost:8000/agents/{agent_id}" # 占位符
33
+
34
+ agent_info = AgentInfo(
35
+ agent_id=agent_id,
36
+ agent_type=request.agent_type,
37
+ status="running",
38
+ endpoint=endpoint
39
+ )
40
+ active_agents[agent_id] = agent_info
41
+ print(f"Agent Manager 创建了 Agent: {agent_info}")
42
+ return agent_info
43
+
44
+ @app.get("/agents/{agent_id}", response_model=AgentInfo)
45
+ async def get_agent_info(agent_id: str):
46
+ """
47
+ 获取指定 Agent 实例的信息。
48
+ """
49
+ agent = active_agents.get(agent_id)
50
+ if not agent:
51
+ raise HTTPException(status_code=404, detail="Agent not found")
52
+ return agent
53
+
54
+ @app.delete("/agents/{agent_id}")
55
+ async def delete_agent(agent_id: str):
56
+ """
57
+ 销毁指定 Agent 实例。
58
+ """
59
+ if agent_id in active_agents:
60
+ del active_agents[agent_id]
61
+ print(f"Agent Manager 销毁了 Agent: {agent_id}")
62
+ return {"message": f"Agent {agent_id} deleted"}
63
+ raise HTTPException(status_code=404, detail="Agent not found")
64
+
65
+ # 示例用法 (仅用于测试 Agent Manager 模块本身)
66
+ if __name__ == "__main__":
67
+ import uvicorn
68
+ uvicorn.run(app, host="0.0.0.0", port=8000)
agent_manager/requirements.txt ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ fastapi
2
+ uvicorn
codes/.env.example ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ OPENAI_API_KEY=""
2
+ OPENAI_BASE_URL=""
3
+ MODEL="gemini-2.5-flash-preview-05-20"
codes/agents/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # This makes 'agents' a Python package.
codes/agents/mcp_agent_interface.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import ABC, abstractmethod
2
+ from typing import Any, Dict
3
+
4
+ class MCPAgentService(ABC):
5
+ """
6
+ MCP Agent 服务的抽象基类。
7
+ 所有具体的 MCP Agent 服务都应继承此接口并实现其抽象方法。
8
+ """
9
+
10
+ @abstractmethod
11
+ async def execute(self, subtask: Dict[str, Any]) -> Dict[str, Any]:
12
+ """
13
+ 执行子任务的方法。
14
+
15
+ Args:
16
+ subtask: 包含子任务描述和所需参数的字典。
17
+
18
+ Returns:
19
+ 包含子任务执行结果的字典。
20
+ """
21
+ pass
22
+
23
+ @abstractmethod
24
+ def get_capabilities(self) -> Dict[str, Any]:
25
+ """
26
+ 返回 Agent 服务的能力描述。
27
+ 这将帮助 RootAgent 智能地调度任务。
28
+
29
+ Returns:
30
+ 包含 Agent 能力描述的字典。
31
+ """
32
+ pass
codes/agents/root_agent.py ADDED
@@ -0,0 +1,241 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # codes/agents/root_agent.py
2
+ import asyncio
3
+ import json # 导入 json 模块
4
+ import os # 导入 os 模块
5
+ from qwen_agent.agents import Assistant # 导入 Qwen-Agent 的 Assistant 类
6
+ from qwen_agent.tools.base import BaseTool # 如果需要自定义工具,可以导入
7
+
8
+ class RootAgent:
9
+ def __init__(self):
10
+ """
11
+ RootAgent 的初始化方法。
12
+ 负责设置 Agent 的基本配置,如记忆模块、规划模块等。
13
+ """
14
+ print("RootAgent 初始化...")
15
+ # TODO: 初始化记忆模块
16
+ # TODO: 初始化反思模块
17
+ # TODO: 初始化 Agent 注册表
18
+
19
+ # 初始化 Qwen-Agent 的 Assistant 作为规划 Agent
20
+ # 配置 LLM 模型为与 OpenAI 兼容的 Gemini 模型
21
+ # 从 .env 文件加载环境变量
22
+ from dotenv import load_dotenv
23
+ load_dotenv(dotenv_path='codes/.env')
24
+
25
+ # 从环境变量中获取 OpenAI 兼容模型的 API 密钥和基础 URL
26
+ openai_api_key = os.getenv("OPENAI_API_KEY")
27
+ openai_base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") # 默认使用 OpenAI 官方 API 地址
28
+ model = os.getenv("MODEL", "")
29
+
30
+ if not openai_api_key:
31
+ print("警告: 未设置 OPENAI_API_KEY 环境变量。OpenAI 兼容模型可能无法正常工作。")
32
+
33
+ self.planner_agent = Assistant(llm={
34
+ 'model': model, # OpenAI 兼容模型
35
+ 'api_key': openai_api_key,
36
+ 'base_url': openai_base_url,
37
+ 'model_type': 'oai' # 更正为 'oai' 以兼容 qwen-agent
38
+ })
39
+ print("Qwen-Agent Planner Assistant (OpenAI 兼容) 初始化完成。")
40
+
41
+ async def process_task(self, task_description: str) -> dict:
42
+ """
43
+ 处理高层级任务的入口方法。
44
+
45
+ Args:
46
+ task_description: 任务的描述字符串。
47
+
48
+ Returns:
49
+ 包含任务处理结果的字典。
50
+ """
51
+ print(f"RootAgent 接收到任务: {task_description}")
52
+
53
+ # 1. 任务接收与拆分 (规划)
54
+ subtasks = await self._plan_task(task_description)
55
+ print(f"任务已拆分为子任务: {subtasks}")
56
+
57
+ # 2. 任务调度与编排
58
+ # 简单的依赖管理:按顺序执行,假设依赖关系已在 _plan_task 中处理为线性顺序
59
+ # 实际中需要更复杂的 DAG 调度器
60
+ task_results = {}
61
+ for subtask in subtasks:
62
+ # 检查并等待依赖任务完成
63
+ for dep_id in subtask.get("dependencies", []):
64
+ if dep_id not in task_results or task_results[dep_id]["status"] != "completed":
65
+ # 实际中这里需要更复杂的等待机制或错误处理
66
+ print(f"等待子任务 {dep_id} 完成才能执行 {subtask['id']}")
67
+ # 暂时跳过,实际应阻塞或重新调度
68
+ task_results[subtask["id"]] = {"subtask": subtask, "status": "skipped", "reason": f"Dependency {dep_id} not completed"}
69
+ break
70
+ else:
71
+ print(f"开始调度子任务: {subtask['name']} (ID: {subtask['id']})")
72
+ agent_service = await self._dispatch_subtask(subtask)
73
+ if agent_service:
74
+ # 假设 agent_service.execute 返回一个包含结果和状态的字典
75
+ subtask_result = await agent_service.execute(subtask)
76
+ task_results[subtask["id"]] = subtask_result
77
+ else:
78
+ task_results[subtask["id"]] = {"subtask": subtask, "status": "failed", "reason": "No suitable agent found"}
79
+
80
+ print(f"子任务 {subtask['id']} 结果: {task_results.get(subtask['id'], 'N/A')}")
81
+
82
+ # 3. 结果聚合
83
+ results_list = list(task_results.values())
84
+ final_result = await self._aggregate_results(results_list)
85
+
86
+ # 4. 反思与学习 (可选)
87
+ await self._reflect_on_execution(task_description, subtasks, results_list, final_result)
88
+
89
+ return final_result
90
+
91
+ async def _plan_task(self, task_description: str) -> list[dict]:
92
+ """
93
+ 根据任务描述进行规划,拆分为子任务,并建立依赖关系。
94
+
95
+ Args:
96
+ task_description: 任务的描述字符串。
97
+
98
+ Returns:
99
+ 一个包含子任务及其依赖关系的列表。每个子任务是一个字典,
100
+ 包含 'id', 'name', 'description', 'dependencies' 等字段。
101
+ """
102
+ print(f"RootAgent 正在使用 Qwen-Agent 规划任务: {task_description}")
103
+
104
+ # 构造 Qwen-Agent 的输入提示
105
+ # Qwen-Agent 的 run 方法通常期望一个消息列表作为输入
106
+ messages = [
107
+ {
108
+ 'role': 'user',
109
+ 'content': f"""请将以下高层级任务分解为一系列可执行的子任务,并以 JSON 数组的格式返回。
110
+ 每个子任务对象应包含以下字段:
111
+ - 'id': 唯一的子任务ID (字符串)
112
+ - 'name': 子任务的名称 (字符串)
113
+ - 'description': 子任务的详细描述 (字符串)
114
+ - 'dependencies': 该子任务依赖的其他子任务ID列表 (字符串数组,如果无依赖则为空数组)
115
+ - 'required_agent_type': 执行该子任务所需的 Agent 类型 (字符串,例如:DataCollectorAgent, DataAnalyzerAgent, ReportGeneratorAgent)
116
+
117
+ 任务描述:{task_description}
118
+
119
+ 请确保输出是有效的 JSON 格式,并且只包含 JSON 数组,不包含任何额外的文本或解释。
120
+ """
121
+ }
122
+ ]
123
+
124
+ # 调用 Qwen-Agent 进行规划
125
+ # Qwen-Agent 的 run 方法通常返回一个包含 Agent 思考过程和最终结果的字典
126
+ # 我们需要从结果中提取规划的子任务
127
+ # 封装同步生成器迭代,并使用 asyncio.to_thread 包装
128
+ def run_sync_planner():
129
+ last_item = None
130
+ # 迭代生成器,获取最终结果
131
+ # 将 messages 传递给 run 方法
132
+ for item in self.planner_agent.run(messages):
133
+ last_item = item
134
+ return last_item
135
+
136
+ response = await asyncio.to_thread(run_sync_planner)
137
+
138
+ subtasks = []
139
+ try:
140
+ # 检查 response 是否为列表,并且包含至少一个字典
141
+ if isinstance(response, list) and len(response) > 0 and isinstance(response[0], dict):
142
+ # 假设实际的 JSON 字符串在第一个字典的 'content' 键中
143
+ content_str = response[0].get('content', '')
144
+
145
+ # 提取 Markdown 代码块中的 JSON 字符串
146
+ # 查找 '```json\n' 和 '\n```' 之间的内容
147
+ json_start = content_str.find('```json\n')
148
+ json_end = content_str.rfind('\n```')
149
+
150
+ if json_start != -1 and json_end != -1 and json_start < json_end:
151
+ json_str = content_str[json_start + len('```json\n'):json_end]
152
+ parsed_output = json.loads(json_str)
153
+ if isinstance(parsed_output, list):
154
+ subtasks = parsed_output
155
+ else:
156
+ print(f"Qwen-Agent 提取的 JSON 不是列表格式: {parsed_output}")
157
+ else:
158
+ print(f"Qwen-Agent 返回的 content 字段不包含预期的 JSON 代码块: {content_str}")
159
+ else:
160
+ print(f"Qwen-Agent 返回非预期格式的响应: {response}")
161
+ except json.JSONDecodeError as e:
162
+ print(f"解析 Qwen-Agent 规划结果时发生 JSON 错误: {e}")
163
+ print(f"Qwen-Agent 原始响应: {response}")
164
+ except Exception as e:
165
+ print(f"解析 Qwen-Agent 规划结果时发生未知错误: {e}")
166
+ print(f"Qwen-Agent 原始响应: {response}")
167
+
168
+ # 如果 Qwen-Agent 未能生成有效规划,返回一个默认的模拟规划
169
+ if not subtasks:
170
+ print("Qwen-Agent 未能生成有效规划,返回模拟子任务列表。")
171
+ subtasks = [
172
+ {
173
+ "id": "task_001",
174
+ "name": "市场数据收集",
175
+ "description": f"收集关于 {task_description} 的最新市场数据",
176
+ "dependencies": [],
177
+ "required_agent_type": "DataCollectorAgent"
178
+ },
179
+ {
180
+ "id": "task_002",
181
+ "name": "数据分析",
182
+ "description": "对收集到的市场数据进行分析,识别关键趋势",
183
+ "dependencies": ["task_001"],
184
+ "required_agent_type": "DataAnalyzerAgent"
185
+ },
186
+ {
187
+ "id": "task_003",
188
+ "name": "报告生成",
189
+ "description": "根据数据分析结果生成一份简要报告",
190
+ "dependencies": ["task_002"],
191
+ "required_agent_type": "ReportGeneratorAgent"
192
+ }
193
+ ]
194
+
195
+ return subtasks
196
+
197
+ async def _dispatch_subtask(self, subtask: dict):
198
+ """
199
+ 调度子任务到合适的 MCP Agent 服务。
200
+ """
201
+ # 这是一个占位符实现,未来将从注册表选择 Agent 或请求 Agent Manager 创建
202
+ print(f"调度子任务: {subtask['name']}")
203
+ # TODO: 实现 Agent 发现和调度逻辑
204
+ # TODO: 如果没有合适的 Agent,请求 Agent Manager 创建
205
+
206
+ # 模拟一个 Agent 服务调用
207
+ class MockAgentService:
208
+ async def execute(self, subtask_data: dict) -> dict:
209
+ print(f"MockAgentService 正在执行 {subtask_data['name']}")
210
+ await asyncio.sleep(1) # 模拟耗时操作
211
+ return {"subtask": subtask_data, "status": "completed", "output": f"已完成 {subtask_data['name']}"}
212
+
213
+ # 暂时返回一个 Mock Agent 服务实例
214
+ return MockAgentService()
215
+
216
+ async def _aggregate_results(self, results: list) -> dict:
217
+ """
218
+ 聚合所有子任务的结果。
219
+ """
220
+ print(f"聚合子任务结果: {results}")
221
+ # 这是一个占位符实现
222
+ return {"status": "completed", "final_output": "所有子任务已处理", "details": results}
223
+
224
+ async def _reflect_on_execution(self, original_task: str, subtasks: list, results: list, final_result: dict):
225
+ """
226
+ 对任务执行过程进行反思和学习。
227
+ """
228
+ print("RootAgent 正在反思执行过程...")
229
+ # TODO: 实现反思逻辑,例如评估成功率、识别瓶颈、更新长期记忆
230
+ pass
231
+
232
+ # 示例用法 (仅用于测试 RootAgent 模块本身,实际由 codes/app.py 调用)
233
+ if __name__ == "__main__":
234
+ import asyncio
235
+
236
+ async def main():
237
+ root_agent = RootAgent()
238
+ result = await root_agent.process_task("分析市场数据并生成报告")
239
+ print(f"\n最终任务结果: {result}")
240
+
241
+ asyncio.run(main())
codes/app.py CHANGED
@@ -1,7 +1,22 @@
1
  from fastapi import FastAPI
 
2
 
3
  app = FastAPI()
 
4
 
5
  @app.get("/")
6
- def greet_json():
7
- return {"Hello": "World!"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from fastapi import FastAPI
2
+ from codes.agents.root_agent import RootAgent
3
 
4
  app = FastAPI()
5
+ root_agent = RootAgent()
6
 
7
  @app.get("/")
8
+ async def read_root():
9
+ return {"Hello": "World!"}
10
+
11
+ from pydantic import BaseModel
12
+
13
+ class TaskRequest(BaseModel):
14
+ task_description: str
15
+
16
+ @app.post("/process_task")
17
+ async def process_task_endpoint(request: TaskRequest):
18
+ """
19
+ 接收并处理高层级任务的 API 端点。
20
+ """
21
+ result = await root_agent.process_task(request.task_description)
22
+ return result
docs/worker_agent_design.md ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Worker 与 Agent 系统设计方案
2
+
3
+ ## 1. 系统概述
4
+
5
+ 本设计方案旨在构建一个灵活、可扩展、具备**规划、反思和记忆能力**的多 Agent 系统,其中 `worker` 作为核心服务,负责接收外部请求并协调多个 `Agent` 的工作。系统将包含一个主 `Agent`(`RootAgent`)和多个以 Model Context Protocol (MCP) 服务形式存在的子 `Agent`,并引入一个独立的 `Agent Manager` 来实现 `Agent` 的动态创建和管理。
6
+
7
+ ## 2. 核心组件与职责
8
+
9
+ ### 2.1 Worker (FastAPI 应用)
10
+
11
+ * **职责:**
12
+ * 作为系统的入口点,接收外部 HTTP 请求。
13
+ * 将请求转发给内部的 `RootAgent` 进行处理。
14
+ * 将 `RootAgent` 返回的结果作为 HTTP 响应返回给客户端。
15
+ * **部署:** 与 `RootAgent` 模块部署在同一个进程中。
16
+
17
+ ### 2.2 RootAgent (Python 模块)
18
+
19
+ * **职责:**
20
+ * **任务接收与拆分 (规划):** 接收来自 `worker` 的高层级任务,并将其分解为一系列更小、更具体的子任务。这可能涉及复杂的逻辑,如规则引擎或 LLM 辅助的任务理解和拆解。**引入规划机制,能够对任务进行多步骤分解和依赖管理,例如通过构建任务图(DAG)来表示任务流程和依赖关系。**
21
+ * **任务调度与编排:** 维护一个当前可用 MCP `Agent` 服务的注册表(或通过服务发现机制获取)。根据子任务的类型和所需能力,从注册表中选择最合适的 `Agent` 进行调用。**支持复杂工作流编排(如 DAG 或顺序工作流),协调多个 Agent 的执行顺序和数据流。**
22
+ * **反思与学习:** **在任务执行过程中或完成后,对执行结果进行评估和反思,从错误中学习,优化未来的决策和任务分解策略。这包括收集执行反馈、识别成功模式和失败原因,并将学习成果应用于改进规划策略和更新 Agent 能力模型。**
23
+ * **记忆管理:** **维护短期记忆(如当前对话历史、任务状态)和长期记忆(如 Agent 能力知识、历史任务经验),以促进未来的行动和决策。可以考虑引入独立的记忆存储服务来持久化和检索这些记忆。**
24
+ * **记忆存储方案:** 考虑使用 `memory-bank/` 目录下的文件作为初步的记忆存储,或者集成一个简单的本地数据库(如 SQLite)来持久化记忆。
25
+ * **短期记忆:** 在 `RootAgent` 中实现一个机制,用于存储和检索当前任务的上下文信息、中间结果和对话历史。
26
+ * **长期记忆:** 实现一个机制,用于存储和检索 Agent 能力知识、历史任务经验和优化策略。这可能涉及将成功的任务规划和执行流程作为“经验”存储起来。
27
+ * **反思机制:** 定义反思的触发条件(例如,任务成功完成、任务失败、达到特定里程碑)。在 `RootAgent` 中实现一个 `_reflect` 方法,利用 LLM 对任务执行过程和结果进行分析,识别成功模式和失败原因。
28
+ * **学习与优化:** 将反思的结果用于更新 `RootAgent` 的规划策略,例如调整子任务拆分的方式、选择 Agent 的优先级等。
29
+ * **触发动态创建:** 如果在注册表中找不到能够处理某个子任务的现有 `Agent`,`RootAgent` 将向 `Agent Manager` 发送请求,要求创建特定类型的 `Agent`。
30
+ * **集成 `Agent Manager` 客户端:** 在 `RootAgent` 中添加一个 `AgentManagerClient` 类,用于通过 HTTP/MCP 协议与 `Agent Manager` 交互,发送创建/销毁 Agent 的请求。
31
+ * **更新 `_dispatch_subtask` 方法:** 根据 `qwen-agent` 规划出的子任务类型,首先检查 `RootAgent` 是否已经缓存了该类型 Agent 的可用实例。如果不存在或不可用,则通过 `AgentManagerClient` 向 `Agent Manager` 发送 `create_agent` 请求,获取新创建 Agent 的端点信息。使用获取到的端点信息,通过 MCP 协议调用选定的 Agent 执行子任务。实现适当的错误处理和重试机制,以应对 Agent 启动失败或调用超时等情况。
32
+ * **Agent 实例缓存:** `RootAgent` 可以维护一个活跃 Agent 实例的缓存,以避免频繁地向 `Agent Manager` 请求创建 Agent。
33
+ * **结果聚合:** 协调多个 MCP `Agent` 服务的调用顺序和数据流,并聚合它们的返回结果。
34
+ * **定义统一结果结构:** 确定所有 MCP Agent 返回结果的统一数据结构,例如包含 `status`、`output` 和 `error` 字段。
35
+ * **更新 `_aggregate_results` 方法:** 修改 `_aggregate_results` 方法,使其能够处理来自实际 MCP Agent 服务的复杂结果。这可能涉及根据子任务的依赖关系和类型,对结果进行合并、转换或总结。
36
+ * **实现形式:** 作为 `codes/app.py` 导入的一个独立 Python 模块(例如 `codes/agents/root_agent.py`)实现。
37
+ * **部署:** 与 FastAPI 应用部署在同一个进程中。
38
+
39
+ ### 2.3 MCP Agent 服务 (��立进程/服务)
40
+
41
+ * **职责:**
42
+ * 每个 MCP `Agent` 服务专注于一个特定的任务或功能(例如,数据处理、模型推理、外部 API 调用、代码生成等)。
43
+ * 通过 MCP 协议暴露接口,供 `RootAgent` 调用。
44
+ * 执行具体的业务逻辑或智能决策。
45
+ * **实现形式:** 独立的应用程序,通过 MCP 协议对外提供服务。
46
+ * **通用 Agent 接口:** 在 `codes/agents/mcp_agent_interface.py` 文件中定义一个抽象基类或接口,包含 `process_subtask` 等方法,所有具体的 MCP Agent 都将继承并实现此接口。
47
+ * **示例 `EchoAgent`:** 实现一个简单的 `EchoAgent` 作为第一个具体 Agent,它接收一个消息并返回该消息,用于验证 `Agent Manager` 和 `RootAgent` 之间的通信。
48
+ * **Agent 容器化:** 为每个 Agent 创建 Dockerfile,使其能够被 `Agent Manager` 动态部署。
49
+ * **Agent 配置:** 确保 Agent 能够通过环境变量或 MCP 请求参数接收必要的配置信息。
50
+ * **部署:** 根据 `Agent Manager` 的指令动态创建和部署,通常作为独立的 Docker 容器或其他微服务。
51
+
52
+ ### 2.4 Agent Manager (MCP 服务)
53
+
54
+ * **职责:**
55
+ * **动态 `Agent` 创建:** 接收 `RootAgent` 发出的 `Agent` 创建请求,并通过抽象层与底层基础设施(如 Docker、Kubernetes、云函数服务等)交互,动态地创建、配置和启动新的 MCP `Agent` 实例。**该抽象层允许 `Agent Manager` 适配不同的部署环境。**
56
+ * **`CreateAgentRequest` 完善:** 扩展请求模型,增加 `image_name` (Docker 镜像名称)、`env_vars` (环境变量字典) 和 `resource_limits` (资源限制,如 CPU、内存) 等字段,以便更灵活地配置要创建的 Agent。
57
+ * **`AgentDeployer` 抽象:** 定义一个 `AgentDeployer` 接口或抽象类,包含 `deploy_agent` 和 `destroy_agent` 等方法,用于封装与不同底层基础设施交互的逻辑。
58
+ * **Docker 集成(初步实现):** 实现一个 `DockerAgentDeployer` 类,使用 `docker-py` 库与 Docker Daemon 交互,根据 `CreateAgentRequest` 中的信息动态启动和停止 Docker 容器。
59
+ * **生命周期管理:** 管理动态创建 `Agent` 的生命周期,包括启动、停止、监控其健康状态,并在不再需要时进行销毁。
60
+ * **Agent 注册与健康检查:** 在 Agent 启动后,`Agent Manager` 将获取其实际运行的 IP 地址和端口,并可以利用 Docker 网络或服务发现机制。同时,实现简单的健康检查机制,定期检查 Agent 容器的运行状态。
61
+ * **注册与通知:** 获取新创建 `Agent` 实例的 MCP 端点信息,并可以注册到服务发现系统中(如 Consul、Etcd、Kubernetes Service Discovery),或者直接返回给 `RootAgent`。
62
+ * **MCP 接口标准化:** 确保 `Agent Manager` 提供的 `/create_agent`、`/get_agent_info`、`/delete_agent` 等接口符合 Model Context Protocol (MCP) 规范。
63
+ * **生命周期管理:** 管理动态创建 `Agent` 的生命周期,包括启动、停止、监控其健康状态,并在不再需要时进行销毁。
64
+ * **注册与通知:** 获取新创建 `Agent` 实例的 MCP 端点信息,并可以注册到服务发现系统中(如 Consul、Etcd、Kubernetes Service Discovery),或者直接返回给 `RootAgent`。
65
+ * **资源管理:** 处理 `Agent` 的资源分配和配置。
66
+ * **实现形式:** 一个独立的应用程序,本身作为一个 MCP 服务运行。其内部逻辑是纯粹的程序性管理功能,不涉及高级智能决策。
67
+ * **部署:** 部署为另一个独立的服务(例如,一个 Docker 容器),独立于 `worker` 进程运行。
68
+
69
+ ## 3. 通信与数据流
70
+
71
+ * **外部请求 -> Worker (FastAPI):** 客户端通过 HTTP 请求与 `worker` 交互。
72
+ * **Worker (FastAPI) -> RootAgent:** `worker` 将接收到的请求转发给内部的 `RootAgent` 实例,通过直接方法调用进行通信。
73
+ * **RootAgent -> MCP Agent 服务:** `RootAgent` 通过 MCP 协议向其他 MCP `Agent` 服务发送请求,传递子任务数据。
74
+ * **MCP Agent 服务 -> RootAgent:** MCP `Agent` 服务执行任务后,通过 MCP 协议将结果返回给 `RootAgent`。
75
+ * **RootAgent -> Agent Manager:** 当需要动态创建 `Agent` 时,`RootAgent` 通过 MCP 协议向 `Agent Manager` 发送创建请求。
76
+ * **Agent Manager -> RootAgent:** `Agent Manager` 返回新创建 `Agent` 的 MCP 端点信息。
77
+ * **RootAgent 聚合结果 -> Worker (FastAPI):** `RootAgent` 聚合所有子任务的结果,并将其返回给 `worker`。
78
+ * **Worker (FastAPI) -> 外部响应:** `worker` 将最终结果作为 HTTP 响应返回给客户端。
79
+
80
+ ## 4. 部署架构概览
81
+
82
+ 1. **`worker` 服务:** 包含 FastAPI 应用和 `RootAgent` 模块,部署为一个容器。
83
+ 2. **`Agent Manager` 服务:** 独立的 MCP 服务,部署为另一个容器。
84
+ 3. **MCP `Agent` 服务:** 根据需��动态创建和部署的独立 MCP 服务,每个 `Agent` 实例通常部署为一个容器。
85
+
86
+ 这种架构提供了高度的模块化、可扩展性、资源隔离和容错能力,非常适合构建复杂且动态的 `agent` 系统。
87
+
88
+ ## 5. 部署与运维考量
89
+
90
+ * **容器编排:** 推荐使用 Kubernetes 或 Docker Swarm 等容器编排工具来管理 `worker`、`Agent Manager` 和 MCP `Agent` 服务容器。这将提供自动化部署、扩展和管理能力。
91
+ * **服务发现与负载均衡:** 利用容器编排工具内置的服务发现机制,或集成 Consul、Etcd 等独立服务发现系统。通过负载均衡器(如 Nginx、Envoy)将请求分发到多个 `worker` 实例,并确保 `RootAgent` 能够可靠地找到并调用可用的 MCP `Agent` 服务实例。
92
+ * **持久化存储:** 对于需要持久化存储的组件(如 `RootAgent` 的长期记忆、日志、配置),应考虑使用持久卷(PV/PVC)、对象存储(如 S3)或外部数据库服务。
93
+ * **健康检查与监控:** 实现对所有服务实例的健康检查,并集成监控系统(如 Prometheus、Grafana)来收集和可视化系统指标,以便及时发现和解决问题。
94
+ * **日志管理:** 采用集中式日志管理方案(如 ELK Stack 或 Loki),收集、存储和分析所有服务的日志,便于故障排查和系统审计。
95
+ * **安全性:** 实施网络隔离、访问控制、API 认证与授权(如 JWT)、数据加密等安全措施,确保系统和数据的安全。
memory-bank/activeContext.md ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 当前上下文 (Active Context)
2
+
3
+ ## 当前工作重点
4
+ * **多 Agent 系统设计文档化:** 已完成 `worker` 多 `agent` 系统架构的详细设计,并将其记录在 `docs/worker_agent_design.md` 中。该文档涵盖了 `RootAgent`、MCP `Agent` 服务和 `Agent Manager` 的职责划分、实现形式和部署方案。
5
+ * **多 Agent 系统实现计划制定:** 已制定详细的实现计划,包括 `Agent Manager` MCP 服务、具体 MCP `Agent` 服务、`RootAgent` 的调度和结果聚合逻辑,以及记忆和反思模块的实现。
6
+ * **`.clinerules` 文件学习:** 已全面学习并解释了 `.clinerules` 目录下所有规则文件的内容,理解了 Cline 的行为准则、记忆管理、任务自动化、研究流程、顺序思维和持续改进协议。
7
+
8
+ ## 最近的更改
9
+ * 在 `.gitignore` 中添加了 `__pycache__/` 忽略规则。
10
+ * 创建了 `docs/worker_agent_design.md` 文件,详细记录了多 `agent` 系统设计。
11
+ * `docs/worker_agent_design.md` 已根据详细的实现计划进行了更新。
12
+ * 根据用户反馈,正在整理 `memory-bank` 文件,以确保项目知识的持久化和可追溯性。
13
+
14
+ ## 下一步
15
+ * 等待用户切换到 ACT 模式,以开始实施多 Agent 系统设计中的各个部分。
16
+
17
+ ## 活跃决策与考量
18
+ * **记忆库的完整性:** 确保 `memory-bank` 中的所有核心文件都已创建并包含最新、最准确的信息,以便在会话重置后能够无缝恢复工作。
19
+ * **规则遵循:** 严格遵循 `.clinerules` 中定义的各项规则,尤其是在自主决策、用户交互和任务交接方面。
20
+
21
+ ## 学习与项目洞察
22
+ * 通过与用户的讨论,明确了 `worker` 作为多 `agent` 系统核心的架构需求。
23
+ * 理解了 `RootAgent` 作为任务协调者的关键作用,以及 `Agent Manager` 在动态管理 MCP `Agent` 服务中的重要性。
24
+ * 深入学习了 Cline 的自我管理和学习机制,这将有助于更高效地完成任务。
25
+ * 明确了 `Agent Manager` 作为独立 MCP 服务的重要性,以及它与 `RootAgent` 和具体 MCP `Agent` 服务之间的通信和交互模式。
memory-bank/consolidated_learnings.md ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 整合学习 (Consolidated Learnings)
2
+
3
+ ## 知识管理与记忆库
4
+ **模式:记忆库文件创建与更新流程**
5
+ - 当用户要求“生成记忆”或需要持久化项目知识时,应主动检查 `memory-bank` 目录,并根据 `1-memory-bank.md` 中定义的结构,创建或更新核心文件(`projectBrief.md`, `productContext.md`, `activeContext.md`, `progress.md`)。
6
+ - 同时,根据 `6-cline-continuous-improvement-protocol.md`,记录原始反思到 `raw_reflection_log.md`,并定期整合为 `consolidated_learnings.md`。
7
+ - `docs/worker_agent_design.md` 已根据详细的实现计划进行了更新,反映了对多 Agent 系统设计的深入理解。
8
+ - *理由:* 确保在会话重置后能够无缝恢复工作,并积累可复用的项目知识和操作经验。
9
+
10
+ ## 用户意图理解
11
+ **原则:结合 `.clinerules` 精确推断用户意图**
12
+ - 当用户指令模糊时(例如“生成记忆”),不应直接执行表面操作,而应结合 `.clinerules` 中定义的具体协议(如 `1-memory-bank.md` 和 `6-cline-continuous-improvement-protocol.md`)来推断其深层意图。
13
+ - *理由:* 减少不必要的交互,提高自主决策的准确性,确保遵循既定协议。
14
+
15
+ ## 项目特定配置
16
+ **FastAPI Worker 项目:**
17
+ - **Python 环境管理:** 使用 `conda`,默认环境名为 `airs` (Python 3.12)。
18
+ - **项目运行命令:** `conda activate airs && uvicorn codes.app:app --host 0.0.0.0 --port 7860 --reload`。
19
+ - **目录结构:** `codes` (代码), `docs` (文档), `data` (数据集), `logs` (日志) 目录不可修改。
20
+ - **多 Agent 系统架构理解:** 明确了 `Agent Manager` 作为独立 MCP 服务的重要性,以及它与 `RootAgent` 和具体 MCP `Agent` 服务之间的通信和交互模式。
21
+ - *理由:* 遵循项目规范,确保环境一致性和操作正确性。
memory-bank/productContext.md ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 产品背景 (Product Context)
2
+
3
+ ## 项目存在的意义
4
+ 本项目旨在解决现有任务处理系统在灵活性、可扩展性和智能化方面的不足。通过引入多 `agent` 系统,`worker` 能够更智能地理解、分解和执行复杂任务,从而提高自动化水平和处理效率。
5
+
6
+ ## 解决的问题
7
+ 1. **任务复杂性:** 传统单体 `worker` 难以有效处理需要多步骤、多领域知识的复杂任务。
8
+ 2. **可扩展性差:** 难以动态添加或更新新的任务处理能力。
9
+ 3. **维护成本高:** 紧耦合的系统导致修改和维护困难。
10
+ 4. **智能化不足:** 缺乏根据任务动态调度专业 `agent` 的能力。
11
+
12
+ ## 期望的工作方式
13
+ 用户提交一个高级任务请求给 `worker`。`RootAgent` 会解析这个任务,将其分解为一系列更小的、可管理的子任务。然后,`RootAgent` 会根据每个子任务的性质,动态地调度最合适的 MCP `Agent` 服务来执行。这些 MCP `Agent` 服务可以是专注于数据分析、代码生成、文档编写等不同领域的专业 `agent`。`Agent Manager` 将负责这些 MCP `Agent` 服务的生命周期管理。最终,`RootAgent` 会收集所有子任务的结果,进行整合和总结,并将最终结果返回给用户。
14
+
15
+ ## 用户体验目标
16
+ * **高效性:** 任务处理速度快,响应及时。
17
+ * **灵活性:** 能够处理各种类型和复杂度的任务。
18
+ * **透明性:** 用户可以了解任务处理的进展和状态。
19
+ * **可靠性:** 系统稳定运行,任务结果准确。
20
+ * **易用性:** 任务提交和结果获取流程简单直观。
memory-bank/progress.md ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 进度 (Progress)
2
+
3
+ ## 已完成的工作
4
+ * **FastAPI 基础服务:** 成功启动了一个 FastAPI 应用程序,但由于用户拒绝浏览器操作,未能进行可视化验证。
5
+ * **`.gitignore` 更新:** 在 `.gitignore` 中添加了 `__pycache__/` 忽略规则。
6
+ * **多 Agent 系统设计文档:** 完成了 `worker` 多 `agent` 系统架构的详细设计,并将其记录在 `docs/worker_agent_design.md` 文件中。
7
+ * **多 Agent 系统设计文档更新:** `docs/worker_agent_design.md` 已根据详细的实现计划进行了更新。
8
+ * **多 Agent 系统实现计划制定:** 已制定详细的实现计划,包括 `Agent Manager` MCP 服务、具体 MCP `Agent` 服务、`RootAgent` 的调度和结果聚合逻辑,以及记忆和反思模块的实现。
9
+ * **`.clinerules` 学习与解释:** 全面学习并解释了 `.clinerules` 目录下所有规则文件的内容,理解了 Cline 的行为准则和各项协议。
10
+ * **记忆库初始化:** 已创建 `memory-bank/projectBrief.md`、`memory-bank/productContext.md` 和 `memory-bank/activeContext.md`。
11
+
12
+ ## 待完成的工作
13
+ * **记忆库完善:** 继续创建和填充 `memory-bank` 中的其他核心文件(如果需要)和补充文件(`systemPatterns.md`、`techContext.md`),以及 `raw_reflection_log.md` 和 `consolidated_learnings.md`。
14
+ * **多 Agent 系统实现:** 根据详细计划,逐步实现 `Agent Manager`、具体 MCP `Agent` 服务、`RootAgent` 的调度和结果聚合逻辑,以及记忆和反思模块。
15
+ * **功能测试与验证:** 对实现的功能进行全面的测试,确保系统按预期工作。
16
+
17
+ ## 当前状态
18
+ * 项目的基础架构已搭建,核心设计已文档化。
19
+ * 已制定详细的实现计划,并准备进入实施阶段。
20
+ * Cline 已全面理解项目背景和自身操作规则。
21
+ * 记忆库正在初始化和完善中,以确保知识的持久化。
22
+
23
+ ## 已知问题
24
+ * FastAPI 应用程序尚未进行可视化验证。
25
+ * 多 `agent` 系统的具体实现尚未开始。
26
+
27
+ ## 项目决策演变
28
+ * 最初的任务是运行 FastAPI 应用,但随着讨论深入,演变为设计和文档化一个复杂的多 `agent` 系统。
29
+ * 用户强调了对 `.clinerules` 的学习和记忆库整理的重要性,这促使我优先处理知识管理任务。
memory-bank/projectBrief.md ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 项目简报 (Project Brief)
2
+
3
+ ## 核心需求与目标
4
+ 本项目旨在开发一个基于 FastAPI 的 `worker` 服务,该服务将作为多 `agent` 系统的一部分。`worker` 的核心目标是:
5
+ 1. 接收并处理任务。
6
+ 2. 将复杂任务拆分为子任务。
7
+ 3. 调度不同的 MCP `Agent` 服务来执行子任务。
8
+ 4. 聚合子任务结果并返回最终结果。
9
+ 5. 实现高度模块化、可扩展和可维护的 `agent` 架构。
10
+
11
+ ## 项目范围
12
+ * **阶段一:** 搭建 FastAPI `worker` 基础框架,实现简单的 API 路由。
13
+ * **阶段二:** 设计并文档化多 `agent` 系统架构,包括 `RootAgent`、MCP `Agent` 服务和 `Agent Manager` 的职责与交互。
14
+ * **阶段三:** 实现 `RootAgent` 的核心逻辑,包括任务解析、子任务生成和 `Agent` 调度。
15
+ * **阶段四:** 开发 `Agent Manager` MCP 服务,实现 `Agent` 的动态创建、管理和销毁。
16
+ * **阶段五:** 集成具体的 MCP `Agent` 服务,完成端到端任务处理流程。
17
+
18
+ ## 关键里程碑
19
+ * FastAPI 基础服务搭建完成。
20
+ * 多 `agent` 系统设计文档完成。
21
+ * `RootAgent` 核心调度逻辑实现。
22
+ * `Agent Manager` 服务实现。
23
+ * 端到端任务处理流程验证。
24
+
25
+ ## 利益相关者
26
+ * 用户:需要一个高效、灵活的任务处理 `worker`。
27
+ * 开发团队:需要一个清晰、模块化的架构以支持未来的扩展和维护。
memory-bank/raw_reflection_log.md ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 原始反思日志 (Raw Reflection Log)
2
+
3
+ ---
4
+ Date: 2025-09-16
5
+ TaskRef: "整理记忆库文件并解释 .clinerules 规则"
6
+
7
+ Learnings:
8
+ - 成功理解并解释了 `.clinerules` 目录下所有规则文件的内容,包括基础指令、记忆库结构、新任务自动化、研究流程、顺序思维、自我改进和持续改进协议。
9
+ - 明确了 `memory-bank` 文件夹的结构和每个核心文件的作用。
10
+ - 掌握了在 `ACT MODE` 下根据用户反馈主动创建和填充记忆库文件的流程。
11
+
12
+ Difficulties:
13
+ - 最初在用户要求“生成记忆”时,我错误地使用了 `attempt_completion`,没有意识到用户是希望我按照 `memory-bank` 的规则来整理记忆。这表明对用户意图的理解需要更精确地结合 `.clinerules` 中的具体指导。
14
+ - `memory-bank` 目录最初不存在,需要先创建文件才能自动创建目录。
15
+
16
+ Successes:
17
+ - 成功创建了 `memory-bank` 目录及其下的 `projectBrief.md`, `productContext.md`, `activeContext.md`, `progress.md` 文件。
18
+ - 详细解释了所有 `.clinerules` 文件,为后续任务奠定了坚实的基础。
19
+ - 遵循了中文交流的规则。
20
+
21
+ Improvements_Identified_For_Consolidation:
22
+ - 强化对用户模糊指令的解析能力,结合 `.clinerules` 规则进行更准确的意图推断。
23
+ - 明确 `memory-bank` 文件的创建和更新流程,作为核心知识进行固化。
24
+ ---
requirements.txt CHANGED
@@ -1,2 +1,4 @@
1
  fastapi
2
  uvicorn[standard]
 
 
 
1
  fastapi
2
  uvicorn[standard]
3
+ qwen-agent
4
+ python-dotenv