|
|
""" |
|
|
Classe de base pour tous les agents du système. |
|
|
Définit l'interface commune et les fonctionnalités partagées. |
|
|
""" |
|
|
|
|
|
from abc import ABC, abstractmethod |
|
|
from typing import Any, Dict, Optional, TypeVar, Generic |
|
|
from datetime import datetime |
|
|
import asyncio |
|
|
import uuid |
|
|
|
|
|
from src.core.logging import setup_logger |
|
|
from src.models.state_models import AgentState, AgentStatus, AgentType |
|
|
|
|
|
|
|
|
|
|
|
InputType = TypeVar('InputType') |
|
|
OutputType = TypeVar('OutputType') |
|
|
|
|
|
|
|
|
class BaseAgent(ABC, Generic[InputType, OutputType]): |
|
|
""" |
|
|
Classe de base abstraite pour tous les agents du système. |
|
|
|
|
|
Fournit les fonctionnalités communes : |
|
|
- Gestion de l'état |
|
|
- Logging |
|
|
- Gestion des erreurs et retry |
|
|
- Métriques de performance |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
agent_type: AgentType, |
|
|
name: Optional[str] = None, |
|
|
max_retries: int = 3, |
|
|
timeout: float = 300.0 |
|
|
): |
|
|
""" |
|
|
Initialise l'agent de base. |
|
|
|
|
|
Args: |
|
|
agent_type: Type de l'agent |
|
|
name: Nom personnalisé de l'agent |
|
|
max_retries: Nombre maximum de tentatives en cas d'erreur |
|
|
timeout: Timeout en secondes pour l'exécution |
|
|
""" |
|
|
self.agent_type = agent_type |
|
|
self.name = name or f"{agent_type.value}_agent" |
|
|
self.agent_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
self.max_retries = max_retries |
|
|
self.timeout = timeout |
|
|
|
|
|
|
|
|
self.state = AgentState( |
|
|
agent_type=agent_type, |
|
|
max_retries=max_retries |
|
|
) |
|
|
|
|
|
|
|
|
self.logger = setup_logger(f"agent_{self.name}") |
|
|
|
|
|
|
|
|
self.metrics = { |
|
|
"total_executions": 0, |
|
|
"successful_executions": 0, |
|
|
"failed_executions": 0, |
|
|
"total_processing_time": 0.0, |
|
|
"average_processing_time": 0.0 |
|
|
} |
|
|
|
|
|
self.logger.info(f"Agent {self.name} initialisé (ID: {self.agent_id})") |
|
|
|
|
|
@abstractmethod |
|
|
async def process(self, input_data: InputType) -> OutputType: |
|
|
""" |
|
|
Méthode principale de traitement de l'agent. |
|
|
Doit être implémentée par chaque agent concret. |
|
|
|
|
|
Args: |
|
|
input_data: Données d'entrée spécifiques à l'agent |
|
|
|
|
|
Returns: |
|
|
Données de sortie spécifiques à l'agent |
|
|
""" |
|
|
pass |
|
|
|
|
|
@abstractmethod |
|
|
def validate_input(self, input_data: InputType) -> bool: |
|
|
""" |
|
|
Valide les données d'entrée. |
|
|
|
|
|
Args: |
|
|
input_data: Données à valider |
|
|
|
|
|
Returns: |
|
|
True si les données sont valides |
|
|
""" |
|
|
pass |
|
|
|
|
|
async def execute(self, input_data: InputType) -> OutputType: |
|
|
""" |
|
|
Exécute l'agent avec gestion des erreurs et retry. |
|
|
|
|
|
Args: |
|
|
input_data: Données d'entrée |
|
|
|
|
|
Returns: |
|
|
Résultat de l'exécution |
|
|
|
|
|
Raises: |
|
|
Exception: Si l'exécution échoue après tous les retry |
|
|
""" |
|
|
self.logger.info(f"Début d'exécution de l'agent {self.name}") |
|
|
self.state.start_execution() |
|
|
self.metrics["total_executions"] += 1 |
|
|
|
|
|
|
|
|
if not self.validate_input(input_data): |
|
|
error_msg = f"Données d'entrée invalides pour l'agent {self.name}" |
|
|
self.logger.error(error_msg) |
|
|
self.state.mark_error(error_msg) |
|
|
self.metrics["failed_executions"] += 1 |
|
|
raise ValueError(error_msg) |
|
|
|
|
|
|
|
|
last_exception = None |
|
|
|
|
|
for attempt in range(self.max_retries + 1): |
|
|
try: |
|
|
self.logger.info(f"Tentative {attempt + 1}/{self.max_retries + 1}") |
|
|
|
|
|
|
|
|
result = await asyncio.wait_for( |
|
|
self.process(input_data), |
|
|
timeout=self.timeout |
|
|
) |
|
|
|
|
|
|
|
|
self.state.complete_execution() |
|
|
self.metrics["successful_executions"] += 1 |
|
|
self._update_processing_time() |
|
|
|
|
|
self.logger.info(f"Agent {self.name} terminé avec succès") |
|
|
return result |
|
|
|
|
|
except asyncio.TimeoutError as e: |
|
|
error_msg = f"Timeout atteint pour l'agent {self.name} (>{self.timeout}s)" |
|
|
self.logger.warning(error_msg) |
|
|
last_exception = e |
|
|
self.state.retry_count += 1 |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erreur dans l'agent {self.name}: {str(e)}" |
|
|
self.logger.warning(error_msg) |
|
|
last_exception = e |
|
|
self.state.retry_count += 1 |
|
|
|
|
|
|
|
|
if attempt < self.max_retries: |
|
|
wait_time = 2 ** attempt |
|
|
self.logger.info(f"Attente de {wait_time}s avant la prochaine tentative") |
|
|
await asyncio.sleep(wait_time) |
|
|
|
|
|
|
|
|
final_error = f"Agent {self.name} a échoué après {self.max_retries + 1} tentatives" |
|
|
self.logger.error(final_error) |
|
|
self.state.mark_error(final_error) |
|
|
self.metrics["failed_executions"] += 1 |
|
|
|
|
|
raise Exception(final_error) from last_exception |
|
|
|
|
|
def _update_processing_time(self): |
|
|
"""Met à jour les métriques de temps de traitement.""" |
|
|
if self.state.duration: |
|
|
self.metrics["total_processing_time"] += self.state.duration |
|
|
self.metrics["average_processing_time"] = ( |
|
|
self.metrics["total_processing_time"] / |
|
|
self.metrics["successful_executions"] |
|
|
) |
|
|
|
|
|
def get_status(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Retourne le statut actuel de l'agent. |
|
|
|
|
|
Returns: |
|
|
Dictionnaire avec les informations de statut |
|
|
""" |
|
|
return { |
|
|
"agent_id": self.agent_id, |
|
|
"name": self.name, |
|
|
"type": self.agent_type.value, |
|
|
"status": self.state.status.value, |
|
|
"retry_count": self.state.retry_count, |
|
|
"duration": self.state.duration, |
|
|
"error_message": self.state.error_message, |
|
|
"metrics": self.metrics, |
|
|
"last_execution": self.state.end_time.isoformat() if self.state.end_time else None |
|
|
} |
|
|
|
|
|
def reset(self): |
|
|
"""Remet l'agent à zéro pour une nouvelle exécution.""" |
|
|
self.state = AgentState( |
|
|
agent_type=self.agent_type, |
|
|
max_retries=self.max_retries |
|
|
) |
|
|
self.logger.info(f"Agent {self.name} remis à zéro") |
|
|
|
|
|
def __str__(self) -> str: |
|
|
return f"{self.__class__.__name__}(name={self.name}, status={self.state.status.value})" |
|
|
|
|
|
def __repr__(self) -> str: |
|
|
return (f"{self.__class__.__name__}(agent_id={self.agent_id}, " |
|
|
f"type={self.agent_type.value}, status={self.state.status.value})") |
|
|
|
|
|
|
|
|
class AgentError(Exception): |
|
|
"""Exception personnalisée pour les erreurs d'agents.""" |
|
|
|
|
|
def __init__(self, message: str, agent_name: str, agent_id: str): |
|
|
self.agent_name = agent_name |
|
|
self.agent_id = agent_id |
|
|
super().__init__(f"Agent {agent_name} ({agent_id}): {message}") |
|
|
|
|
|
|
|
|
class AgentTimeoutError(AgentError): |
|
|
"""Exception pour les timeouts d'agents.""" |
|
|
pass |
|
|
|
|
|
|
|
|
class AgentValidationError(AgentError): |
|
|
"""Exception pour les erreurs de validation d'agents.""" |
|
|
pass |