"""
Base Agent Interface for Multi-AI Agentic System
"""

from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from enum import Enum
import uuid
import time


class TaskStatus(Enum):
    """Status values carried by a :class:`TaskResult`."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class AgentMessage:
    """Standard message format for inter-agent communication.

    ``priority`` is the only field interpreted in this module:
    :meth:`BaseAgent.add_message` orders the queue by it, lowest number
    first.
    """

    task_id: str
    sender: str
    recipient: str
    message_type: str
    data: Dict[str, Any]
    timestamp: float
    priority: int = 1  # 1=high, 5=low


@dataclass
class TaskResult:
    """Standard result format from agent execution."""

    task_id: str
    agent_id: str
    status: TaskStatus
    result: Dict[str, Any]
    # None when no error message was supplied.
    error_message: Optional[str] = None
    execution_time: float = 0.0


class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Subclasses must implement :meth:`process_task` and
    :meth:`get_agent_info`. The base class supplies a priority-ordered
    message queue and a simple key/value context store.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        """Initialise the agent.

        Args:
            agent_id: Identifier used as the key in :class:`AgentRegistry`.
            capabilities: Task types this agent reports it can handle.
        """
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.is_active = True
        self.message_queue: List[AgentMessage] = []
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process_task(self, message: AgentMessage) -> TaskResult:
        """Process a task and return results."""
        pass

    @abstractmethod
    def get_agent_info(self) -> Dict[str, Any]:
        """Return agent metadata and capabilities."""
        pass

    def can_handle_task(self, task_type: str) -> bool:
        """Return True if ``task_type`` is listed in this agent's capabilities."""
        return task_type in self.capabilities

    def add_message(self, message: AgentMessage) -> None:
        """Add a message to the agent's queue, keeping it ordered by priority.

        Lower ``priority`` numbers come first. ``list.sort`` is stable, so
        messages with equal priority stay in arrival order.
        """
        self.message_queue.append(message)
        self.message_queue.sort(key=lambda queued: queued.priority)

    def get_next_message(self) -> Optional[AgentMessage]:
        """Remove and return the first queued message, or None if the queue is empty."""
        if not self.message_queue:
            return None
        return self.message_queue.pop(0)

    def update_context(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the agent's context."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Return the context value for ``key``, or None if it is not set."""
        return self.context.get(key)


class AgentRegistry:
    """Registry to manage all agents in the system.

    Attributes:
        agents: Maps ``agent_id`` to the registered agent.
        capabilities_map: Maps a capability name to the list of agent ids
            registered with that capability, in registration order.
    """

    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.capabilities_map: Dict[str, List[str]] = {}

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent, replacing any agent already using the same id.

        Each agent id is listed at most once per capability, so registering
        the same agent twice (or an agent whose capability list contains
        repeats) does not produce duplicate lookup results. When an id is
        re-registered, capabilities the new agent no longer declares are
        removed from the mapping.
        """
        agent_id = agent.agent_id

        if agent_id in self.agents:
            # Drop mappings left over from the previous registration so the
            # replacement is not advertised for capabilities it lacks.
            self._forget_stale_capabilities(agent_id, agent.capabilities)

        self.agents[agent_id] = agent

        for capability in agent.capabilities:
            agent_ids = self.capabilities_map.setdefault(capability, [])
            if agent_id not in agent_ids:
                agent_ids.append(agent_id)

    def _forget_stale_capabilities(self, agent_id: str, keep: List[str]) -> None:
        """Remove ``agent_id`` from every capability not listed in ``keep``."""
        # Iterate over a snapshot of the keys because empty entries are deleted.
        for capability in list(self.capabilities_map):
            if capability in keep:
                continue
            agent_ids = self.capabilities_map[capability]
            if agent_id not in agent_ids:
                continue
            remaining = [other for other in agent_ids if other != agent_id]
            if remaining:
                self.capabilities_map[capability] = remaining
            else:
                del self.capabilities_map[capability]

    def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
        """Return the agent registered under ``agent_id``, or None if unknown."""
        return self.agents.get(agent_id)

    def find_agents_by_capability(self, capability: str) -> List[BaseAgent]:
        """Return the active agents registered with ``capability``.

        Agents whose ``is_active`` flag is false are skipped. Ids that are
        no longer present in ``agents`` are skipped as well instead of
        raising ``KeyError``.
        """
        matches = []
        for agent_id in self.capabilities_map.get(capability, []):
            agent = self.agents.get(agent_id)
            if agent is not None and agent.is_active:
                matches.append(agent)
        return matches

    def get_all_agents(self) -> List[BaseAgent]:
        """Return a list of all registered agents."""
        return list(self.agents.values())