File size: 5,334 Bytes
61d29fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
Core agent base classes and protocols for the multi-agent system.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field
from loguru import logger


class AgentRole(str, Enum):
    """Enumeration of agent roles in the system."""
    SCRAPER = "scraper"
    PARSER = "parser"
    CLASSIFIER = "classifier"
    SENTIMENT_ANALYZER = "sentiment_analyzer"
    DEBATE_GRADER = "debate_grader"
    ADVOCACY_WRITER = "advocacy_writer"
    ORCHESTRATOR = "orchestrator"


class MessageType(str, Enum):
    """Types of messages exchanged between agents."""
    DATA = "data"
    COMMAND = "command"
    QUERY = "query"
    RESPONSE = "response"
    ERROR = "error"
    STATUS = "status"


class AgentMessage(BaseModel):
    """Message structure for inter-agent communication."""
    message_id: str = Field(..., description="Unique message identifier")
    sender: AgentRole = Field(..., description="Sending agent role")
    recipient: AgentRole = Field(..., description="Receiving agent role")
    message_type: MessageType = Field(..., description="Type of message")
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    payload: Dict[str, Any] = Field(default_factory=dict, description="Message payload")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
    
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }


class AgentStatus(str, Enum):
    """Agent operational status."""
    IDLE = "idle"
    PROCESSING = "processing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"


class AgentState(BaseModel):
    """Current state of an agent."""
    agent_id: str
    role: AgentRole
    status: AgentStatus = AgentStatus.IDLE
    current_task: Optional[str] = None
    tasks_completed: int = 0
    tasks_failed: int = 0
    last_activity: datetime = Field(default_factory=datetime.utcnow)
    error_message: Optional[str] = None


class BaseAgent(ABC):
    """
    Abstract base class for all agents in the system.
    
    Each agent must implement the process method to handle incoming messages
    and perform its specific role in the pipeline.
    """
    
    def __init__(self, agent_id: str, role: AgentRole):
        """
        Initialize the base agent.
        
        Args:
            agent_id: Unique identifier for this agent instance
            role: The role this agent plays in the system
        """
        self.agent_id = agent_id
        self.role = role
        self.state = AgentState(agent_id=agent_id, role=role)
        self.message_queue: List[AgentMessage] = []
        logger.info(f"Initialized {role.value} agent: {agent_id}")
    
    @abstractmethod
    async def process(self, message: AgentMessage) -> Union[AgentMessage, List[AgentMessage]]:
        """
        Process an incoming message and return response(s).
        
        Args:
            message: The message to process
            
        Returns:
            One or more response messages
        """
        pass
    
    def update_status(self, status: AgentStatus, task: Optional[str] = None):
        """Update the agent's current status."""
        self.state.status = status
        self.state.current_task = task
        self.state.last_activity = datetime.utcnow()
        logger.debug(f"{self.role.value} agent {self.agent_id} status: {status.value}")
    
    def log_success(self):
        """Log a successful task completion."""
        self.state.tasks_completed += 1
        self.update_status(AgentStatus.IDLE)
    
    def log_failure(self, error: str):
        """Log a task failure."""
        self.state.tasks_failed += 1
        self.state.error_message = error
        self.update_status(AgentStatus.ERROR)
        logger.error(f"{self.role.value} agent {self.agent_id} error: {error}")
    
    async def send_message(
        self,
        recipient: AgentRole,
        message_type: MessageType,
        payload: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None
    ) -> AgentMessage:
        """
        Create and send a message to another agent.
        
        Args:
            recipient: The receiving agent's role
            message_type: Type of message to send
            payload: Message content
            metadata: Optional metadata
            
        Returns:
            The created message
        """
        import uuid
        
        message = AgentMessage(
            message_id=str(uuid.uuid4()),
            sender=self.role,
            recipient=recipient,
            message_type=message_type,
            payload=payload,
            metadata=metadata or {}
        )
        
        return message
    
    def get_state(self) -> AgentState:
        """Get the current state of the agent."""
        return self.state


class AgentMetrics(BaseModel):
    """Metrics for monitoring agent performance."""
    agent_id: str
    role: AgentRole
    total_messages_processed: int = 0
    total_processing_time_seconds: float = 0.0
    average_processing_time_seconds: float = 0.0
    success_rate: float = 0.0
    error_count: int = 0
    last_error: Optional[str] = None
    uptime_seconds: float = 0.0