""" Caso de uso para Detección de Anomalías. Implementa la lógica de aplicación para detectar anomalías en series temporales usando pronósticos probabilísticos. """ from app.domain.services.anomaly_service import AnomalyService from app.domain.models.time_series import TimeSeries from app.domain.models.forecast_config import ForecastConfig from app.application.dtos.anomaly_dtos import ( AnomalyDetectionInputDTO, AnomalyDetectionOutputDTO, AnomalyPointDTO ) from app.utils.logger import setup_logger logger = setup_logger(__name__) class DetectAnomaliesUseCase: """ Caso de uso: Detección de Anomalías. Responsabilidad: Detectar anomalías comparando valores observados con pronósticos probabilísticos. """ def __init__(self, anomaly_service: AnomalyService): """ Inicializa el caso de uso. Args: anomaly_service: Servicio de dominio para detección de anomalías """ self.anomaly_service = anomaly_service logger.info("DetectAnomaliesUseCase initialized") def execute(self, input_dto: AnomalyDetectionInputDTO) -> AnomalyDetectionOutputDTO: """ Ejecuta el caso de uso de detección de anomalías. Args: input_dto: Datos de entrada con contexto y valores recientes Returns: AnomalyDetectionOutputDTO: Puntos de anomalía detectados Raises: ValueError: Si los datos son inválidos RuntimeError: Si falla la detección """ logger.info( f"Detecting anomalies: {len(input_dto.context_values)} context points, " f"{len(input_dto.recent_values)} recent points" ) # Validar entrada input_dto.validate() # Convertir DTO a modelos de dominio context = TimeSeries( values=input_dto.context_values, timestamps=input_dto.context_timestamps, freq=input_dto.freq ) config = ForecastConfig( prediction_length=len(input_dto.recent_values), quantile_levels=[input_dto.quantile_low, 0.5, input_dto.quantile_high], freq=input_dto.freq ) # Ejecutar servicio de dominio try: anomaly_points = self.anomaly_service.detect_anomalies( context=context, recent_observed=input_dto.recent_values, config=config, quantile_low=input_dto.quantile_low, quantile_high=input_dto.quantile_high ) logger.info(f"Anomaly detection completed") except Exception as e: logger.error(f"Anomaly detection failed: {e}", exc_info=True) raise RuntimeError(f"Anomaly detection failed: {str(e)}") from e # Convertir a DTOs y calcular severidad anomaly_dtos = [] for ap in anomaly_points: severity = self._calculate_severity(ap.z_score, ap.is_anomaly) dto = AnomalyPointDTO( index=ap.index, value=ap.value, expected=ap.expected, lower_bound=ap.lower_bound, upper_bound=ap.upper_bound, is_anomaly=ap.is_anomaly, z_score=ap.z_score, severity=severity ) anomaly_dtos.append(dto) # Calcular estadísticas anomaly_count = sum(1 for a in anomaly_dtos if a.is_anomaly) total_points = len(anomaly_dtos) anomaly_rate = anomaly_count / total_points if total_points > 0 else 0.0 # Crear resumen summary = self._create_summary(anomaly_dtos, input_dto) logger.info( f"Anomalies detected: {anomaly_count}/{total_points} " f"({anomaly_rate*100:.1f}%)" ) # Crear DTO de salida output_dto = AnomalyDetectionOutputDTO( anomalies=anomaly_dtos, total_points=total_points, anomaly_count=anomaly_count, anomaly_rate=anomaly_rate, summary=summary ) return output_dto def _calculate_severity(self, z_score: float, is_anomaly: bool) -> str: """ Calcula la severidad de una anomalía basándose en el z-score. Args: z_score: Puntuación Z is_anomaly: Si es anomalía Returns: str: Nivel de severidad (normal, low, medium, high) """ if not is_anomaly: return "normal" if z_score < 1.5: return "low" elif z_score < 2.5: return "medium" else: return "high" def _create_summary( self, anomaly_dtos: list, input_dto: AnomalyDetectionInputDTO ) -> dict: """ Crea un resumen de la detección de anomalías. Args: anomaly_dtos: Lista de anomalías detectadas input_dto: Datos de entrada originales Returns: dict: Resumen con estadísticas """ anomalies_only = [a for a in anomaly_dtos if a.is_anomaly] if not anomalies_only: return { "has_anomalies": False, "severity_distribution": {"normal": len(anomaly_dtos)}, "max_z_score": 0.0, "avg_deviation": 0.0 } # Distribución por severidad severity_dist = { "normal": sum(1 for a in anomaly_dtos if a.severity == "normal"), "low": sum(1 for a in anomaly_dtos if a.severity == "low"), "medium": sum(1 for a in anomaly_dtos if a.severity == "medium"), "high": sum(1 for a in anomaly_dtos if a.severity == "high") } # Estadísticas de anomalías max_z_score = max(a.z_score for a in anomalies_only) avg_deviation = sum( abs(a.value - a.expected) for a in anomalies_only ) / len(anomalies_only) return { "has_anomalies": True, "severity_distribution": severity_dist, "max_z_score": round(max_z_score, 2), "avg_deviation": round(avg_deviation, 2), "quantile_range": { "low": input_dto.quantile_low, "high": input_dto.quantile_high } }