# multi-ai-agentic-system/core/base_agent.py
# CHKIM79: Deploy Multi-AI Agentic System v1.0.0 - Production Ready (d4b6ffc)
"""
Base Agent Interface for Multi-AI Agentic System
"""
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
class TaskStatus(Enum):
    """Closed set of states a task can be reported in.

    Each member's value is the lowercase string form of its name, so a
    member can be recovered from that string with ``TaskStatus("pending")``.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class AgentMessage:
    """Envelope used when one agent sends work or data to another.

    Fields are accepted positionally in declaration order; ``priority`` is
    the only field with a default.
    """

    task_id: str
    sender: str
    recipient: str
    message_type: str
    data: Dict[str, Any]
    timestamp: float
    # Lower numbers are more urgent: 1 = high, 5 = low.
    priority: int = 1
@dataclass
class TaskResult:
    """Standard result format returned from agent execution.

    Attributes are accepted positionally in declaration order;
    ``error_message`` and ``execution_time`` are optional.
    """

    task_id: str
    agent_id: str
    status: "TaskStatus"
    result: Dict[str, Any]
    # None by default; the previous ``str = None`` annotation hid the fact
    # that consumers must handle a missing message.
    error_message: Optional[str] = None
    # NOTE(review): unit is not established in this file (presumably
    # seconds) - confirm with the code that fills it in.
    execution_time: float = 0.0
class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Concrete agents must implement :meth:`process_task` and
    :meth:`get_agent_info`. The base class supplies a priority-ordered
    message queue, a capability check and a small key/value context store.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        """Initialise the agent.

        Args:
            agent_id: Identifier stored on the agent as ``agent_id``.
            capabilities: Task types this agent can handle.
        """
        self.agent_id = agent_id
        # Copy the list so later changes to the caller's object cannot
        # silently alter what this agent advertises.
        self.capabilities: List[str] = list(capabilities)
        self.is_active = True
        self.message_queue: List["AgentMessage"] = []
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process_task(self, message: "AgentMessage") -> "TaskResult":
        """Process a task and return results."""

    @abstractmethod
    def get_agent_info(self) -> Dict[str, Any]:
        """Return agent metadata and capabilities."""

    def can_handle_task(self, task_type: str) -> bool:
        """Return True if ``task_type`` is one of this agent's capabilities."""
        return task_type in self.capabilities

    def add_message(self, message: "AgentMessage") -> None:
        """Add a message to the queue, keeping the queue ordered by priority.

        A lower ``priority`` number means higher priority. ``list.sort`` is
        stable, so messages of equal priority stay in arrival order.
        """
        self.message_queue.append(message)
        self.message_queue.sort(key=lambda queued: queued.priority)

    def get_next_message(self) -> Optional["AgentMessage"]:
        """Remove and return the highest-priority message.

        Returns:
            The message at the front of the queue, or ``None`` when the
            queue is empty.
        """
        if not self.message_queue:
            return None
        return self.message_queue.pop(0)

    def update_context(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the agent's context."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Return the context value for ``key``, or ``None`` if it is absent."""
        return self.context.get(key)
class AgentRegistry:
    """Registry to manage all agents in the system.

    ``agents`` maps agent id to agent object; ``capabilities_map`` maps a
    capability name to the ids of the agents that registered it.
    """

    def __init__(self):
        self.agents: Dict[str, "BaseAgent"] = {}
        self.capabilities_map: Dict[str, List[str]] = {}

    def register_agent(self, agent: "BaseAgent") -> None:
        """Register an agent, replacing any agent already using its id.

        Registering the same id again first removes that id's previous
        capability entries, so the map never holds duplicate ids or entries
        for capabilities the current agent does not list.
        """
        agent_id = agent.agent_id
        if agent_id in self.agents:
            self._drop_capability_entries(agent_id)
        self.agents[agent_id] = agent

        for capability in agent.capabilities:
            registered_ids = self.capabilities_map.setdefault(capability, [])
            # Guards against a capability listed twice by the same agent.
            if agent_id not in registered_ids:
                registered_ids.append(agent_id)

    def _drop_capability_entries(self, agent_id: str) -> None:
        """Remove ``agent_id`` from every capability entry, dropping empty ones."""
        # Iterate over a snapshot of the keys because entries may be deleted.
        for capability in list(self.capabilities_map):
            remaining = [
                other_id
                for other_id in self.capabilities_map[capability]
                if other_id != agent_id
            ]
            if remaining:
                self.capabilities_map[capability] = remaining
            else:
                del self.capabilities_map[capability]

    def get_agent(self, agent_id: str) -> Optional["BaseAgent"]:
        """Return the agent registered under ``agent_id``, or ``None``."""
        return self.agents.get(agent_id)

    def find_agents_by_capability(self, capability: str) -> List["BaseAgent"]:
        """Return the active agents registered for ``capability``.

        Agents whose ``is_active`` flag is false are skipped, as are ids
        that are no longer present in ``agents``.
        """
        candidates = (
            self.agents.get(agent_id)
            for agent_id in self.capabilities_map.get(capability, [])
        )
        return [agent for agent in candidates if agent is not None and agent.is_active]

    def get_all_agents(self) -> List["BaseAgent"]:
        """Return every registered agent, active or not."""
        return list(self.agents.values())