Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Core agent base classes and protocols for the multi-agent system. | |
| """ | |
| from abc import ABC, abstractmethod | |
| from typing import Any, Dict, List, Optional, Union | |
| from datetime import datetime | |
| from enum import Enum | |
| from pydantic import BaseModel, Field | |
| from loguru import logger | |
| class AgentRole(str, Enum): | |
| """Enumeration of agent roles in the system.""" | |
| SCRAPER = "scraper" | |
| PARSER = "parser" | |
| CLASSIFIER = "classifier" | |
| SENTIMENT_ANALYZER = "sentiment_analyzer" | |
| DEBATE_GRADER = "debate_grader" | |
| ADVOCACY_WRITER = "advocacy_writer" | |
| ORCHESTRATOR = "orchestrator" | |
| class MessageType(str, Enum): | |
| """Types of messages exchanged between agents.""" | |
| DATA = "data" | |
| COMMAND = "command" | |
| QUERY = "query" | |
| RESPONSE = "response" | |
| ERROR = "error" | |
| STATUS = "status" | |
| class AgentMessage(BaseModel): | |
| """Message structure for inter-agent communication.""" | |
| message_id: str = Field(..., description="Unique message identifier") | |
| sender: AgentRole = Field(..., description="Sending agent role") | |
| recipient: AgentRole = Field(..., description="Receiving agent role") | |
| message_type: MessageType = Field(..., description="Type of message") | |
| timestamp: datetime = Field(default_factory=datetime.utcnow) | |
| payload: Dict[str, Any] = Field(default_factory=dict, description="Message payload") | |
| metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") | |
| class Config: | |
| json_encoders = { | |
| datetime: lambda v: v.isoformat() | |
| } | |
| class AgentStatus(str, Enum): | |
| """Agent operational status.""" | |
| IDLE = "idle" | |
| PROCESSING = "processing" | |
| WAITING = "waiting" | |
| ERROR = "error" | |
| COMPLETED = "completed" | |
| class AgentState(BaseModel): | |
| """Current state of an agent.""" | |
| agent_id: str | |
| role: AgentRole | |
| status: AgentStatus = AgentStatus.IDLE | |
| current_task: Optional[str] = None | |
| tasks_completed: int = 0 | |
| tasks_failed: int = 0 | |
| last_activity: datetime = Field(default_factory=datetime.utcnow) | |
| error_message: Optional[str] = None | |
| class BaseAgent(ABC): | |
| """ | |
| Abstract base class for all agents in the system. | |
| Each agent must implement the process method to handle incoming messages | |
| and perform its specific role in the pipeline. | |
| """ | |
| def __init__(self, agent_id: str, role: AgentRole): | |
| """ | |
| Initialize the base agent. | |
| Args: | |
| agent_id: Unique identifier for this agent instance | |
| role: The role this agent plays in the system | |
| """ | |
| self.agent_id = agent_id | |
| self.role = role | |
| self.state = AgentState(agent_id=agent_id, role=role) | |
| self.message_queue: List[AgentMessage] = [] | |
| logger.info(f"Initialized {role.value} agent: {agent_id}") | |
| async def process(self, message: AgentMessage) -> Union[AgentMessage, List[AgentMessage]]: | |
| """ | |
| Process an incoming message and return response(s). | |
| Args: | |
| message: The message to process | |
| Returns: | |
| One or more response messages | |
| """ | |
| pass | |
| def update_status(self, status: AgentStatus, task: Optional[str] = None): | |
| """Update the agent's current status.""" | |
| self.state.status = status | |
| self.state.current_task = task | |
| self.state.last_activity = datetime.utcnow() | |
| logger.debug(f"{self.role.value} agent {self.agent_id} status: {status.value}") | |
| def log_success(self): | |
| """Log a successful task completion.""" | |
| self.state.tasks_completed += 1 | |
| self.update_status(AgentStatus.IDLE) | |
| def log_failure(self, error: str): | |
| """Log a task failure.""" | |
| self.state.tasks_failed += 1 | |
| self.state.error_message = error | |
| self.update_status(AgentStatus.ERROR) | |
| logger.error(f"{self.role.value} agent {self.agent_id} error: {error}") | |
| async def send_message( | |
| self, | |
| recipient: AgentRole, | |
| message_type: MessageType, | |
| payload: Dict[str, Any], | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> AgentMessage: | |
| """ | |
| Create and send a message to another agent. | |
| Args: | |
| recipient: The receiving agent's role | |
| message_type: Type of message to send | |
| payload: Message content | |
| metadata: Optional metadata | |
| Returns: | |
| The created message | |
| """ | |
| import uuid | |
| message = AgentMessage( | |
| message_id=str(uuid.uuid4()), | |
| sender=self.role, | |
| recipient=recipient, | |
| message_type=message_type, | |
| payload=payload, | |
| metadata=metadata or {} | |
| ) | |
| return message | |
| def get_state(self) -> AgentState: | |
| """Get the current state of the agent.""" | |
| return self.state | |
| class AgentMetrics(BaseModel): | |
| """Metrics for monitoring agent performance.""" | |
| agent_id: str | |
| role: AgentRole | |
| total_messages_processed: int = 0 | |
| total_processing_time_seconds: float = 0.0 | |
| average_processing_time_seconds: float = 0.0 | |
| success_rate: float = 0.0 | |
| error_count: int = 0 | |
| last_error: Optional[str] = None | |
| uptime_seconds: float = 0.0 | |