Spaces:
Sleeping
Sleeping
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import redis
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException

from .mcp_agent_interface import MCPAgent
# Redis connection settings; each one can be overridden through the environment.
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_DB = int(os.environ.get("REDIS_DB", "0"))

# Pub/sub channel on which agent lifecycle events are published.
AGENT_CHANNEL = "agent_discovery_channel"
# Prefix of the Redis hash key that holds one agent's descriptor.
AGENT_KEY_PREFIX = "agent:"
# Delay between two heartbeats, in seconds.
HEARTBEAT_INTERVAL = 5
class EchoAgent(MCPAgent):
    """MCP agent that echoes subtask messages and announces itself via Redis.

    The agent writes a descriptor hash at ``AGENT_KEY_PREFIX + id`` and
    publishes ``ONLINE`` / ``HEARTBEAT`` / ``OFFLINE`` events on
    ``AGENT_CHANNEL``.

    NOTE(review): ``self.id``, ``self.agent_type`` and ``self.config`` are
    presumably set by ``MCPAgent.__init__`` -- confirm against the base class.
    """

    def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
        """Store the agent identity; no network I/O happens until ``initialize``."""
        super().__init__(agent_id, agent_type, config)
        self.redis_client: Optional[redis.Redis] = None
        self.heartbeat_task: Optional[asyncio.Task] = None
        # URL of this agent's own MCP service, advertised in the descriptor.
        self.mcp_endpoint = os.getenv("MCP_ENDPOINT", "http://localhost:8000")

    async def _redis_call(self, func, *args, **kwargs):
        """Run a blocking redis-py call in the default executor and return its result.

        ``redis.Redis`` is a synchronous client; calling it directly inside a
        coroutine would stall the event loop (and every HTTP request) for the
        whole network round trip, or for the full connect timeout when Redis
        is unreachable.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def initialize(self):
        """Connect to Redis, register the agent and start the heartbeat task.

        Raises:
            redis.exceptions.RedisError: if registration cannot reach Redis.
        """
        self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
        try:
            await self._redis_call(self.redis_client.ping)
        except redis.exceptions.ConnectionError as e:
            print(f"Agent {self.id} could not connect to Redis: {e}")
            # NOTE(review): there is no exit / degraded-mode path here; the
            # registration below is still attempted and raises if Redis is
            # still unreachable.
        else:
            print(f"Agent {self.id} connected to Redis successfully!")

        await self._register_agent()
        self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
        print(f"EchoAgent {self.id} initialized and registered.")

    async def shutdown(self):
        """Stop the heartbeat, publish the OFFLINE event and close the client.

        Safe to call when ``initialize`` never ran: with no Redis client there
        is nothing to announce or close.
        """
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            try:
                await self.heartbeat_task
            except asyncio.CancelledError:
                pass

        if self.redis_client:
            try:
                await self._unregister_agent()
            finally:
                # Release the connection even if the OFFLINE publish failed.
                self.redis_client.close()
        print(f"EchoAgent {self.id} shutdown.")

    async def _register_agent(self):
        """Write the agent descriptor hash to Redis and publish an ONLINE event."""
        # One timestamp so that created_at and the first heartbeat agree.
        now = datetime.now().isoformat()
        agent_info = {
            "id": self.id,
            "agent_type": self.agent_type,
            "mcp_endpoint": self.mcp_endpoint,
            "status": "running",
            "created_at": now,
            "last_heartbeat": now,
            # The configuration is stored as JSON-encoded metadata.
            "metadata": json.dumps(self.config),
        }
        # hset(mapping=...) replaces the deprecated hmset().
        await self._redis_call(
            self.redis_client.hset,
            f"{AGENT_KEY_PREFIX}{self.id}",
            mapping=agent_info,
        )
        await self._redis_call(
            self.redis_client.publish,
            AGENT_CHANNEL,
            json.dumps({"id": self.id, "event_type": "ONLINE"}),
        )
        print(f"Agent {self.id} registered to Redis.")

    async def _unregister_agent(self):
        """Publish an OFFLINE event; the descriptor key itself is not deleted here."""
        await self._redis_call(
            self.redis_client.publish,
            AGENT_CHANNEL,
            json.dumps({"id": self.id, "event_type": "OFFLINE"}),
        )
        # Per the original author's note, the Agent Manager is responsible
        # for deleting the key from Redis.
        print(f"Agent {self.id} sent OFFLINE message to Redis.")

    async def _send_heartbeat(self):
        """Refresh ``last_heartbeat`` and publish HEARTBEAT every ``HEARTBEAT_INTERVAL`` seconds.

        Runs until cancelled. Redis errors are logged and the loop keeps
        going, so a temporary outage does not kill the heartbeat.
        """
        key = f"{AGENT_KEY_PREFIX}{self.id}"
        event = json.dumps({"id": self.id, "event_type": "HEARTBEAT"})
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            try:
                await self._redis_call(
                    self.redis_client.hset,
                    key,
                    "last_heartbeat",
                    datetime.now().isoformat(),
                )
                await self._redis_call(self.redis_client.publish, AGENT_CHANNEL, event)
            except asyncio.CancelledError:
                # Never swallow cancellation, or shutdown() would wait forever.
                raise
            except Exception as e:
                print(f"Error sending heartbeat for Agent {self.id}: {e}")

    async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
        """Echo the ``message`` entry of ``subtask_data`` back to the caller.

        Args:
            subtask_data: Subtask payload; only the ``"message"`` key is read.

        Returns:
            A dict with ``status`` ("completed"), ``output`` ("Echo: <message>")
            and ``agent_id``.
        """
        print(f"EchoAgent {self.id} received subtask: {subtask_data}")
        message = subtask_data.get("message", "No message provided.")
        return {"status": "completed", "output": f"Echo: {message}", "agent_id": self.id}
# FastAPI application instance.
app = FastAPI()

# Agent served by this process; assigned in startup_event(), None until then.
# This module-level annotation is evaluated at import time, so `Optional`
# must be imported from `typing` or importing the module raises NameError.
echo_agent: Optional[EchoAgent] = None
# Registered as a startup hook: without it nothing ever calls this function
# and `echo_agent` stays None for the whole life of the process.
@app.on_event("startup")
async def startup_event():
    """Build the module-level EchoAgent from the environment and initialize it.

    Environment variables read:
        AGENT_ID: agent identifier (default: a random UUID4 string).
        AGENT_TYPE: agent type label (default: "EchoAgent").
        AGENT_CONFIG: JSON document passed as the agent config (default: "{}").

    Raises:
        json.JSONDecodeError: if AGENT_CONFIG is not valid JSON.
    """
    global echo_agent
    agent_id = os.getenv("AGENT_ID", str(uuid.uuid4()))
    agent_type = os.getenv("AGENT_TYPE", "EchoAgent")
    config = json.loads(os.getenv("AGENT_CONFIG", "{}"))
    echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
    await echo_agent.initialize()
# Registered as a shutdown hook so the agent is actually shut down on exit;
# without it nothing ever calls this function.
@app.on_event("shutdown")
async def shutdown_event():
    """Shut down the module-level agent, if one was created at startup."""
    if echo_agent:
        await echo_agent.shutdown()
# NOTE(review): no route registration is visible in the file, so the handler
# was unreachable over HTTP. The path below is assumed from the handler name;
# confirm it against whatever client dispatches subtasks to this agent.
@app.post("/process_subtask")
async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
    """Forward a subtask payload (the JSON request body) to the agent.

    Args:
        subtask_data: Payload handed unchanged to ``EchoAgent.process_subtask``.

    Returns:
        The dict produced by ``EchoAgent.process_subtask``.

    Raises:
        HTTPException: status 500 when the agent has not been initialized.
    """
    if not echo_agent:
        raise HTTPException(status_code=500, detail="Agent not initialized.")
    return await echo_agent.process_subtask(subtask_data)
# NOTE(review): no route registration is visible in the file; "/health" is an
# assumed path -- confirm against the deployment's health probe.
@app.get("/health")
async def health_check():
    """Report liveness together with the agent id ("N/A" before startup)."""
    return {"status": "ok", "agent_id": echo_agent.id if echo_agent else "N/A"}
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    uvicorn.run(
        app,
        port=8000,
        host="0.0.0.0",
    )