airsltd committed on
Commit
893dedc
·
1 Parent(s): a2a8029
Files changed (4) hide show
  1. app.py +203 -3
  2. deployer.py +117 -0
  3. models.py +33 -0
  4. requirements.txt +2 -0
app.py CHANGED
@@ -1,7 +1,207 @@
1
- from fastapi import FastAPI
 
 
 
 
 
 
 
 
 
 
2
 
3
  app = FastAPI()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
 
5
  @app.get("/")
6
- def greet_json():
7
- return {"Hello": "World!"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException, status, BackgroundTasks, Depends
2
+ from typing import Dict, List, Optional
3
+ from datetime import datetime
4
+ import asyncio
5
+ import json
6
+ import uuid
7
+ import os
8
+ import redis
9
+
10
+ from .models import AgentInfo, CreateAgentRequest, AgentUpdateRequest
11
+ from .deployer import AgentDeployer, DockerAgentDeployer
12
 
13
app = FastAPI()
# Deployer backend used by the endpoints below to create and destroy agents.
deployer: AgentDeployer = DockerAgentDeployer()

# Redis configuration, overridable through environment variables.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))

# Both handles stay None until startup_event() assigns them.
redis_client: Optional[redis.Redis] = None
redis_pubsub: Optional[redis.client.PubSub] = None
AGENT_CHANNEL = "agent_discovery_channel"  # pub/sub channel the listener subscribes to
AGENT_KEY_PREFIX = "agent:"  # prefix of the per-agent Redis hash keys

# Information about active agents (in-memory cache; Redis is the source of truth).
active_agents: Dict[str, AgentInfo] = {}
28
+
29
def _handle_agent_event(message):
    """
    Apply a single pub/sub message to the in-memory cache and Redis.

    ONLINE and HEARTBEAT events refresh the cached AgentInfo from the agent's
    Redis hash; OFFLINE events drop the agent from both the cache and Redis.
    Errors are printed and swallowed so that one bad message cannot stop the
    listener loop.
    """
    try:
        try:
            data = json.loads(message['data'].decode('utf-8'))
        except json.JSONDecodeError:
            # Kept separate from the processing below so that a bad metadata
            # field is not misreported as an invalid pub/sub message.
            print(f"Received invalid JSON message: {message['data']}")
            return

        agent_id = data.get("id")
        event_type = data.get("event_type")
        if not agent_id:
            return

        agent_key = f"{AGENT_KEY_PREFIX}{agent_id}"
        if event_type == "HEARTBEAT" or event_type == "ONLINE":
            # Fetch the latest agent information from Redis.
            agent_data = redis_client.hgetall(agent_key)
            if not agent_data:
                print(f"Agent {agent_id} {event_type} received, but no data in Redis.")
                return
            fields = {k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()}
            # Hash values are flat strings, so the metadata dict is kept as
            # JSON text and must be decoded before building the model.
            if fields.get("metadata"):
                fields["metadata"] = json.loads(fields["metadata"])
            agent_info = AgentInfo(**fields)
            active_agents[agent_id] = agent_info
            print(f"Agent {agent_id} {event_type} received. Status: {agent_info.status}")
        elif event_type == "OFFLINE":
            active_agents.pop(agent_id, None)
            redis_client.delete(agent_key)
            print(f"Agent {agent_id} OFFLINE received and removed.")
    except Exception as e:
        print(f"Error processing Redis message: {e}")


async def _redis_listener():
    """
    Poll the Redis pub/sub channel and process agent ONLINE, HEARTBEAT and
    OFFLINE messages.

    Returns immediately when no pub/sub handle has been set up; otherwise it
    loops until the task is cancelled.
    """
    if not redis_pubsub:
        return

    while True:
        message = redis_pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            _handle_agent_event(message)
        # Back off only when the channel is idle; after handling a message just
        # yield to the event loop so a backlog is drained promptly instead of
        # at a fixed rate of one message per 0.1 s.
        await asyncio.sleep(0 if message else 0.1)
64
+
65
@app.on_event("startup")
async def startup_event():
    """
    Connect to Redis, start the pub/sub listener task and warm the in-memory
    cache from the agent hashes already stored in Redis.

    Unreadable agent records are reported and skipped rather than aborting
    application startup.
    """
    global redis_client, redis_pubsub
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    try:
        redis_client.ping()
        print("Connected to Redis successfully!")
    except redis.exceptions.ConnectionError as e:
        print(f"Could not connect to Redis: {e}")
        # One could exit here or continue in a degraded mode.

    redis_pubsub = redis_client.pubsub()
    redis_pubsub.subscribe(AGENT_CHANNEL)
    # The event loop only keeps weak references to tasks; hold a strong one so
    # the listener cannot be garbage-collected while it is still running.
    app.state.redis_listener_task = asyncio.create_task(_redis_listener())

    # Load every known agent from Redis. SCAN iterates incrementally instead
    # of blocking the Redis server the way KEYS does on a large keyspace.
    for key in redis_client.scan_iter(match=f"{AGENT_KEY_PREFIX}*"):
        try:
            agent_data = redis_client.hgetall(key)
            if not agent_data:
                continue
            fields = {k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()}
            # Hash values are flat strings, so the metadata dict is kept as
            # JSON text and must be decoded before building the model.
            if fields.get("metadata"):
                fields["metadata"] = json.loads(fields["metadata"])
            agent_info = AgentInfo(**fields)
        except (ValueError, redis.exceptions.ResponseError) as e:
            # Covers invalid UTF-8, invalid JSON, model validation errors and
            # keys that match the prefix but are not hashes.
            print(f"Skipping unreadable agent record {key!r}: {e}")
            continue
        active_agents[agent_info.id] = agent_info
        print(f"Loaded existing agent: {agent_info.id} ({agent_info.agent_type})")
87
+
88
+
89
@app.on_event("shutdown")
async def shutdown_event():
    """
    Unsubscribe from the agent channel and close the Redis client, if they
    were created.
    """
    pubsub, client = redis_pubsub, redis_client
    if pubsub:
        pubsub.unsubscribe(AGENT_CHANNEL)
    if not client:
        return
    client.close()
    print("Redis connection closed.")
96
 
97
@app.get("/")
async def read_root():
    """Return a static message showing that the service is up."""
    payload = {"message": "Agent Manager is running!"}
    return payload
100
+
101
@app.post("/agents", response_model=AgentInfo, status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest):
    """
    Deploy a new agent instance and register it in Redis and the in-memory
    cache.

    Raises:
        HTTPException: 400 when the deployer raises ValueError, 500 when it
            raises RuntimeError.
    """
    try:
        agent_info = deployer.deploy_agent(request)

        # Store the agent information in Redis. Hash field values must be flat
        # (bytes, str, int or float), so the nested metadata dict is stored as
        # JSON text; passing the dict itself makes redis-py raise DataError.
        record = agent_info.dict()
        record["metadata"] = json.dumps(record["metadata"])
        redis_client.hset(f"{AGENT_KEY_PREFIX}{agent_info.id}", mapping=record)
        active_agents[agent_info.id] = agent_info  # refresh the in-memory cache

        print(f"Agent {agent_info.id} ({agent_info.agent_type}) deployed and registered.")
        return agent_info
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
119
+
120
@app.get("/agents/{agent_id}", response_model=AgentInfo)
async def get_agent_info(agent_id: str):
    """
    Return the details of one agent instance.

    The in-memory cache is consulted first; on a miss the agent's Redis hash
    is loaded and cached.

    Raises:
        HTTPException: 404 when the agent is in neither the cache nor Redis.
    """
    agent_info = active_agents.get(agent_id)
    if agent_info:
        return agent_info

    # Try Redis in case the in-memory cache has lost the entry.
    agent_data = redis_client.hgetall(f"{AGENT_KEY_PREFIX}{agent_id}")
    if not agent_data:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")

    fields = {k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()}
    # Hash values are flat strings, so the metadata dict is kept as JSON text
    # and must be decoded before building the model.
    if fields.get("metadata"):
        fields["metadata"] = json.loads(fields["metadata"])
    agent_info = AgentInfo(**fields)
    active_agents[agent_id] = agent_info
    return agent_info
135
+
136
@app.get("/agents", response_model=List[AgentInfo])
async def list_agents(agent_type: Optional[str] = None):
    """
    List the agents held in the in-memory cache, optionally restricted to a
    single agent_type.
    """
    cached = list(active_agents.values())
    if not agent_type:
        return cached
    return [entry for entry in cached if entry.agent_type == agent_type]
144
+
145
@app.put("/agents/{agent_id}", response_model=AgentInfo)
async def update_agent_status(agent_id: str, update_request: AgentUpdateRequest):
    """
    Update the status or other information of an agent instance.

    Mainly intended for agents reporting their own heartbeat or a status
    change. The heartbeat timestamp is always refreshed to the current time.

    Raises:
        HTTPException: 404 when the agent is not in the in-memory cache.
    """
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")

    # Ignore fields sent as explicit null: they would overwrite required
    # attributes with None, which can neither be validated nor stored in Redis.
    update_data = update_request.dict(exclude_unset=True, exclude_none=True)
    for key, value in update_data.items():
        setattr(agent_info, key, value)

    agent_info.last_heartbeat = datetime.now().isoformat()  # always refresh the heartbeat time

    # Update Redis. Hash field values must be flat, so the nested metadata
    # dict is stored as JSON text (redis-py raises DataError on a dict value).
    record = agent_info.dict()
    record["metadata"] = json.dumps(record["metadata"])
    redis_client.hset(f"{AGENT_KEY_PREFIX}{agent_id}", mapping=record)
    return agent_info
164
+
165
@app.delete("/agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def destroy_agent(agent_id: str):
    """
    Stop and permanently destroy an agent instance and its underlying
    resources, then remove it from the cache and Redis.

    Responds with 204 No Content on success, so nothing is returned.

    Raises:
        HTTPException: 404 when the agent is not in the in-memory cache, 500
            when the deployer reports failure or raises RuntimeError.
    """
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")

    try:
        # The deployer is addressed by AgentInfo.id.
        destroyed = deployer.destroy_agent(agent_info.id)
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

    if not destroyed:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to destroy agent {agent_id}.")

    active_agents.pop(agent_id, None)
    redis_client.delete(f"{AGENT_KEY_PREFIX}{agent_id}")
    print(f"Agent {agent_id} destroyed and removed from registry.")
    # A 204 response must not carry a body, so no payload is returned here.
185
+
186
@app.post("/agents/{agent_id}/stop", status_code=status.HTTP_200_OK)
async def stop_agent(agent_id: str):
    """
    Stop an agent instance and record it as "stopped".

    NOTE: stopping is currently simulated with deployer.destroy_agent(), so
    the underlying resources are removed rather than merely paused.

    Raises:
        HTTPException: 404 when the agent is not in the in-memory cache, 500
            when the deployer reports failure or raises RuntimeError.
    """
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")

    try:
        # For DockerAgentDeployer stop and destroy are treated alike for now;
        # a real stop would pause the container without deleting it.
        if deployer.destroy_agent(agent_info.id):
            agent_info.status = "stopped"
            agent_info.last_heartbeat = datetime.now().isoformat()
            # Hash field values must be flat, so the nested metadata dict is
            # stored as JSON text (redis-py raises DataError on a dict value).
            record = agent_info.dict()
            record["metadata"] = json.dumps(record["metadata"])
            redis_client.hset(f"{AGENT_KEY_PREFIX}{agent_id}", mapping=record)
            print(f"Agent {agent_id} stopped.")
            return {"message": f"Agent {agent_id} stopped successfully."}
        else:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to stop agent {agent_id}.")
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
deployer.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import docker
2
+ from abc import ABC, abstractmethod
3
+ from typing import Dict, Any
4
+ from datetime import datetime
5
+ import uuid
6
+
7
+ from .models import AgentInfo, CreateAgentRequest
8
+
9
class AgentDeployer(ABC):
    """
    Abstract base class defining the interface of an agent deployer.
    """

    @abstractmethod
    def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo:
        """
        Deploy a new agent instance.
        """

    @abstractmethod
    def destroy_agent(self, agent_id: str) -> bool:
        """
        Destroy an agent instance.
        """
26
+
27
class DockerAgentDeployer(AgentDeployer):
    """
    Docker-based implementation of the agent deployer.

    Each agent runs in its own container named
    ``agent-<agent_type>-<first 8 characters of the agent id>`` and labelled
    with its full agent id so that it can be found again for destruction.
    """

    # Container label that carries the full agent id.
    _AGENT_ID_LABEL = "agent_id"

    def __init__(self):
        self.client = docker.from_env()

    def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo:
        """
        Start a container for a new agent and describe it as an AgentInfo.

        Raises:
            ValueError: the requested Docker image was not found.
            RuntimeError: the Docker API rejected the request.
        """
        agent_id = str(uuid.uuid4())
        container_name = f"agent-{request.agent_type}-{agent_id[:8]}"

        # Environment variables handed to the container.
        environment = {
            "AGENT_ID": agent_id,
            "AGENT_TYPE": request.agent_type,
            **request.env_vars
        }

        # Start the Docker container.
        try:
            container = self.client.containers.run(
                request.image_name,
                name=container_name,
                detach=True,
                environment=environment,
                ports={'8000/tcp': None},  # assumes the agent's MCP server listens on container port 8000
                # Tag the container with the full agent id so destroy_agent()
                # can look it up exactly instead of guessing from the name.
                labels={self._AGENT_ID_LABEL: agent_id},
                # request.resource_limits is not applied yet.
            )
            container.reload()

            # Placeholder endpoint: it assumes the manager can resolve the
            # container by name (for example on a shared Docker network).
            # A real deployment needs proper network configuration or
            # service discovery, or the agent reporting its own reachable
            # address when it registers in Redis.
            mcp_endpoint = f"http://{container_name}:8000"

            now = datetime.now().isoformat()
            agent_info = AgentInfo(
                id=agent_id,
                agent_type=request.agent_type,
                mcp_endpoint=mcp_endpoint,
                status="running",
                created_at=now,
                last_heartbeat=now,
                metadata={"container_id": container.id, "container_name": container_name}
            )
            return agent_info
        except docker.errors.ImageNotFound:
            raise ValueError(f"Docker image '{request.image_name}' not found.")
        except docker.errors.APIError as e:
            raise RuntimeError(f"Failed to deploy agent container: {e}")

    def _find_container(self, agent_id: str):
        """Return the container that belongs to ``agent_id``, or None."""
        labelled = self.client.containers.list(
            all=True,
            filters={"label": f"{self._AGENT_ID_LABEL}={agent_id}"},
        )
        if labelled:
            return labelled[0]

        # Fallback for containers without the label: match the naming scheme
        # "agent-<agent_type>-<id prefix>". The agent type sits between the
        # "agent-" prefix and the id, so a plain "agent-<id prefix>" substring
        # test can never match; compare prefix and suffix instead.
        suffix = f"-{agent_id[:8]}"
        for container in self.client.containers.list(all=True):
            if container.name.startswith("agent-") and container.name.endswith(suffix):
                return container
        return None

    def destroy_agent(self, agent_id: str) -> bool:
        """
        Stop and remove the container that belongs to ``agent_id``.

        Returns:
            True when a container was found and removed, False when no
            matching container exists.

        Raises:
            RuntimeError: the Docker API reported an error.
        """
        try:
            container_to_destroy = self._find_container(agent_id)
            if container_to_destroy is None:
                print(f"Container for agent_id {agent_id} not found.")
                return False

            container_to_destroy.stop()
            container_to_destroy.remove()
            return True
        except docker.errors.NotFound:
            # The container disappeared between lookup and stop/remove.
            print(f"Container for agent_id {agent_id} not found.")
            return False
        except docker.errors.APIError as e:
            raise RuntimeError(f"Failed to destroy agent container {agent_id}: {e}")
models.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Optional
2
+ from pydantic import BaseModel
3
+
4
class AgentInfo(BaseModel):
    """
    Information about a single agent instance.
    """
    id: str
    agent_type: str
    mcp_endpoint: str  # Access address of the MCP service (e.g., "http://localhost:8001")
    status: str = "running"  # Agent status (e.g., "running", "stopped", "error")
    created_at: str
    last_heartbeat: str
    metadata: Dict = {}  # Other metadata, such as resource usage, version, etc.
15
+
16
class CreateAgentRequest(BaseModel):
    """
    Request model for creating an agent instance.
    """
    agent_type: str
    image_name: str  # Docker image name (e.g., "my-echo-agent:latest")
    env_vars: Dict[str, str] = {}  # Dictionary of environment variables
    resource_limits: Dict = {}  # Resource limits (e.g., {"cpu": "0.5", "memory": "512m"})
    config: Dict = {}  # The agent's own configuration
25
+
26
class AgentUpdateRequest(BaseModel):
    """
    Request model for updating an agent instance's information.

    Every field is optional and defaults to None.
    """
    status: Optional[str] = None
    mcp_endpoint: Optional[str] = None
    last_heartbeat: Optional[str] = None
    metadata: Optional[Dict] = None
requirements.txt CHANGED
@@ -1,2 +1,4 @@
1
  fastapi
2
  uvicorn[standard]
 
 
 
1
  fastapi
2
  uvicorn[standard]
3
+ docker
4
+ redis