Spaces:
Runtime error
Runtime error
File size: 5,080 Bytes
893dedc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
import docker
from abc import ABC, abstractmethod
from typing import Dict, Any
from datetime import datetime
import uuid
from .models import AgentInfo, CreateAgentRequest
class AgentDeployer(ABC):
"""
抽象基类,定义 Agent 部署器的接口。
"""
@abstractmethod
def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo:
"""
部署一个新的 Agent 实例。
"""
pass
@abstractmethod
def destroy_agent(self, agent_id: str) -> bool:
"""
销毁一个 Agent 实例。
"""
pass
class DockerAgentDeployer(AgentDeployer):
"""
基于 Docker 的 Agent 部署器实现。
"""
def __init__(self):
self.client = docker.from_env()
def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo:
agent_id = str(uuid.uuid4())
container_name = f"agent-{request.agent_type}-{agent_id[:8]}"
# 准备环境变量
environment = {
"AGENT_ID": agent_id,
"AGENT_TYPE": request.agent_type,
**request.env_vars
}
# 启动 Docker 容器
try:
container = self.client.containers.run(
request.image_name,
name=container_name,
detach=True,
environment=environment,
ports={'8000/tcp': None}, # 假设 Agent MCP Server 运行在容器的 8000 端口
# resource_limits 可以在这里配置,但 docker-py 的 run 方法直接支持的参数有限
# 更复杂的资源限制可能需要通过 create_container 和 start 组合
)
# 获取容器的 IP 地址和端口
container.reload()
# Docker 容器的 IP 地址通常在 bridge 网络中,需要进一步获取
# 简化处理,假设 Agent Manager 和 Agent 在同一网络,或者通过服务发现获取
# 这里暂时使用一个占位符,实际部署中需要更复杂的网络配置或服务发现
# 例如,如果 Agent Manager 运行在 Docker 网络中,可以通过容器名解析
# 或者通过 Redis 注册时 Agent 自身上报其可访问的 IP:Port
# 暂时使用一个占位符,实际需要从容器网络配置中获取
# 或者等待 Agent 启动后自行注册到 Redis
mcp_endpoint = f"http://{container_name}:8000"
now = datetime.now().isoformat()
agent_info = AgentInfo(
id=agent_id,
agent_type=request.agent_type,
mcp_endpoint=mcp_endpoint,
status="running",
created_at=now,
last_heartbeat=now,
metadata={"container_id": container.id, "container_name": container_name}
)
return agent_info
except docker.errors.ImageNotFound:
raise ValueError(f"Docker image '{request.image_name}' not found.")
except docker.errors.APIError as e:
raise RuntimeError(f"Failed to deploy agent container: {e}")
def destroy_agent(self, agent_id: str) -> bool:
try:
# 根据 agent_id 查找容器
# 假设 agent_id 存储在 AgentInfo 的 metadata 中作为 container_id
# 或者 Agent Manager 内部维护 agent_id 到 container_id 的映射
# 这里简化处理,假设 agent_id 就是 container_id 或者可以通过某种方式直接找到容器
# 实际中,Agent Manager 应该维护 AgentInfo 列表,通过 AgentInfo.metadata['container_id'] 来查找
# 为了演示,我们尝试通过名称查找,但更健壮的方式是存储 container_id
# 假设 agent_id 实际上是 AgentInfo.metadata['container_id']
# 这里需要一个机制来从 agent_id 映射到 container_id 或 container_name
# 暂时通过遍历所有容器来查找,实际不推荐
container_to_destroy = None
for container in self.client.containers.list(all=True):
if f"agent-{agent_id[:8]}" in container.name: # 粗略匹配
container_to_destroy = container
break
if container_to_destroy:
container_to_destroy.stop()
container_to_destroy.remove()
return True
else:
# 如果 agent_id 是 AgentInfo.id,我们需要从 Agent Manager 的存储中获取 container_id
# 这里只是一个占位符,实际需要 Agent Manager 的状态管理
print(f"Container for agent_id {agent_id} not found.")
return False
except docker.errors.NotFound:
print(f"Container for agent_id {agent_id} not found.")
return False
except docker.errors.APIError as e:
raise RuntimeError(f"Failed to destroy agent container {agent_id}: {e}")
|