Spaces:
Sleeping
Sleeping
File size: 1,108 Bytes
c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c d14cb3f c202f7c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
from abc import ABC, abstractmethod
from typing import Dict, Any
class MCPAgent(ABC):
"""
抽象基类,定义 MCP Agent 的通用接口。
所有具体的 Agent 都应继承此接口并实现其方法。
"""
def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
self.id = agent_id
self.agent_type = agent_type
self.config = config
@abstractmethod
async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
"""
处理一个子任务。
Args:
subtask_data: 包含子任务详细信息的字典。
Returns:
包含子任务处理结果的字典。
"""
pass
@abstractmethod
async def initialize(self):
"""
Agent 启动时的初始化逻辑,例如加载长期记忆、连接外部服务等。
"""
pass
@abstractmethod
async def shutdown(self):
"""
Agent 关闭时的清理逻辑,例如保存短期记忆、断开连接等。
"""
pass
|