Spaces:
Build error
Build error
"""
Base Agent class providing common functionality for all agent implementations.
"""
import asyncio
import logging
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Any, List, Optional
from uuid import uuid4

from src.core.entities.agent import Agent, AgentType, AgentState
from src.shared.types import AgentConfig
class BaseAgent(ABC):
    """
    Base class for all agent implementations.

    Provides identity (id/name), a per-agent logger, request counters with a
    smoothed response time, and lifecycle flags. The lifecycle hooks
    (``initialize``, ``execute``, ``shutdown``, ``health_check``) are no-ops
    here that return ``None``; subclasses are expected to override them.
    """

    # Weight given to the newest sample in the exponential moving average of
    # response times (0 < value <= 1). Subclasses may override it.
    RESPONSE_TIME_SMOOTHING = 0.1

    def __init__(self, agent_id: Optional[str] = None, name: str = "Base Agent"):
        """
        Create an agent with zeroed counters and cleared lifecycle flags.

        Args:
            agent_id: Identifier to use; a random UUID4 string is generated
                when this is None or empty.
            name: Human-readable name, also used in the logger name.
        """
        self.agent_id = agent_id or str(uuid4())
        self.name = name
        self.config: Optional[AgentConfig] = None
        self.logger = logging.getLogger(f"{self.__class__.__name__}-{name}")

        # Naive local timestamps (datetime.now() without a timezone).
        self.created_at = datetime.now()
        self.last_active = datetime.now()

        # Performance tracking
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.average_response_time = 0.0

        # State management
        self._is_initialized = False
        self._is_shutdown = False

    async def initialize(self, config: Dict[str, Any]) -> bool:
        """
        Initialize the agent with configuration.

        The base implementation does nothing and returns None.

        Args:
            config: Configuration dictionary for the agent

        Returns:
            True if initialization was successful, False otherwise
        """

    async def execute(self, task: Any) -> Any:
        """
        Execute a task.

        The base implementation does nothing and returns None.

        Args:
            task: The task to execute

        Returns:
            The result of task execution
        """

    async def shutdown(self) -> bool:
        """
        Gracefully shutdown the agent.

        The base implementation does nothing and returns None.

        Returns:
            True if shutdown was successful, False otherwise
        """

    async def health_check(self) -> Dict[str, Any]:
        """
        Perform a health check on the agent.

        The base implementation does nothing and returns None.

        Returns:
            Dictionary containing health information
        """

    def update_activity(self) -> None:
        """Update the last activity timestamp to the current local time."""
        self.last_active = datetime.now()

    def record_request(self, success: bool, response_time: float) -> None:
        """
        Record a request and its performance metrics.

        Increments the total and the matching success/failure counter, folds
        ``response_time`` into ``average_response_time`` and refreshes
        ``last_active``.

        Args:
            success: Whether the request was successful
            response_time: Time taken to process the request
        """
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1

        if self.total_requests == 1:
            # The first sample seeds the average directly.
            self.average_response_time = response_time
        else:
            # Exponential moving average: recent samples weigh more.
            alpha = self.RESPONSE_TIME_SMOOTHING
            self.average_response_time = (
                alpha * response_time
                + (1 - alpha) * self.average_response_time
            )

        self.update_activity()

    def success_rate(self) -> float:
        """Return successful/total requests, or 0.0 before any request."""
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests

    def is_available(self) -> bool:
        """Return True when the agent is initialized and not shut down."""
        return self._is_initialized and not self._is_shutdown

    def uptime(self) -> float:
        """Return the seconds elapsed since the agent object was created."""
        return (datetime.now() - self.created_at).total_seconds()

    def get_performance_metrics(self) -> Dict[str, Any]:
        """
        Get a snapshot of the agent's performance metrics.

        Returns:
            Dictionary with the agent identity, request counters, success
            rate, smoothed response time, uptime in seconds, the ISO-8601
            ``last_active`` timestamp and the lifecycle flags. All values
            are plain data (str, int, float, bool).
        """
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            # These three are methods: call them so the snapshot carries
            # values rather than bound-method objects.
            "success_rate": self.success_rate(),
            "average_response_time": self.average_response_time,
            "uptime": self.uptime(),
            "last_active": self.last_active.isoformat(),
            "is_available": self.is_available(),
            "is_initialized": self._is_initialized,
            "is_shutdown": self._is_shutdown,
        }

    async def validate_configuration(self, config: Dict[str, Any]) -> bool:
        """
        Validate agent configuration.

        Basic validation only; subclasses can override for stricter checks.
        Logs an error for the first missing field.

        Args:
            config: Configuration to validate

        Returns:
            True if every required field is present, False if one is missing
            or the membership check itself raises (e.g. ``config`` is None).
        """
        required_fields = ("agent_type", "model_config")
        try:
            for field in required_fields:
                if field not in config:
                    self.logger.error(
                        "Missing required configuration field: %s", field
                    )
                    return False
        except Exception as exc:
            # Deliberately broad: an unusable config object is reported as
            # invalid instead of propagating.
            self.logger.error("Configuration validation failed: %s", exc)
            return False
        return True

    def _mark_initialized(self) -> None:
        """Mark the agent as initialized."""
        self._is_initialized = True
        self.logger.info("Agent %s marked as initialized", self.name)

    def _mark_shutdown(self) -> None:
        """Mark the agent as shutdown."""
        self._is_shutdown = True
        self.logger.info("Agent %s marked as shutdown", self.name)

    async def _safe_execute(self, task: Any, timeout: Optional[float] = None) -> Any:
        """
        Execute a task with timing, metrics recording and an optional timeout.

        Every outcome is recorded through ``record_request`` with the elapsed
        time measured on a monotonic clock.

        Args:
            task: The task to execute
            timeout: Optional timeout in seconds; a falsy value (None or 0)
                means no time limit is applied

        Returns:
            The result of task execution

        Raises:
            Exception: With a "timed out" message when the configured
                timeout expires (chained to the original TimeoutError);
                otherwise whatever ``execute`` raised is re-raised unchanged.
        """
        started = time.perf_counter()
        try:
            if timeout:
                result = await asyncio.wait_for(self.execute(task), timeout=timeout)
            else:
                result = await self.execute(task)
        except asyncio.TimeoutError as exc:
            self.record_request(False, time.perf_counter() - started)
            if not timeout:
                # No deadline was applied here, so execute() raised this
                # itself; report it as an ordinary failure.
                self.logger.error("Task execution failed: %s", exc)
                raise
            message = f"Task execution timed out after {timeout} seconds"
            self.logger.error(message)
            raise Exception(message) from exc
        except Exception as exc:
            self.record_request(False, time.perf_counter() - started)
            self.logger.error("Task execution failed: %s", exc)
            raise
        else:
            self.record_request(True, time.perf_counter() - started)
            return result

    def __str__(self) -> str:
        """String representation of the agent."""
        return f"{self.__class__.__name__}(id={self.agent_id}, name={self.name})"

    def __repr__(self) -> str:
        """Detailed string representation of the agent."""
        return (f"{self.__class__.__name__}(agent_id='{self.agent_id}', "
                f"name='{self.name}', initialized={self._is_initialized}, "
                f"shutdown={self._is_shutdown})")