Spaces:
Sleeping
Sleeping
File size: 5,096 Bytes
d14cb3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import redis
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException

from .mcp_agent_interface import MCPAgent
# Redis connection settings; each one can be overridden through the environment.
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_DB = int(os.environ.get("REDIS_DB", "0"))

# Pub/sub channel on which agents publish ONLINE / HEARTBEAT / OFFLINE events.
AGENT_CHANNEL = "agent_discovery_channel"

# Prefix of the Redis hash key that stores one agent's registration record.
AGENT_KEY_PREFIX = "agent:"

# Pause between two heartbeats, in seconds.
HEARTBEAT_INTERVAL = 5
class EchoAgent(MCPAgent):
    """MCP agent that echoes subtask messages and announces itself via Redis.

    The agent stores a registration hash under ``AGENT_KEY_PREFIX + id`` and
    publishes ``ONLINE``, ``HEARTBEAT`` and ``OFFLINE`` events on
    ``AGENT_CHANNEL``.

    NOTE(review): ``self.id``, ``self.agent_type`` and ``self.config`` are
    presumably set by ``MCPAgent.__init__`` -- confirm against the base class.
    """

    def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
        """Create the agent without touching Redis; see ``initialize``."""
        super().__init__(agent_id, agent_type, config)
        self.redis_client: Optional[redis.Redis] = None
        self.heartbeat_task: Optional[asyncio.Task] = None
        # URL under which this agent's own MCP service is reachable.
        self.mcp_endpoint = os.getenv("MCP_ENDPOINT", "http://localhost:8000")

    async def _redis_call(self, func, *args, **kwargs):
        """Run a blocking redis-py call in the default executor.

        The synchronous client performs network I/O; calling it directly from
        a coroutine would stall the event loop for the duration of the call.

        Returns:
            Whatever ``func(*args, **kwargs)`` returns.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def initialize(self):
        """Connect to Redis, register the agent and start the heartbeat task.

        Raises:
            redis.exceptions.ConnectionError: if Redis cannot be reached. The
                half-built client is closed and ``self.redis_client`` stays
                ``None`` so a later ``shutdown`` is a no-op for Redis.
        """
        client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
        try:
            await self._redis_call(client.ping)
        except redis.exceptions.ConnectionError as e:
            print(f"Agent {self.id} could not connect to Redis: {e}")
            # Registration cannot succeed without Redis, so fail explicitly
            # here instead of crashing one call later with a leaked client.
            client.close()
            raise
        self.redis_client = client
        print(f"Agent {self.id} connected to Redis successfully!")

        await self._register_agent()
        self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
        print(f"EchoAgent {self.id} initialized and registered.")

    async def shutdown(self):
        """Stop the heartbeat, publish the OFFLINE event and close Redis.

        Safe to call when ``initialize`` never ran or failed: the Redis steps
        are skipped if no client exists.
        """
        task, self.heartbeat_task = self.heartbeat_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        if self.redis_client is not None:
            try:
                await self._unregister_agent()
            finally:
                # Always release the connection, even if the publish failed.
                self.redis_client.close()
                self.redis_client = None
        print(f"EchoAgent {self.id} shutdown.")

    async def _register_agent(self):
        """Write the agent's registration hash and publish an ONLINE event."""
        now = datetime.now().isoformat()
        agent_info = {
            "id": self.id,
            "agent_type": self.agent_type,
            "mcp_endpoint": self.mcp_endpoint,
            "status": "running",
            "created_at": now,
            "last_heartbeat": now,
            # The configuration is stored as JSON-encoded metadata.
            "metadata": json.dumps(self.config),
        }
        # hset(mapping=...) replaces the deprecated hmset().
        await self._redis_call(
            self.redis_client.hset,
            f"{AGENT_KEY_PREFIX}{self.id}",
            mapping=agent_info,
        )
        await self._redis_call(
            self.redis_client.publish,
            AGENT_CHANNEL,
            json.dumps({"id": self.id, "event_type": "ONLINE"}),
        )
        print(f"Agent {self.id} registered to Redis.")

    async def _unregister_agent(self):
        """Publish an OFFLINE event; does nothing without a Redis client.

        The registration key itself is not deleted here; per the original
        author's note the Agent Manager is responsible for removing it.
        """
        if self.redis_client is None:
            return
        await self._redis_call(
            self.redis_client.publish,
            AGENT_CHANNEL,
            json.dumps({"id": self.id, "event_type": "OFFLINE"}),
        )
        print(f"Agent {self.id} sent OFFLINE message to Redis.")

    async def _send_heartbeat(self):
        """Refresh ``last_heartbeat`` and publish HEARTBEAT events forever.

        Runs until cancelled. Errors from a single beat are printed and the
        loop continues, so a temporary Redis outage does not end the task.
        """
        key = f"{AGENT_KEY_PREFIX}{self.id}"
        event = json.dumps({"id": self.id, "event_type": "HEARTBEAT"})
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            try:
                await self._redis_call(
                    self.redis_client.hset,
                    key,
                    "last_heartbeat",
                    datetime.now().isoformat(),
                )
                await self._redis_call(
                    self.redis_client.publish, AGENT_CHANNEL, event
                )
            except asyncio.CancelledError:
                # On Python 3.7 CancelledError is an Exception subclass; it
                # must not be swallowed or shutdown() would wait forever.
                raise
            except Exception as e:
                print(f"Error sending heartbeat for Agent {self.id}: {e}")

    async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
        """Echo the ``"message"`` entry of ``subtask_data`` back to the caller.

        Args:
            subtask_data: Subtask payload; only the ``"message"`` key is read.

        Returns:
            A dict with ``status`` ``"completed"``, the echoed text under
            ``output`` and this agent's id under ``agent_id``.
        """
        print(f"EchoAgent {self.id} received subtask: {subtask_data}")
        message = subtask_data.get("message", "No message provided.")
        return {"status": "completed", "output": f"Echo: {message}", "agent_id": self.id}
# FastAPI application instance; the route and lifecycle handlers below attach to it.
app = FastAPI()
# Module-level agent instance. It stays None until startup_event assigns it.
echo_agent: Optional[EchoAgent] = None
@app.on_event("startup")
async def startup_event():
    """Build the module-level EchoAgent from the environment and initialize it.

    Environment variables read:
        AGENT_ID: agent identifier; a random UUID4 is used when unset or empty.
        AGENT_TYPE: agent type label, default ``"EchoAgent"``.
        AGENT_CONFIG: JSON text passed as the agent config, default ``{}``.

    Raises:
        ValueError: if ``AGENT_CONFIG`` is not valid JSON.
    """
    global echo_agent
    # `or` instead of a getenv default: the UUID is only generated when it is
    # needed, and an empty AGENT_ID is treated the same as an unset one.
    agent_id = os.getenv("AGENT_ID") or str(uuid.uuid4())
    agent_type = os.getenv("AGENT_TYPE", "EchoAgent")

    raw_config = os.getenv("AGENT_CONFIG") or "{}"
    try:
        config = json.loads(raw_config)
    except json.JSONDecodeError as err:
        # JSONDecodeError is a ValueError, so the exception type is unchanged;
        # the message now names the offending variable.
        raise ValueError(f"AGENT_CONFIG is not valid JSON: {err}") from err

    echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
    await echo_agent.initialize()
@app.on_event("shutdown")
async def shutdown_event():
    """Shut the module-level agent down, if one was created."""
    if not echo_agent:
        return
    await echo_agent.shutdown()
@app.post("/process_subtask")
async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
    """Forward a subtask payload to the module-level agent.

    Args:
        subtask_data: Request body, handed to ``EchoAgent.process_subtask``
            unchanged.

    Returns:
        The dict produced by ``EchoAgent.process_subtask``.

    Raises:
        HTTPException: status 500 when no agent has been created yet.
    """
    if not echo_agent:
        raise HTTPException(status_code=500, detail="Agent not initialized.")
    return await echo_agent.process_subtask(subtask_data)
@app.get("/health")
async def health_check():
    """Report liveness together with the agent id, or "N/A" without an agent."""
    if echo_agent:
        reported_id = echo_agent.id
    else:
        reported_id = "N/A"
    return {"status": "ok", "agent_id": reported_id}
if __name__ == "__main__":
    # Script entry point: serve the app on every interface, port 8000.
    uvicorn.run(
        app,
        port=8000,
        host="0.0.0.0",
    )
|