root_agent / agents /mcp_agent_interface.py
airsltd's picture
update
d14cb3f
from abc import ABC, abstractmethod
from typing import Dict, Any
class MCPAgent(ABC):
"""
抽象基类,定义 MCP Agent 的通用接口。
所有具体的 Agent 都应继承此接口并实现其方法。
"""
def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]):
self.id = agent_id
self.agent_type = agent_type
self.config = config
@abstractmethod
async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]:
"""
处理一个子任务。
Args:
subtask_data: 包含子任务详细信息的字典。
Returns:
包含子任务处理结果的字典。
"""
pass
@abstractmethod
async def initialize(self):
"""
Agent 启动时的初始化逻辑,例如加载长期记忆、连接外部服务等。
"""
pass
@abstractmethod
async def shutdown(self):
"""
Agent 关闭时的清理逻辑,例如保存短期记忆、断开连接等。
"""
pass