airsltd commited on
Commit
8abdadf
·
1 Parent(s): 1eca6b5
Files changed (4) hide show
  1. agents/root_agent.py +14 -79
  2. app.py +69 -8
  3. requirements.txt +1 -0
  4. upstash_redis_test.py +109 -0
agents/root_agent.py CHANGED
@@ -3,108 +3,43 @@ import asyncio
3
  import os
4
  import json
5
  import httpx
6
- import redis
7
 
8
  from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
9
  from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口
10
 
11
- # Redis 配置
12
- REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
13
- REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
14
- REDIS_DB = int(os.getenv("REDIS_DB", 0))
15
- AGENT_CHANNEL = "agent_discovery_channel"
16
- AGENT_KEY_PREFIX = "agent:"
17
-
18
  class RootAgent:
19
  def __init__(self):
20
  AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
21
  self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
22
  self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent
23
- self.redis_client: Optional[redis.Redis] = None
24
- self.redis_pubsub: Optional[redis.client.PubSub] = None
25
- self.discovery_task: Optional[asyncio.Task] = None
26
 
27
  async def initialize(self):
28
  """
29
- RootAgent 启动时的初始化逻辑,连接 Redis 并启动 Agent 发现。
30
  """
31
- self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
32
- try:
33
- self.redis_client.ping()
34
- print("RootAgent connected to Redis successfully!")
35
- except redis.exceptions.ConnectionError as e:
36
- print(f"RootAgent could not connect to Redis: {e}")
37
- # 可以在这里选择退出或以降级模式运行
38
-
39
- self.redis_pubsub = self.redis_client.pubsub()
40
- self.redis_pubsub.subscribe(AGENT_CHANNEL)
41
- self.discovery_task = asyncio.create_task(self._agent_discovery_listener())
42
-
43
- # 从 Redis 加载所有已知的 Agent
44
- await self._load_existing_agents()
45
  print("RootAgent initialized.")
46
 
47
  async def shutdown(self):
48
  """
49
  RootAgent 关闭时的清理逻辑。
50
  """
51
- if self.discovery_task:
52
- self.discovery_task.cancel()
53
- try:
54
- await self.discovery_task
55
- except asyncio.CancelledError:
56
- pass
57
- if self.redis_pubsub:
58
- self.redis_pubsub.unsubscribe(AGENT_CHANNEL)
59
- if self.redis_client:
60
- self.redis_client.close()
61
  print("RootAgent shutdown.")
62
 
63
- async def _agent_discovery_listener(self):
64
  """
65
- 监听 Redis 频道,实时更新活跃 Agent 列表。
66
  """
67
- if not self.redis_pubsub:
68
- return
69
-
70
- while True:
71
- message = self.redis_pubsub.get_message(ignore_subscribe_messages=True)
72
- if message:
73
- try:
74
- data = json.loads(message['data'].decode('utf-8'))
75
- agent_id = data.get("id")
76
- event_type = data.get("event_type")
77
-
78
- if agent_id:
79
- if event_type == "HEARTBEAT" or event_type == "ONLINE":
80
- # 从 Redis 获取最新的 Agent 信息
81
- agent_data = self.redis_client.hgetall(f"{AGENT_KEY_PREFIX}{agent_id}")
82
- if agent_data:
83
- agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()})
84
- self.active_agents[agent_id] = agent_info
85
- # print(f"RootAgent discovered/updated agent: {agent_id} ({agent_info.agent_type})")
86
- else:
87
- print(f"RootAgent received {event_type} for {agent_id}, but no data in Redis.")
88
- elif event_type == "OFFLINE":
89
- if agent_id in self.active_agents:
90
- del self.active_agents[agent_id]
91
- print(f"RootAgent removed offline agent: {agent_id}")
92
- except json.JSONDecodeError:
93
- print(f"RootAgent received invalid JSON message from Redis: {message['data']}")
94
- except Exception as e:
95
- print(f"RootAgent error processing Redis message: {e}")
96
- await asyncio.sleep(0.1)
97
 
98
- async def _load_existing_agents(self):
99
- """
100
- 从 Redis 加载所有已知的 Agent 到内存缓存。
101
- """
102
- for key in self.redis_client.keys(f"{AGENT_KEY_PREFIX}*"):
103
- agent_data = self.redis_client.hgetall(key)
104
- if agent_data:
105
- agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()})
106
- self.active_agents[agent_info.id] = agent_info
107
- print(f"RootAgent loaded existing agent: {agent_info.id} ({agent_info.agent_type})")
108
 
109
  async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
110
  """
@@ -121,7 +56,7 @@ class RootAgent:
121
  create_request = CreateAgentRequest(
122
  agent_type=agent_type,
123
  image_name=image_name,
124
- env_vars={"REDIS_HOST": REDIS_HOST, "REDIS_PORT": str(REDIS_PORT), "REDIS_DB": str(REDIS_DB)},
125
  config={"some_initial_config": "value"}
126
  )
127
  try:
 
3
  import os
4
  import json
5
  import httpx
 
6
 
7
  from .agent_manager_client import AgentManagerClient, AgentInfo, CreateAgentRequest
8
  from .mcp_agent_interface import MCPAgent # 导入 MCP Agent 接口
9
 
 
 
 
 
 
 
 
10
  class RootAgent:
11
  def __init__(self):
12
  AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
13
  self.agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
14
  self.active_agents: Dict[str, AgentInfo] = {} # 内存缓存活跃 Agent
 
 
 
15
 
16
  async def initialize(self):
17
  """
18
+ RootAgent 启动时的初始化逻辑,通过 Agent Manager 发现 Agent
19
  """
20
+ await self._refresh_active_agents()
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  print("RootAgent initialized.")
22
 
23
  async def shutdown(self):
24
  """
25
  RootAgent 关闭时的清理逻辑。
26
  """
 
 
 
 
 
 
 
 
 
 
27
  print("RootAgent shutdown.")
28
 
29
+ async def _refresh_active_agents(self):
30
  """
31
+ Agent Manager 加载所有活跃的 Agent 到内存缓存。
32
  """
33
+ try:
34
+ agents = await self.agent_manager_client.list_agents()
35
+ self.active_agents = {agent.id: agent for agent in agents}
36
+ print(f"RootAgent refreshed active agents. Found {len(self.active_agents)} agents.")
37
+ except httpx.HTTPStatusError as e:
38
+ print(f"Failed to refresh agents from Agent Manager: {e.response.text}")
39
+ # 可以在这里选择抛出异常或以降级模式运行
40
+ except Exception as e:
41
+ print(f"An unexpected error occurred while refreshing agents: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
 
 
 
 
 
 
 
 
 
 
 
43
 
44
  async def _get_or_create_agent(self, agent_type: str, image_name: str = "echo-agent:latest") -> AgentInfo:
45
  """
 
56
  create_request = CreateAgentRequest(
57
  agent_type=agent_type,
58
  image_name=image_name,
59
+ env_vars={}, # 移除 Redis 相关的环境变量,因为 RootAgent 不再直接使用 Redis
60
  config={"some_initial_config": "value"}
61
  )
62
  try:
app.py CHANGED
@@ -1,30 +1,89 @@
1
  from fastapi import FastAPI, HTTPException, status
2
- from typing import Dict, Any, Optional
3
  import asyncio
4
  import os
 
 
 
 
5
 
6
  from agents.root_agent import RootAgent
7
 
 
 
 
 
 
 
 
 
 
 
8
  app = FastAPI()
9
  root_agent_instance: Optional[RootAgent] = None
10
 
11
- @app.on_event("startup")
12
- async def startup_event():
13
- global root_agent_instance
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  root_agent_instance = RootAgent()
15
- await root_agent_instance.initialize()
 
 
 
 
 
 
16
 
17
- @app.on_event("shutdown")
18
- async def shutdown_event():
 
19
  if root_agent_instance:
20
  await root_agent_instance.shutdown()
 
 
 
21
 
22
  @app.get("/")
23
  async def read_root():
24
  return {"message": "Worker service with RootAgent is running!"}
25
 
26
  @app.post("/process_task")
27
- async def process_task_endpoint(task_data: Dict[str, Any]):
28
  """
29
  接收外部任务请求,并交由 RootAgent 处理。
30
  """
@@ -32,6 +91,8 @@ async def process_task_endpoint(task_data: Dict[str, Any]):
32
  raise HTTPException(status_code=500, detail="RootAgent not initialized.")
33
 
34
  try:
 
 
35
  result = await root_agent_instance.process_task(task_data)
36
  return result
37
  except Exception as e:
 
1
  from fastapi import FastAPI, HTTPException, status
2
+ from typing import Dict, Any, Optional, List
3
  import asyncio
4
  import os
5
+ import requests
6
+ from dotenv import load_dotenv
7
+ import contextlib
8
+ from pydantic import BaseModel
9
 
10
  from agents.root_agent import RootAgent
11
 
12
+ # 定义任务子项的模型
13
+ class Subtask(BaseModel):
14
+ agent_type: str
15
+ payload: Dict[str, Any]
16
+
17
+ # 定义 process_task endpoint 的请求体模型
18
+ class ProcessTaskRequest(BaseModel):
19
+ subtasks: List[Subtask]
20
+ global_config: Optional[Dict[str, Any]] = None # 可选的全局配置
21
+
22
  app = FastAPI()
23
  root_agent_instance: Optional[RootAgent] = None
24
 
25
+ # Upstash Redis 相关的全局变量
26
+ UPSTASH_REDIS_REST_URL: Optional[str] = None
27
+ UPSTASH_REDIS_REST_TOKEN: Optional[str] = None
28
+
29
+ @contextlib.asynccontextmanager
30
+ async def lifespan(app: FastAPI):
31
+ global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN
32
+
33
+ # 加载 .env 文件中的环境变量
34
+ load_dotenv()
35
+ UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
36
+ UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
37
+
38
+ if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
39
+ print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
40
+ # 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度
41
+ # 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量
42
+ else:
43
+ print("Upstash Redis 环境变量已加载。")
44
+ # 可以在这里添加一个简单的 Redis 连接测试
45
+ headers = {
46
+ "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
47
+ "Content-Type": "application/json"
48
+ }
49
+ try:
50
+ # 尝试执行一个简单的 PING 命令来测试连接
51
+ ping_url = f"{UPSTASH_REDIS_REST_URL}/ping"
52
+ response = requests.post(ping_url, headers=headers)
53
+ response.raise_for_status()
54
+ ping_result = response.json()
55
+ if ping_result and ping_result.get('result') == "PONG":
56
+ print("Upstash Redis 连接测试成功!")
57
+ else:
58
+ print(f"Upstash Redis 连接测试失败: {ping_result}")
59
+ except Exception as e:
60
+ print(f"Upstash Redis 连接测试发生错误: {e}")
61
+
62
+ # 初始化 RootAgent
63
  root_agent_instance = RootAgent()
64
+ try:
65
+ await root_agent_instance.initialize()
66
+ print("RootAgent 已初始化。")
67
+ except Exception as e:
68
+ print(f"RootAgent 初始化失败: {e}")
69
+ # 可以在这里选择抛出异常或以降级模式运行
70
+ raise # 重新抛出异常以阻止应用程序启动,直到问题解决
71
 
72
+ yield # 应用程序在此处启动
73
+
74
+ # 应用程序关闭时执行的清理工作
75
  if root_agent_instance:
76
  await root_agent_instance.shutdown()
77
+ print("RootAgent 已关闭。")
78
+
79
+ app = FastAPI(lifespan=lifespan)
80
 
81
  @app.get("/")
82
  async def read_root():
83
  return {"message": "Worker service with RootAgent is running!"}
84
 
85
  @app.post("/process_task")
86
+ async def process_task_endpoint(request: ProcessTaskRequest):
87
  """
88
  接收外部任务请求,并交由 RootAgent 处理。
89
  """
 
91
  raise HTTPException(status_code=500, detail="RootAgent not initialized.")
92
 
93
  try:
94
+ # 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式
95
+ task_data = request.dict()
96
  result = await root_agent_instance.process_task(task_data)
97
  return result
98
  except Exception as e:
requirements.txt CHANGED
@@ -3,3 +3,4 @@ uvicorn[standard]
3
  qwen-agent
4
  python-dotenv
5
  python-dateutil
 
 
3
  qwen-agent
4
  python-dotenv
5
  python-dateutil
6
+ requests
upstash_redis_test.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import requests
3
+ from dotenv import load_dotenv
4
+
5
+ # 加载 .env 文件中的环境变量
6
+ load_dotenv()
7
+
8
+ UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
9
+ UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
10
+
11
+ if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
12
+ print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
13
+ exit(1)
14
+
15
+ headers = {
16
+ "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
17
+ "Content-Type": "application/json"
18
+ }
19
+
20
+ def execute_redis_command(command, *args):
21
+ url = ""
22
+ send_json_payload = False
23
+ payload = []
24
+
25
+ if command.upper() == "SET":
26
+ if len(args) != 2:
27
+ print("SET 命令需要两个参数:key 和 value。")
28
+ return None
29
+ key, value = args
30
+ url = f"{UPSTASH_REDIS_REST_URL}/set/{key}/{value}"
31
+ elif command.upper() == "GET":
32
+ if len(args) != 1:
33
+ print("GET 命令需要一个参数:key。")
34
+ return None
35
+ key = args[0]
36
+ url = f"{UPSTASH_REDIS_REST_URL}/get/{key}"
37
+ elif command.upper() == "DEL":
38
+ if len(args) != 1:
39
+ print("DEL 命令需要一个参数:key。")
40
+ return None
41
+ key = args[0]
42
+ url = f"{UPSTASH_REDIS_REST_URL}/del/{key}"
43
+ else:
44
+ # 对于其他命令,使用通用格式,并发送 JSON 数组作为 payload
45
+ url = f"{UPSTASH_REDIS_REST_URL}/{command}"
46
+ payload = [arg for arg in args]
47
+ send_json_payload = True
48
+
49
+ try:
50
+ if send_json_payload:
51
+ response = requests.post(url, headers=headers, json=payload)
52
+ else:
53
+ response = requests.post(url, headers=headers) # 不发送 json=payload
54
+ response.raise_for_status() # 如果请求失败,抛出 HTTPError
55
+ return response.json()
56
+ except requests.exceptions.HTTPError as http_err:
57
+ print(f"HTTP 错误发生: {http_err}")
58
+ print(f"响应内容: {response.text}")
59
+ except requests.exceptions.ConnectionError as conn_err:
60
+ print(f"连接错误发生: {conn_err}")
61
+ except requests.exceptions.Timeout as timeout_err:
62
+ print(f"请求超时: {timeout_err}")
63
+ except requests.exceptions.RequestException as req_err:
64
+ print(f"发生未知错误: {req_err}")
65
+ return None
66
+
67
+ def test_redis_connection():
68
+ print("正在测试 Upstash Redis 连接...")
69
+
70
+ # SET 命令
71
+ key = "mykey"
72
+ value = "hello_upstash"
73
+ print(f"正在设置键 '{key}' 为值 '{value}'...")
74
+ set_result = execute_redis_command("SET", key, value)
75
+ if set_result:
76
+ print(f"SET 结果: {set_result}")
77
+ if set_result and set_result.get('result') == "OK":
78
+ print("SET 命令成功。")
79
+ else:
80
+ print("SET 命令失败。")
81
+ return # 如果 SET 失败,才返回
82
+
83
+ # GET 命令
84
+ print(f"正在获取键 '{key}' 的值...")
85
+ get_result = execute_redis_command("GET", key)
86
+ if get_result is not None:
87
+ print(f"GET 结果: {get_result}")
88
+ if get_result and get_result.get('result') == value:
89
+ print("GET 命令成功,值匹配。")
90
+ else:
91
+ print("GET 命令失败,值不匹配或结果格式不正确。")
92
+ else:
93
+ print("GET 命令失败。")
94
+ return
95
+
96
+ # DEL 命令 (可选,用于清理)
97
+ print(f"正在删除键 '{key}'...")
98
+ del_result = execute_redis_command("DEL", key)
99
+ if del_result is not None:
100
+ print(f"DEL 结果: {del_result}")
101
+ if del_result and del_result.get('result') == 1:
102
+ print("DEL 命令成功。")
103
+ else:
104
+ print("DEL 命令失败或结果格式不正确。")
105
+ else:
106
+ print("DEL 命令失败。")
107
+
108
+ if __name__ == "__main__":
109
+ test_redis_connection()