from abc import ABC, abstractmethod from typing import Dict, Any class MCPAgent(ABC): """ 抽象基类,定义 MCP Agent 的通用接口。 所有具体的 Agent 都应继承此接口并实现其方法。 """ def __init__(self, agent_id: str, agent_type: str, config: Dict[str, Any]): self.id = agent_id self.agent_type = agent_type self.config = config @abstractmethod async def process_subtask(self, subtask_data: Dict[str, Any]) -> Dict[str, Any]: """ 处理一个子任务。 Args: subtask_data: 包含子任务详细信息的字典。 Returns: 包含子任务处理结果的字典。 """ pass @abstractmethod async def initialize(self): """ Agent 启动时的初始化逻辑,例如加载长期记忆、连接外部服务等。 """ pass @abstractmethod async def shutdown(self): """ Agent 关闭时的清理逻辑,例如保存短期记忆、断开连接等。 """ pass