import docker from abc import ABC, abstractmethod from typing import Dict, Any from datetime import datetime import uuid from .models import AgentInfo, CreateAgentRequest class AgentDeployer(ABC): """ 抽象基类,定义 Agent 部署器的接口。 """ @abstractmethod def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo: """ 部署一个新的 Agent 实例。 """ pass @abstractmethod def destroy_agent(self, agent_id: str) -> bool: """ 销毁一个 Agent 实例。 """ pass class DockerAgentDeployer(AgentDeployer): """ 基于 Docker 的 Agent 部署器实现。 """ def __init__(self): self.client = docker.from_env() def deploy_agent(self, request: CreateAgentRequest) -> AgentInfo: agent_id = str(uuid.uuid4()) container_name = f"agent-{request.agent_type}-{agent_id[:8]}" # 准备环境变量 environment = { "AGENT_ID": agent_id, "AGENT_TYPE": request.agent_type, **request.env_vars } # 启动 Docker 容器 try: container = self.client.containers.run( request.image_name, name=container_name, detach=True, environment=environment, ports={'8000/tcp': None}, # 假设 Agent MCP Server 运行在容器的 8000 端口 # resource_limits 可以在这里配置,但 docker-py 的 run 方法直接支持的参数有限 # 更复杂的资源限制可能需要通过 create_container 和 start 组合 ) # 获取容器的 IP 地址和端口 container.reload() # Docker 容器的 IP 地址通常在 bridge 网络中,需要进一步获取 # 简化处理,假设 Agent Manager 和 Agent 在同一网络,或者通过服务发现获取 # 这里暂时使用一个占位符,实际部署中需要更复杂的网络配置或服务发现 # 例如,如果 Agent Manager 运行在 Docker 网络中,可以通过容器名解析 # 或者通过 Redis 注册时 Agent 自身上报其可访问的 IP:Port # 暂时使用一个占位符,实际需要从容器网络配置中获取 # 或者等待 Agent 启动后自行注册到 Redis mcp_endpoint = f"http://{container_name}:8000" now = datetime.now().isoformat() agent_info = AgentInfo( id=agent_id, agent_type=request.agent_type, mcp_endpoint=mcp_endpoint, status="running", created_at=now, last_heartbeat=now, metadata={"container_id": container.id, "container_name": container_name} ) return agent_info except docker.errors.ImageNotFound: raise ValueError(f"Docker image '{request.image_name}' not found.") except docker.errors.APIError as e: raise RuntimeError(f"Failed to deploy agent container: {e}") def destroy_agent(self, agent_id: str) -> bool: try: # 根据 agent_id 查找容器 # 假设 agent_id 存储在 AgentInfo 的 metadata 中作为 container_id # 或者 Agent Manager 内部维护 agent_id 到 container_id 的映射 # 这里简化处理,假设 agent_id 就是 container_id 或者可以通过某种方式直接找到容器 # 实际中,Agent Manager 应该维护 AgentInfo 列表,通过 AgentInfo.metadata['container_id'] 来查找 # 为了演示,我们尝试通过名称查找,但更健壮的方式是存储 container_id # 假设 agent_id 实际上是 AgentInfo.metadata['container_id'] # 这里需要一个机制来从 agent_id 映射到 container_id 或 container_name # 暂时通过遍历所有容器来查找,实际不推荐 container_to_destroy = None for container in self.client.containers.list(all=True): if f"agent-{agent_id[:8]}" in container.name: # 粗略匹配 container_to_destroy = container break if container_to_destroy: container_to_destroy.stop() container_to_destroy.remove() return True else: # 如果 agent_id 是 AgentInfo.id,我们需要从 Agent Manager 的存储中获取 container_id # 这里只是一个占位符,实际需要 Agent Manager 的状态管理 print(f"Container for agent_id {agent_id} not found.") return False except docker.errors.NotFound: print(f"Container for agent_id {agent_id} not found.") return False except docker.errors.APIError as e: raise RuntimeError(f"Failed to destroy agent container {agent_id}: {e}")