root_agent / agents /echo_agent.py
airsltd's picture
update
d14cb3f
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import redis
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException

from .mcp_agent_interface import MCPAgent
# Redis connection settings, overridable through environment variables.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))

# Pub/sub channel on which ONLINE / HEARTBEAT / OFFLINE events are published.
AGENT_CHANNEL = "agent_discovery_channel"
# Prefix of the Redis hash key that stores one agent's descriptor.
AGENT_KEY_PREFIX = "agent:"
# Pause between two heartbeats, in seconds.
HEARTBEAT_INTERVAL = 5
class EchoAgent(MCPAgent):
    """MCP agent that echoes back the message of every subtask it receives.

    On start-up the agent connects to Redis, stores its descriptor in the hash
    ``AGENT_KEY_PREFIX + id``, publishes an ``ONLINE`` event on
    ``AGENT_CHANNEL`` and then refreshes ``last_heartbeat`` every
    ``HEARTBEAT_INTERVAL`` seconds until it is shut down.

    NOTE(review): ``redis.Redis`` is the synchronous client, so each Redis
    call made from these coroutines runs inline on the event loop.
    """

    def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
        """Create the agent without touching the network.

        Args:
            agent_id: Identifier forwarded to ``MCPAgent.__init__``.
            agent_type: Type label forwarded to ``MCPAgent.__init__``.
            config: Agent configuration; must be JSON-serialisable because it
                is stored in Redis as the ``metadata`` field.
        """
        # Presumably the base class sets self.id / self.agent_type /
        # self.config, which the methods below read -- verify in MCPAgent.
        super().__init__(agent_id, agent_type, config)
        self.redis_client: Optional[redis.Redis] = None
        self.heartbeat_task: Optional[asyncio.Task] = None
        # Address at which this agent's own MCP service can be reached.
        self.mcp_endpoint = os.getenv("MCP_ENDPOINT", "http://localhost:8000")

    async def initialize(self):
        """Connect to Redis, register the agent and start the heartbeat task.

        Raises:
            redis.exceptions.ConnectionError: If Redis does not answer the
                initial ping. Nothing is registered and no heartbeat task is
                started in that case.
        """
        client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
        try:
            client.ping()
        except redis.exceptions.ConnectionError as e:
            print(f"Agent {self.id} could not connect to Redis: {e}")
            # Registering over a dead connection would only fail again with a
            # less obvious traceback, so release the client and fail here.
            client.close()
            raise
        self.redis_client = client
        print(f"Agent {self.id} connected to Redis successfully!")

        await self._register_agent()
        self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
        print(f"EchoAgent {self.id} initialized and registered.")

    async def shutdown(self):
        """Stop the heartbeat, announce OFFLINE and close the Redis client.

        Safe to call when ``initialize`` never ran or failed: the steps that
        need a heartbeat task or a Redis client are skipped when absent.
        """
        task = self.heartbeat_task
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected result of the cancel() issued just above.
                pass
            self.heartbeat_task = None

        if self.redis_client is not None:
            try:
                await self._unregister_agent()
            finally:
                # Close the client even when the OFFLINE publish fails.
                self.redis_client.close()
                self.redis_client = None
        print(f"EchoAgent {self.id} shutdown.")

    async def _register_agent(self):
        """Write the agent descriptor to Redis and publish an ONLINE event."""
        # One timestamp for both fields so they agree at registration time.
        now = datetime.now().isoformat()
        agent_info = {
            "id": self.id,
            "agent_type": self.agent_type,
            "mcp_endpoint": self.mcp_endpoint,
            "status": "running",
            "created_at": now,
            "last_heartbeat": now,
            # The configuration is stored as metadata, serialised to JSON.
            "metadata": json.dumps(self.config),
        }
        # hset(mapping=...) replaces the deprecated hmset().
        self.redis_client.hset(f"{AGENT_KEY_PREFIX}{self.id}", mapping=agent_info)
        self.redis_client.publish(
            AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "ONLINE"})
        )
        print(f"Agent {self.id} registered to Redis.")

    async def _unregister_agent(self):
        """Publish an OFFLINE event for this agent.

        The agent's Redis key is intentionally left in place; per the original
        author's note, the Agent Manager is responsible for deleting it.
        """
        self.redis_client.publish(
            AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "OFFLINE"})
        )
        print(f"Agent {self.id} sent OFFLINE message to Redis.")

    async def _send_heartbeat(self):
        """Refresh ``last_heartbeat`` and publish HEARTBEAT until cancelled."""
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            try:
                self.redis_client.hset(
                    f"{AGENT_KEY_PREFIX}{self.id}",
                    "last_heartbeat",
                    datetime.now().isoformat(),
                )
                self.redis_client.publish(
                    AGENT_CHANNEL,
                    json.dumps({"id": self.id, "event_type": "HEARTBEAT"}),
                )
            except Exception as e:
                # Best effort: one failed beat must not end the loop, so the
                # error is reported and the next interval retries.
                print(f"Error sending heartbeat for Agent {self.id}: {e}")

    async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
        """Echo the ``message`` entry of *subtask_data* back to the caller.

        Args:
            subtask_data: Subtask payload; only its ``"message"`` key is read.

        Returns:
            A dict with ``status`` ("completed"), ``output`` (the message
            prefixed with ``"Echo: "``) and ``agent_id``.
        """
        print(f"EchoAgent {self.id} received subtask: {subtask_data}")
        message = subtask_data.get("message", "No message provided.")
        return {"status": "completed", "output": f"Echo: {message}", "agent_id": self.id}
# FastAPI application instance that exposes this agent over HTTP.
app = FastAPI()
# Module-level agent; starts as None and is assigned by the startup hook.
echo_agent: Optional[EchoAgent] = None
@app.on_event("startup")
async def startup_event():
    """Build the module-level EchoAgent from the environment and start it.

    Environment variables (an empty value is treated the same as unset):
        AGENT_ID: Agent identifier; a random UUID4 is generated when missing.
        AGENT_TYPE: Agent type label; defaults to ``"EchoAgent"``.
        AGENT_CONFIG: JSON object with the agent configuration; defaults to
            an empty object.

    Raises:
        json.JSONDecodeError: If AGENT_CONFIG is not valid JSON.
        ValueError: If AGENT_CONFIG is valid JSON but not a JSON object.
    """
    global echo_agent
    # `or` keeps the UUID from being generated when AGENT_ID is provided and
    # stops an empty AGENT_ID from registering the agent under a blank id.
    agent_id = os.getenv("AGENT_ID") or str(uuid.uuid4())
    agent_type = os.getenv("AGENT_TYPE") or "EchoAgent"
    config = json.loads(os.getenv("AGENT_CONFIG") or "{}")
    if not isinstance(config, dict):
        raise ValueError("AGENT_CONFIG must be a JSON object.")
    echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
    await echo_agent.initialize()
@app.on_event("shutdown")
async def shutdown_event():
    """Shut the module-level agent down, if one was created at startup."""
    if not echo_agent:
        return
    await echo_agent.shutdown()
@app.post("/process_subtask")
async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
    """Forward the request body to the agent and return its result.

    Responds with HTTP 500 when the agent has not been created yet.
    """
    if echo_agent:
        result = await echo_agent.process_subtask(subtask_data)
        return result
    raise HTTPException(status_code=500, detail="Agent not initialized.")
@app.get("/health")
async def health_check():
    """Report liveness together with the agent id, or "N/A" before startup."""
    agent_id = "N/A"
    if echo_agent:
        agent_id = echo_agent.id
    return {"status": "ok", "agent_id": agent_id}
if __name__ == "__main__":
    # Executed as a script: serve the app on every interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)