File size: 3,434 Bytes
d4b6ffc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
Base Agent Interface for Multi-AI Agentic System
"""
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

class TaskStatus(Enum):
    """Closed set of status values used for the ``status`` field of TaskResult.

    Each member's value is the lowercase string form of its name.
    """
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class AgentMessage:
    """Standard message format for inter-agent communication.

    Every field except ``priority`` is required by the generated
    ``__init__``, in the order declared below.
    """
    # Identifier of the task this message belongs to.
    task_id: str
    # presumably the agent_id of the sending agent -- confirm against callers
    sender: str
    # presumably the agent_id of the receiving agent -- confirm against callers
    recipient: str
    # Free-form message kind; NOTE(review): may correspond to the task types
    # checked by BaseAgent.can_handle_task -- confirm against callers
    message_type: str
    # Arbitrary payload keyed by string.
    data: Dict[str, Any]
    # Supplied by the caller; presumably time.time() epoch seconds -- TODO confirm
    timestamp: float
    # BaseAgent.add_message orders its queue ascending on this value, so a
    # lower number is delivered first.
    priority: int = 1  # 1=high, 5=low

@dataclass
class TaskResult:
    """Standard result format from agent execution.

    ``task_id``, ``agent_id``, ``status`` and ``result`` are required by the
    generated ``__init__``; the remaining fields have defaults.
    """
    # Identifier of the task this result belongs to.
    task_id: str
    # Identifier of the agent that produced the result.
    agent_id: str
    # Outcome of the task as a TaskStatus member.
    status: TaskStatus
    # Arbitrary result payload keyed by string.
    result: Dict[str, Any]
    # Defaults to None, so the annotation is Optional rather than plain str.
    # presumably only set when the task failed -- confirm against callers
    error_message: Optional[str] = None
    # presumably wall-clock seconds spent on the task -- TODO confirm unit
    execution_time: float = 0.0

class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Subclasses must implement ``process_task`` and ``get_agent_info``. The
    base class itself provides a priority-ordered message queue and a simple
    key/value context store.
    """

    def __init__(self, agent_id: str, capabilities: List[str]) -> None:
        """Store the agent's identity and create its empty runtime state.

        Args:
            agent_id: Identifier stored on the agent as ``self.agent_id``.
            capabilities: Task types this agent accepts, consulted by
                ``can_handle_task``. The list is stored by reference, not
                copied.
        """
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.is_active = True
        self.message_queue: List[AgentMessage] = []
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process_task(self, message: AgentMessage) -> TaskResult:
        """Process a task and return results.

        Args:
            message: The message describing the task to run.

        Returns:
            A TaskResult produced by the concrete agent.
        """

    @abstractmethod
    def get_agent_info(self) -> Dict[str, Any]:
        """Return agent metadata and capabilities as a dictionary."""

    def can_handle_task(self, task_type: str) -> bool:
        """Return True when ``task_type`` is one of this agent's capabilities."""
        return task_type in self.capabilities

    def add_message(self, message: AgentMessage) -> None:
        """Add a message to the queue, keeping the queue ordered by priority.

        Args:
            message: The message to enqueue.
        """
        self.message_queue.append(message)
        # Lower number = higher priority. list.sort is stable, so messages
        # with equal priority stay in the order they were added.
        self.message_queue.sort(key=lambda queued: queued.priority)

    def get_next_message(self) -> Optional[AgentMessage]:
        """Remove and return the message at the front of the queue.

        Returns:
            The queued message with the lowest priority number, or None when
            the queue is empty.
        """
        if not self.message_queue:
            return None
        return self.message_queue.pop(0)

    def update_context(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the agent's context, overwriting any previous value."""
        self.context[key] = value

    def get_context(self, key: str) -> Any:
        """Return the context value stored under ``key``, or None if the key is absent."""
        return self.context.get(key)

class AgentRegistry:
    """Registry to manage all agents in the system.

    Two indexes are maintained:

    * ``agents`` maps an agent_id to the registered agent.
    * ``capabilities_map`` maps a capability to the list of agent_ids that
      declared it, in registration order and without duplicates.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self.agents: Dict[str, BaseAgent] = {}
        self.capabilities_map: Dict[str, List[str]] = {}

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent, replacing any agent already using the same id.

        Registering the same id again is safe: the id is listed at most once
        per capability, and capabilities the replaced agent declared but the
        new one does not are removed from the index.

        Args:
            agent: The agent to register; its ``agent_id`` and
                ``capabilities`` attributes are read.
        """
        agent_id = agent.agent_id
        if agent_id in self.agents:
            # Without this, a replaced agent would still be returned for
            # capabilities that only its predecessor declared.
            self._drop_stale_capabilities(agent_id, agent.capabilities)
        self.agents[agent_id] = agent

        for capability in agent.capabilities:
            holders = self.capabilities_map.setdefault(capability, [])
            # Guard against duplicate ids (re-registration, or a capability
            # listed twice) so lookups never return the same agent twice.
            if agent_id not in holders:
                holders.append(agent_id)

    def _drop_stale_capabilities(self, agent_id: str, current: List[str]) -> None:
        """Remove ``agent_id`` from every capability not listed in ``current``."""
        # Iterate over a snapshot of the keys because empty entries are deleted.
        for capability in list(self.capabilities_map):
            if capability in current:
                continue
            holders = self.capabilities_map[capability]
            if agent_id not in holders:
                continue
            remaining = [holder for holder in holders if holder != agent_id]
            if remaining:
                self.capabilities_map[capability] = remaining
            else:
                del self.capabilities_map[capability]

    def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
        """Return the agent registered under ``agent_id``, or None if there is none."""
        return self.agents.get(agent_id)

    def find_agents_by_capability(self, capability: str) -> List[BaseAgent]:
        """Find all active agents that declared a specific capability.

        Args:
            capability: The capability to look up.

        Returns:
            The agents registered for ``capability`` whose ``is_active`` flag
            is truthy, in registration order; an empty list when the
            capability is unknown.
        """
        registered = (
            self.agents[agent_id]
            for agent_id in self.capabilities_map.get(capability, [])
        )
        return [agent for agent in registered if agent.is_active]

    def get_all_agents(self) -> List[BaseAgent]:
        """Return a new list containing every registered agent, active or not."""
        return list(self.agents.values())