# jcbowyer's picture
# Clean HuggingFace deployment without binary files
# 61d29fc
"""
Core agent base classes and protocols for the multi-agent system.
"""
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from loguru import logger
from pydantic import BaseModel, Field
class AgentRole(str, Enum):
    """Roles an agent can take in the multi-agent system.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    SCRAPER = "scraper"
    PARSER = "parser"
    CLASSIFIER = "classifier"
    SENTIMENT_ANALYZER = "sentiment_analyzer"
    DEBATE_GRADER = "debate_grader"
    ADVOCACY_WRITER = "advocacy_writer"
    ORCHESTRATOR = "orchestrator"
class MessageType(str, Enum):
    """Kinds of messages agents exchange with one another.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    DATA = "data"
    COMMAND = "command"
    QUERY = "query"
    RESPONSE = "response"
    ERROR = "error"
    STATUS = "status"
class AgentMessage(BaseModel):
    """Message structure for inter-agent communication.

    ``message_id``, ``sender``, ``recipient`` and ``message_type`` are
    required. ``timestamp``, ``payload`` and ``metadata`` are filled in with
    a fresh default for each instance when omitted.
    """

    message_id: str = Field(..., description="Unique message identifier")
    sender: AgentRole = Field(..., description="Sending agent role")
    recipient: AgentRole = Field(..., description="Receiving agent role")
    message_type: MessageType = Field(..., description="Type of message")
    # Naive UTC creation time. Derived from an aware "now" with tzinfo
    # stripped so the stored value matches what datetime.utcnow() produced,
    # without calling that function (deprecated since Python 3.12).
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    payload: Dict[str, Any] = Field(default_factory=dict, description="Message payload")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    class Config:
        # Encode datetime values as ISO-8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class AgentStatus(str, Enum):
    """Operational states an agent can report.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    IDLE = "idle"
    PROCESSING = "processing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"
class AgentState(BaseModel):
    """Current state of an agent.

    Only ``agent_id`` and ``role`` are required; every other field starts
    from the default shown below.
    """

    agent_id: str
    role: AgentRole
    status: AgentStatus = AgentStatus.IDLE
    # Label of the task in progress, or None when no task is set.
    current_task: Optional[str] = None
    tasks_completed: int = 0
    tasks_failed: int = 0
    # Naive UTC time of the last recorded activity. Derived from an aware
    # "now" with tzinfo stripped so the value matches what datetime.utcnow()
    # produced, without calling that function (deprecated since Python 3.12).
    last_activity: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    # Text of a recorded error, or None if none has been recorded.
    error_message: Optional[str] = None
class BaseAgent(ABC):
    """
    Abstract base class for all agents in the system.

    Each agent must implement the process method to handle incoming messages
    and perform its specific role in the pipeline. The base class keeps an
    AgentState for the instance and offers helpers to update it and to build
    outgoing messages.
    """

    def __init__(self, agent_id: str, role: AgentRole):
        """
        Initialize the base agent.

        Args:
            agent_id: Unique identifier for this agent instance
            role: The role this agent plays in the system
        """
        self.agent_id = agent_id
        self.role = role
        self.state = AgentState(agent_id=agent_id, role=role)
        # NOTE(review): nothing in this class reads or writes the queue;
        # presumably subclasses or an orchestrator use it -- confirm.
        self.message_queue: List[AgentMessage] = []
        logger.info(f"Initialized {role.value} agent: {agent_id}")

    @abstractmethod
    async def process(self, message: AgentMessage) -> Union[AgentMessage, List[AgentMessage]]:
        """
        Process an incoming message and return response(s).

        Args:
            message: The message to process

        Returns:
            One or more response messages
        """

    def update_status(self, status: AgentStatus, task: Optional[str] = None) -> None:
        """
        Update the agent's current status and activity timestamp.

        Args:
            status: The new status to record
            task: Label of the current task; omitting it resets
                ``current_task`` to None
        """
        self.state.status = status
        self.state.current_task = task
        # Naive UTC, matching the AgentState default; avoids
        # datetime.utcnow(), which is deprecated since Python 3.12.
        self.state.last_activity = datetime.now(timezone.utc).replace(tzinfo=None)
        logger.debug(f"{self.role.value} agent {self.agent_id} status: {status.value}")

    def log_success(self) -> None:
        """Record a successful task completion and return to IDLE."""
        self.state.tasks_completed += 1
        # NOTE(review): error_message from an earlier failure is left in
        # place here -- confirm whether it should be cleared on success.
        self.update_status(AgentStatus.IDLE)

    def log_failure(self, error: str) -> None:
        """
        Record a task failure and switch to the ERROR status.

        Args:
            error: Description of the failure; stored on the state and logged
        """
        self.state.tasks_failed += 1
        self.state.error_message = error
        self.update_status(AgentStatus.ERROR)
        logger.error(f"{self.role.value} agent {self.agent_id} error: {error}")

    async def send_message(
        self,
        recipient: AgentRole,
        message_type: MessageType,
        payload: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None
    ) -> AgentMessage:
        """
        Build a message from this agent addressed to another agent.

        The message is only constructed and returned; this method does not
        enqueue or deliver it, so the caller is responsible for dispatch.

        Args:
            recipient: The receiving agent's role
            message_type: Type of message to send
            payload: Message content
            metadata: Optional metadata; an empty dict is used when omitted

        Returns:
            The created message, with a freshly generated UUID4 message_id
        """
        import uuid

        return AgentMessage(
            message_id=str(uuid.uuid4()),
            sender=self.role,
            recipient=recipient,
            message_type=message_type,
            payload=payload,
            metadata=metadata or {}
        )

    def get_state(self) -> AgentState:
        """Get the current state of the agent."""
        return self.state
class AgentMetrics(BaseModel):
    """Performance-monitoring record for a single agent.

    ``agent_id`` and ``role`` are required; all counters and timings start
    at zero and ``last_error`` starts as None.
    """

    # Identity of the agent these metrics belong to.
    agent_id: str
    role: AgentRole

    # Message count and processing-time figures, in seconds.
    total_messages_processed: int = 0
    total_processing_time_seconds: float = 0.0
    average_processing_time_seconds: float = 0.0

    # Outcome tracking.
    success_rate: float = 0.0
    error_count: int = 0
    last_error: Optional[str] = None

    uptime_seconds: float = 0.0