# Spaces: Runtime error
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Dict, List, Optional

import redis
from fastapi import FastAPI, HTTPException, status, BackgroundTasks, Depends

from .models import AgentInfo, CreateAgentRequest, AgentUpdateRequest
from .deployer import AgentDeployer, DockerAgentDeployer
# Web application and the backend used to deploy / tear down agent instances.
app = FastAPI()
deployer: AgentDeployer = DockerAgentDeployer()

# Redis connection settings; each one can be overridden via an environment variable.
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB", 0))

# Connection handles, assigned by startup_event(); None until then.
redis_client: Optional[redis.Redis] = None
redis_pubsub: Optional[redis.client.PubSub] = None

# Pub/sub channel carrying agent ONLINE / HEARTBEAT / OFFLINE events, and the
# key prefix of the per-agent hashes kept in Redis.
AGENT_CHANNEL = "agent_discovery_channel"
AGENT_KEY_PREFIX = "agent:"

# In-memory cache of known agents keyed by agent id. Redis is the source of
# truth; this dict is refreshed from it.
active_agents: Dict[str, AgentInfo] = {}
def _handle_agent_message(raw) -> None:
    """Apply one discovery message received on AGENT_CHANNEL to ``active_agents``.

    ``raw`` is the pub/sub payload (``bytes`` or ``str``) containing a JSON
    object. Only its ``"id"`` and ``"event_type"`` keys are read:

    * ``"ONLINE"`` / ``"HEARTBEAT"`` -- reload the agent's hash from Redis and
      store it in the cache;
    * ``"OFFLINE"`` -- drop the agent from the cache and delete its Redis hash.

    Messages without an id, or with any other event type, are ignored.

    Raises:
        json.JSONDecodeError: if ``raw`` is not valid JSON.
    """
    data = json.loads(raw)  # json.loads accepts bytes as well as str
    if not isinstance(data, dict):
        print(f"Ignoring non-object message: {raw!r}")
        return
    agent_id = data.get("id")
    event_type = data.get("event_type")
    if not agent_id:
        return
    key = f"{AGENT_KEY_PREFIX}{agent_id}"
    if event_type in ("HEARTBEAT", "ONLINE"):
        # Fetch the latest agent state from Redis.
        agent_data = redis_client.hgetall(key)
        if agent_data:
            agent_info = AgentInfo(
                **{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()}
            )
            active_agents[agent_id] = agent_info
            print(f"Agent {agent_id} {event_type} received. Status: {agent_info.status}")
        else:
            print(f"Agent {agent_id} {event_type} received, but no data in Redis.")
    elif event_type == "OFFLINE":
        active_agents.pop(agent_id, None)
        redis_client.delete(key)
        print(f"Agent {agent_id} OFFLINE received and removed.")


async def _redis_listener():
    """Consume AGENT_CHANNEL messages forever, keeping ``active_agents`` in sync.

    ``redis_pubsub.get_message()`` is polled with its default timeout (no
    waiting for a message). All pending messages are drained before the loop
    sleeps; the 0.1 s sleep happens only when the channel is idle, so a burst
    of heartbeats cannot build up an unbounded backlog. Errors while reading
    from Redis or handling a message are logged and the loop continues. The
    background task therefore does not die on a transient failure.

    Returns immediately if the pub/sub connection was never set up.
    """
    if not redis_pubsub:
        return
    while True:
        try:
            message = redis_pubsub.get_message(ignore_subscribe_messages=True)
        except redis.exceptions.RedisError as e:
            print(f"Error reading from Redis channel: {e}")
            await asyncio.sleep(1.0)  # back off while the connection recovers
            continue
        if not message:
            await asyncio.sleep(0.1)  # idle: avoid a busy-wait
            continue
        try:
            _handle_agent_message(message['data'])
        except json.JSONDecodeError:
            print(f"Received invalid JSON message: {message['data']}")
        except Exception as e:
            print(f"Error processing Redis message: {e}")
        # Yield to other tasks between messages without throttling throughput.
        await asyncio.sleep(0)
async def startup_event():
    """Connect to Redis, start the discovery listener and warm the agent cache.

    If Redis cannot be reached, startup continues in a degraded mode:
    ``redis_client`` is still assigned, but the subscription, listener task and
    cache warm-up are all skipped. Previously execution fell through to
    ``subscribe()``, which raised the same connection error and aborted
    application startup.

    NOTE(review): no ``@app.on_event("startup")`` decorator is visible in this
    copy of the file -- confirm the function is actually registered.
    """
    global redis_client, redis_pubsub
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    try:
        redis_client.ping()
        print("Connected to Redis successfully!")
    except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
        print(f"Could not connect to Redis: {e}")
        # Degraded mode: skip everything below that needs a live connection.
        return

    redis_pubsub = redis_client.pubsub()
    redis_pubsub.subscribe(AGENT_CHANNEL)
    # Keep a strong reference: the event loop holds only weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    app.state.redis_listener_task = asyncio.create_task(_redis_listener())

    # Load every agent already registered in Redis into the in-memory cache.
    # SCAN walks the keyspace incrementally instead of blocking Redis like KEYS.
    for key in redis_client.scan_iter(match=f"{AGENT_KEY_PREFIX}*"):
        try:
            agent_data = redis_client.hgetall(key)
            if not agent_data:
                continue
            agent_info = AgentInfo(
                **{k.decode('utf-8'): v.decode('utf-8') for k, v in agent_data.items()}
            )
        except (redis.exceptions.ResponseError, ValueError, TypeError) as e:
            # One malformed entry (wrong Redis type, invalid fields) must not
            # prevent the service from starting.
            print(f"Skipping malformed agent entry {key!r}: {e}")
            continue
        active_agents[agent_info.id] = agent_info
        print(f"Loaded existing agent: {agent_info.id} ({agent_info.agent_type})")
async def shutdown_event():
    """Stop the discovery listener and release the Redis connections.

    NOTE(review): no ``@app.on_event("shutdown")`` decorator is visible in this
    copy of the file -- confirm the function is actually registered.
    """
    # Cancel the background listener first so it stops polling a pub/sub
    # connection that is about to be closed.
    listener_task = getattr(app.state, "redis_listener_task", None)
    if listener_task is not None and not listener_task.done():
        listener_task.cancel()
        try:
            await listener_task
        except asyncio.CancelledError:
            pass
    if redis_pubsub:
        try:
            redis_pubsub.unsubscribe(AGENT_CHANNEL)
        except redis.exceptions.RedisError as e:
            # Best effort: still release the connections below.
            print(f"Error unsubscribing from {AGENT_CHANNEL}: {e}")
        redis_pubsub.close()  # release the dedicated pub/sub connection
    if redis_client:
        redis_client.close()
        print("Redis connection closed.")
async def read_root():
    """Return a static message confirming the Agent Manager is running."""
    status_message = "Agent Manager is running!"
    return {"message": status_message}
async def create_agent(request: CreateAgentRequest):
    """Deploy a new agent instance and register it in Redis and the local cache.

    Args:
        request: Deployment parameters, passed unchanged to
            ``deployer.deploy_agent``.

    Returns:
        The ``AgentInfo`` returned by the deployer.

    Raises:
        HTTPException: 400 if the deployer raises ``ValueError``; 500 if it
            raises ``RuntimeError``, or if the agent cannot be recorded in
            Redis. In the latter case the freshly deployed agent is torn down
            again.
    """
    try:
        agent_info = deployer.deploy_agent(request)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e

    # Store the agent info in Redis. redis-py rejects None values, so unset
    # fields are left out of the hash rather than failing the whole write.
    mapping = {k: v for k, v in agent_info.dict().items() if v is not None}
    try:
        redis_client.hset(f"{AGENT_KEY_PREFIX}{agent_info.id}", mapping=mapping)
    except redis.exceptions.RedisError as e:
        # An agent missing from both the cache and Redis is unreachable through
        # this API, so try to tear it down rather than leave it running untracked.
        try:
            if not deployer.destroy_agent(agent_info.id):
                print(f"Rollback of agent {agent_info.id} reported failure.")
        except RuntimeError as cleanup_error:
            print(f"Rollback of agent {agent_info.id} failed: {cleanup_error}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Agent {agent_info.id} was deployed but could not be registered: {e}",
        ) from e

    active_agents[agent_info.id] = agent_info  # update the in-memory cache
    print(f"Agent {agent_info.id} ({agent_info.agent_type}) deployed and registered.")
    return agent_info
async def get_agent_info(agent_id: str):
    """Return the ``AgentInfo`` for ``agent_id``.

    The in-memory cache is consulted first. On a miss the agent's Redis hash is
    loaded and, if present, cached for later calls.

    Raises:
        HTTPException: 404 when the agent is neither cached nor stored in Redis.
    """
    cached = active_agents.get(agent_id)
    if cached:
        return cached
    # Cache miss: fall back to Redis in case the in-memory copy was lost.
    stored = redis_client.hgetall(f"{AGENT_KEY_PREFIX}{agent_id}")
    if not stored:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")
    decoded = {}
    for raw_key, raw_value in stored.items():
        decoded[raw_key.decode('utf-8')] = raw_value.decode('utf-8')
    agent_info = AgentInfo(**decoded)
    active_agents[agent_id] = agent_info
    return agent_info
async def list_agents(agent_type: Optional[str] = None):
    """Return the cached agents, optionally restricted to one ``agent_type``.

    A falsy ``agent_type`` (``None`` or ``""``) disables the filter.
    """
    agents = list(active_agents.values())
    if not agent_type:
        return agents
    return [agent for agent in agents if agent.agent_type == agent_type]
async def update_agent_status(agent_id: str, update_request: AgentUpdateRequest):
    """Apply a partial update to an agent's record and persist it.

    Mainly used by agents themselves to report heartbeats or status changes.
    Only the fields explicitly set on ``update_request`` are applied.
    ``last_heartbeat`` is always refreshed to the current local time.

    Returns:
        The updated ``AgentInfo``.

    Raises:
        HTTPException: 404 when the agent is neither cached nor stored in Redis.
    """
    key = f"{AGENT_KEY_PREFIX}{agent_id}"
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        # Cache miss: fall back to Redis, as get_agent_info does.
        stored = redis_client.hgetall(key)
        if not stored:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")
        agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in stored.items()})

    # Work on a copy so the cache changes only after Redis accepted the write.
    updated = agent_info.copy()
    for field, value in update_request.dict(exclude_unset=True).items():
        setattr(updated, field, value)
    updated.last_heartbeat = datetime.now().isoformat()  # heartbeat refreshed on every update

    fields = updated.dict()
    # redis-py rejects None values: write the set fields, and delete fields
    # that were cleared rather than leaving their stale values in the hash.
    redis_client.hset(key, mapping={k: v for k, v in fields.items() if v is not None})
    cleared = [k for k, v in fields.items() if v is None]
    if cleared:
        redis_client.hdel(key, *cleared)

    active_agents[agent_id] = updated
    return updated
async def destroy_agent(agent_id: str):
    """Stop an agent instance and permanently remove it with its underlying resources.

    On success the agent is removed from both the in-memory cache and Redis.

    Returns:
        A ``{"message": ...}`` confirmation dict.

    Raises:
        HTTPException: 404 when the agent is neither cached nor stored in Redis;
            500 when the deployer reports failure or raises ``RuntimeError``.
    """
    key = f"{AGENT_KEY_PREFIX}{agent_id}"
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        # Cache miss: fall back to Redis, as get_agent_info does.
        stored = redis_client.hgetall(key)
        if not stored:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")
        agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in stored.items()})

    try:
        destroyed = deployer.destroy_agent(agent_info.id)  # the deployer is keyed by AgentInfo.id
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
    if not destroyed:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to destroy agent {agent_id}.")

    active_agents.pop(agent_id, None)
    redis_client.delete(key)
    print(f"Agent {agent_id} destroyed and removed from registry.")
    return {"message": f"Agent {agent_id} destroyed successfully."}
async def stop_agent(agent_id: str):
    """Stop an agent instance and mark its record as ``"stopped"``.

    There is no separate stop operation on the deployer yet, so this calls
    ``deployer.destroy_agent``. Unlike ``destroy_agent``, the registry entry
    (cache and Redis) is kept, with ``status = "stopped"`` and a refreshed
    ``last_heartbeat``.

    NOTE(review): because the underlying resources are destroyed, a later
    destroy call for the same agent may fail inside the deployer -- confirm
    this is intended.

    Returns:
        A ``{"message": ...}`` confirmation dict.

    Raises:
        HTTPException: 404 when the agent is neither cached nor stored in Redis;
            500 when the deployer reports failure or raises ``RuntimeError``.
    """
    key = f"{AGENT_KEY_PREFIX}{agent_id}"
    agent_info = active_agents.get(agent_id)
    if not agent_info:
        # Cache miss: fall back to Redis, as get_agent_info does.
        stored = redis_client.hgetall(key)
        if not stored:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent with ID {agent_id} not found.")
        agent_info = AgentInfo(**{k.decode('utf-8'): v.decode('utf-8') for k, v in stored.items()})

    try:
        # For DockerAgentDeployer, stopping and destroying may behave alike, so
        # stop is treated as part of destroy for now. A real stop might only
        # pause the container instead of removing it.
        stopped = deployer.destroy_agent(agent_info.id)
    except RuntimeError as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
    if not stopped:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to stop agent {agent_id}.")

    agent_info.status = "stopped"
    agent_info.last_heartbeat = datetime.now().isoformat()
    fields = agent_info.dict()
    # redis-py rejects None values: write the set fields and drop cleared ones.
    redis_client.hset(key, mapping={k: v for k, v in fields.items() if v is not None})
    cleared = [k for k, v in fields.items() if v is None]
    if cleared:
        redis_client.hdel(key, *cleared)
    active_agents[agent_id] = agent_info
    print(f"Agent {agent_id} stopped.")
    return {"message": f"Agent {agent_id} stopped successfully."}