File size: 5,096 Bytes
d14cb3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from typing import Dict, Any
from datetime import datetime
import asyncio
import json
import os
import redis
import uvicorn
from fastapi import FastAPI, BackgroundTasks

from .mcp_agent_interface import MCPAgent

# Redis 配置
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
AGENT_CHANNEL = "agent_discovery_channel"
AGENT_KEY_PREFIX = "agent:"
HEARTBEAT_INTERVAL = 5 # seconds

class EchoAgent(MCPAgent):
    def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
        super().__init__(agent_id, agent_type, config)
        self.redis_client: Optional[redis.Redis] = None
        self.heartbeat_task: Optional[asyncio.Task] = None
        self.mcp_endpoint = os.getenv("MCP_ENDPOINT", f"http://localhost:8000") # Agent 自身的 MCP 服务地址

    async def initialize(self):
        """
        Agent 启动时的初始化逻辑,连接 Redis 并注册。
        """
        self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
        try:
            self.redis_client.ping()
            print(f"Agent {self.id} connected to Redis successfully!")
        except redis.exceptions.ConnectionError as e:
            print(f"Agent {self.id} could not connect to Redis: {e}")
            # 可以在这里选择退出或以降级模式运行

        await self._register_agent()
        self.heartbeat_task = asyncio.create_task(self._send_heartbeat())
        print(f"EchoAgent {self.id} initialized and registered.")

    async def shutdown(self):
        """
        Agent 关闭时的清理逻辑,发送下线消息并停止心跳。
        """
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            try:
                await self.heartbeat_task
            except asyncio.CancelledError:
                pass
        await self._unregister_agent()
        if self.redis_client:
            self.redis_client.close()
        print(f"EchoAgent {self.id} shutdown.")

    async def _register_agent(self):
        """
        向 Redis 注册 Agent 信息。
        """
        agent_info = {
            "id": self.id,
            "agent_type": self.agent_type,
            "mcp_endpoint": self.mcp_endpoint,
            "status": "running",
            "created_at": datetime.now().isoformat(),
            "last_heartbeat": datetime.now().isoformat(),
            "metadata": json.dumps(self.config) # 将配置作为元数据存储
        }
        self.redis_client.hmset(f"{AGENT_KEY_PREFIX}{self.id}", agent_info)
        self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "ONLINE"}))
        print(f"Agent {self.id} registered to Redis.")

    async def _unregister_agent(self):
        """
        向 Redis 发送下线消息。
        """
        self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "OFFLINE"}))
        # Agent Manager 会负责从 Redis 中删除键
        print(f"Agent {self.id} sent OFFLINE message to Redis.")

    async def _send_heartbeat(self):
        """
        定期向 Redis 发送心跳。
        """
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            try:
                self.redis_client.hset(f"{AGENT_KEY_PREFIX}{self.id}", "last_heartbeat", datetime.now().isoformat())
                self.redis_client.publish(AGENT_CHANNEL, json.dumps({"id": self.id, "event_type": "HEARTBEAT"}))
                # print(f"Agent {self.id} sent HEARTBEAT.")
            except Exception as e:
                print(f"Error sending heartbeat for Agent {self.id}: {e}")

    async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        EchoAgent 的核心逻辑:接收消息并返回。
        """
        print(f"EchoAgent {self.id} received subtask: {subtask_data}")
        message = subtask_data.get("message", "No message provided.")
        return {"status": "completed", "output": f"Echo: {message}", "agent_id": self.id}

# FastAPI 应用实例
app = FastAPI()
echo_agent: Optional[EchoAgent] = None

@app.on_event("startup")
async def startup_event():
    global echo_agent
    agent_id = os.getenv("AGENT_ID", str(uuid.uuid4()))
    agent_type = os.getenv("AGENT_TYPE", "EchoAgent")
    config = json.loads(os.getenv("AGENT_CONFIG", "{}"))
    
    echo_agent = EchoAgent(agent_id=agent_id, agent_type=agent_type, config=config)
    await echo_agent.initialize()

@app.on_event("shutdown")
async def shutdown_event():
    if echo_agent:
        await echo_agent.shutdown()

@app.post("/process_subtask")
async def process_subtask_endpoint(subtask_data: Dict[str, Any]):
    if not echo_agent:
        raise HTTPException(status_code=500, detail="Agent not initialized.")
    return await echo_agent.process_subtask(subtask_data)

@app.get("/health")
async def health_check():
    return {"status": "ok", "agent_id": echo_agent.id if echo_agent else "N/A"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)