"""Echo agent service.

Defines ``EchoAgent`` (an ``MCPAgent`` that registers itself in Redis, sends
periodic heartbeats and echoes subtask messages) and a small FastAPI
application that exposes the agent over HTTP.
"""

import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import redis
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException

from .mcp_agent_interface import MCPAgent

# Redis configuration
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
AGENT_CHANNEL = "agent_discovery_channel"
AGENT_KEY_PREFIX = "agent:"
HEARTBEAT_INTERVAL = 5  # seconds


class EchoAgent(MCPAgent):
    """Agent that echoes the ``message`` field of every subtask it receives.

    On ``initialize`` the agent writes its description into a Redis hash
    (``AGENT_KEY_PREFIX + id``), publishes an ``ONLINE`` event on
    ``AGENT_CHANNEL`` and starts a background heartbeat task. On ``shutdown``
    it stops the heartbeat, publishes ``OFFLINE`` and closes the client.

    NOTE(review): ``self.id``, ``self.agent_type`` and ``self.config`` are
    presumably set by ``MCPAgent.__init__`` -- confirm against the base class.
    NOTE(review): the Redis client used here is the synchronous one, so every
    Redis call blocks the event loop for its duration.
    """

    def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
        """Store the agent identity and prepare (unconnected) Redis state.

        Args:
            agent_id: Identifier forwarded to ``MCPAgent``.
            agent_type: Agent type name forwarded to ``MCPAgent``.
            config: Agent configuration forwarded to ``MCPAgent``.
        """
        super().__init__(agent_id, agent_type, config)
        self.redis_client: Optional[redis.Redis] = None
        self.heartbeat_task: Optional[asyncio.Task] = None
        # Address of this agent's own MCP service.
        self.mcp_endpoint = os.getenv("MCP_ENDPOINT", "http://localhost:8000")

    async def initialize(self):
        """Connect to Redis, register the agent and start the heartbeat task.

        A failed ``ping`` is only reported; registration is attempted
        regardless.

        NOTE(review): if Redis is unreachable the registration call below will
        presumably raise as well -- decide whether to exit here or to run in a
        degraded mode.
        """
        self.redis_client = redis.Redis(
            host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB
        )
        try:
            self.redis_client.ping()
            print(f"Agent {self.id} connected to Redis successfully!")
        except redis.exceptions.ConnectionError as e:
            print(f"Agent {self.id} could not connect to Redis: {e}")
            # Could choose to exit here or to run in a degraded mode.

        await self._register_agent()
        self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
        print(f"EchoAgent {self.id} initialized and registered.")

    async def shutdown(self):
        """Stop the heartbeat, announce OFFLINE and close the Redis client.

        Safe to call when ``initialize`` never ran: with no Redis client there
        is nothing to announce or close. A Redis failure while announcing is
        reported but does not prevent the client from being closed.
        """
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            try:
                await self.heartbeat_task
            except asyncio.CancelledError:
                # Expected: we cancelled the task ourselves just above.
                pass

        if self.redis_client:
            try:
                await self._unregister_agent()
            except redis.exceptions.RedisError as e:
                print(f"Agent {self.id} could not send OFFLINE message: {e}")
            finally:
                # Always release the connection, even if the publish failed.
                self.redis_client.close()
        print(f"EchoAgent {self.id} shutdown.")

    async def _register_agent(self):
        """Write the agent description to Redis and publish an ONLINE event."""
        # One timestamp for both fields so they agree at creation time.
        now = datetime.now().isoformat()
        agent_info = {
            "id": self.id,
            "agent_type": self.agent_type,
            "mcp_endpoint": self.mcp_endpoint,
            "status": "running",
            "created_at": now,
            "last_heartbeat": now,
            # The configuration is stored as JSON-encoded metadata.
            "metadata": json.dumps(self.config),
        }
        # hset(mapping=...) replaces the deprecated hmset().
        self.redis_client.hset(
            f"{AGENT_KEY_PREFIX}{self.id}", mapping=agent_info
        )
        self.redis_client.publish(
            AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "ONLINE"})
        )
        print(f"Agent {self.id} registered to Redis.")

    async def _unregister_agent(self):
        """Publish an OFFLINE event for this agent on ``AGENT_CHANNEL``."""
        self.redis_client.publish(
            AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "OFFLINE"})
        )
        # The Agent Manager is responsible for deleting the key from Redis.
        print(f"Agent {self.id} sent OFFLINE message to Redis.")

    async def _send_heartbeat(self):
        """Refresh ``last_heartbeat`` and publish HEARTBEAT until cancelled.

        Runs forever, sleeping ``HEARTBEAT_INTERVAL`` seconds between beats.
        Errors from a single beat are reported and the loop continues.
        """
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            try:
                self.redis_client.hset(
                    f"{AGENT_KEY_PREFIX}{self.id}",
                    "last_heartbeat",
                    datetime.now().isoformat(),
                )
                self.redis_client.publish(
                    AGENT_CHANNEL,
                    json.dumps({"id": self.id, "event_type": "HEARTBEAT"}),
                )
            except Exception as e:
                # Best effort: one failed beat must not kill the heartbeat task.
                print(f"Error sending heartbeat for Agent {self.id}: {e}")

    async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
        """Echo the subtask's ``message`` back to the caller.

        Args:
            subtask_data: Subtask payload; its optional ``"message"`` entry is
                echoed.

        Returns:
            A dict with ``status`` ``"completed"``, the echoed ``output`` and
            this agent's ``agent_id``.
        """
        print(f"EchoAgent {self.id} received subtask: {subtask_data}")
        message = subtask_data.get("message", "No message provided.")
        return {
            "status": "completed",
            "output": f"Echo: {message}",
            "agent_id": self.id,
        }


# FastAPI application instance
app = FastAPI()
echo_agent: Optional[EchoAgent] = None


@app.on_event("startup")
async def startup_event():
    """Build the global agent from environment variables and initialize it.

    Reads ``AGENT_ID`` (default: a random UUID), ``AGENT_TYPE`` (default:
    ``"EchoAgent"``) and ``AGENT_CONFIG`` (a JSON document, default ``{}``).
    """
    global echo_agent
    agent_id = os.getenv("AGENT_ID", str(uuid.uuid4()))
    agent_type = os.getenv("AGENT_TYPE", "EchoAgent")
    config = json.loads(os.getenv("AGENT_CONFIG", "{}"))
    echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
    await echo_agent.initialize()


@app.on_event("shutdown")
async def shutdown_event():
    """Shut the global agent down if it was created."""
    if echo_agent:
        await echo_agent.shutdown()


@app.post("/process_subtask")
async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
    """Forward a subtask to the global agent and return its result.

    Raises:
        HTTPException: With status 500 when the agent has not been created.
    """
    if not echo_agent:
        raise HTTPException(status_code=500, detail="Agent not initialized.")
    return await echo_agent.process_subtask(subtask_data)


@app.get("/health")
async def health_check():
    """Report liveness together with the agent id (``"N/A"`` if no agent)."""
    return {"status": "ok", "agent_id": echo_agent.id if echo_agent else "N/A"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)