| """ |
| Base Agent Interface for Multi-AI Agentic System |
| """ |
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
|
|
class TaskStatus(Enum):
    """Status values a task can carry.

    Every member's value is the lowercase string spelling of its name, so a
    member can be recovered from its string form with ``TaskStatus("failed")``.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
|
|
@dataclass
class AgentMessage:
    """Standard envelope for a message exchanged between agents.

    Attributes:
        task_id: Identifier of the task this message refers to.
        sender: Name of the sending party (presumably an agent id -- confirm
            against callers).
        recipient: Name of the receiving party (presumably an agent id --
            confirm against callers).
        message_type: Free-form string tag describing the kind of message.
        data: Arbitrary payload keyed by string.
        timestamp: Time value supplied by the creator of the message; no
            default is provided.
        priority: Integer priority, ``1`` unless overridden.
    """

    # Required fields (no defaults) -- positional order is part of the API.
    task_id: str
    sender: str
    recipient: str
    message_type: str
    data: Dict[str, Any]
    timestamp: float

    # Optional fields.
    priority: int = 1
|
@dataclass
class TaskResult:
    """Standard result format returned from agent execution.

    Attributes:
        task_id: Identifier of the task that produced this result.
        agent_id: Identifier of the agent that produced this result.
        status: Status of the task, as a ``TaskStatus`` member.
        result: Arbitrary result payload keyed by string.
        error_message: Optional error description; ``None`` (the default)
            when no error message was recorded.
        execution_time: Duration of the execution, ``0.0`` by default
            (presumably seconds -- confirm against the code that fills it in).
    """

    task_id: str
    agent_id: str
    status: TaskStatus
    result: Dict[str, Any]
    # Optional[...] because the default is None, not a str.
    error_message: Optional[str] = None
    execution_time: float = 0.0
|
|
class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Concrete agents must implement :meth:`process_task` (a coroutine) and
    :meth:`get_agent_info`. The base class supplies a priority-ordered
    message queue, a capability check and a simple key/value context store.

    Attributes:
        agent_id: Identifier given at construction time.
        capabilities: Task types this agent declares it can handle.
        is_active: Flag initialised to ``True``.
        message_queue: Pending messages, kept sorted by ascending
            ``priority``.
        context: Free-form key/value store owned by the agent.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        """Create an agent.

        Args:
            agent_id: Identifier for this agent.
            capabilities: Task types the agent can handle. The list is
                copied, so later changes to the caller's list do not alter
                the agent.
        """
        self.agent_id = agent_id
        # Defensive copy: do not alias a list the caller may keep mutating.
        self.capabilities: List[str] = list(capabilities)
        self.is_active = True
        self.message_queue: List[AgentMessage] = []
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process_task(self, message: AgentMessage) -> TaskResult:
        """Process a task and return results.

        Args:
            message: The message describing the task to process.

        Returns:
            The result of processing the task.
        """

    @abstractmethod
    def get_agent_info(self) -> Dict[str, Any]:
        """Return agent metadata and capabilities."""

    def can_handle_task(self, task_type: str) -> bool:
        """Return ``True`` if ``task_type`` is one of this agent's capabilities."""
        return task_type in self.capabilities

    def add_message(self, message: AgentMessage) -> None:
        """Add a message to the agent's queue, keeping it ordered by priority.

        The queue is sorted by ascending ``priority``, so messages with a
        lower priority number are returned first by :meth:`get_next_message`.
        """
        self.message_queue.append(message)
        # list.sort is stable, so messages with equal priority keep their
        # arrival order.
        self.message_queue.sort(key=lambda queued: queued.priority)

    def get_next_message(self) -> Optional[AgentMessage]:
        """Remove and return the message at the front of the queue.

        Returns:
            The queued message with the lowest priority number, or ``None``
            when the queue is empty.
        """
        if not self.message_queue:
            return None
        return self.message_queue.pop(0)

    def update_context(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the agent's context."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Return the context value stored under ``key``, or ``None`` if absent."""
        return self.context.get(key)
|
|
class AgentRegistry:
    """Registry to manage all agents in the system.

    Attributes:
        agents: Registered agents keyed by ``agent_id``.
        capabilities_map: For each capability, the ids of the agents that
            declared it when they were registered, in registration order and
            without duplicates.
    """

    def __init__(self):
        """Create an empty registry."""
        self.agents: Dict[str, BaseAgent] = {}
        self.capabilities_map: Dict[str, List[str]] = {}

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent and index it under each of its capabilities.

        Registering an ``agent_id`` that is already present replaces the
        stored agent. In that case index entries for capabilities the new
        agent does not declare are dropped, and the id is never listed twice
        under the same capability.

        Args:
            agent: The agent to register; its ``agent_id`` and
                ``capabilities`` attributes are read.
        """
        agent_id = agent.agent_id
        offered = agent.capabilities

        if agent_id in self.agents:
            # Re-registration: purge stale entries so lookups do not return
            # this id for capabilities it no longer declares.
            for capability, ids in self.capabilities_map.items():
                if capability not in offered:
                    ids[:] = [known for known in ids if known != agent_id]

        self.agents[agent_id] = agent

        for capability in offered:
            ids = self.capabilities_map.setdefault(capability, [])
            # Guard against duplicates from re-registration or from a
            # capability listed more than once on the agent.
            if agent_id not in ids:
                ids.append(agent_id)

    def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
        """Return the agent registered under ``agent_id``, or ``None``."""
        return self.agents.get(agent_id)

    def find_agents_by_capability(self, capability: str) -> List[BaseAgent]:
        """Find all active agents indexed under a capability.

        Args:
            capability: The capability to look up.

        Returns:
            Agents indexed under ``capability`` whose ``is_active`` flag is
            truthy, in registration order; an empty list if none match.
        """
        indexed_ids = self.capabilities_map.get(capability, ())
        # .get() tolerates ids that were removed from self.agents directly,
        # instead of raising KeyError.
        candidates = (self.agents.get(agent_id) for agent_id in indexed_ids)
        return [
            agent
            for agent in candidates
            if agent is not None and agent.is_active
        ]

    def get_all_agents(self) -> List[BaseAgent]:
        """Return all registered agents, in registration order."""
        return list(self.agents.values())
|
|