""" domain/exceptions/pipeline_exceptions.py ──────────────────────────────────────── Custom domain-specific exceptions for signal preprocessing, model inference, and message broker operations. Keeps clean architecture by remaining decoupled from infrastructure frameworks (e.g. FastAPI, PyTorch, aio-pika). Hierarchy (all extend DomainException): PreprocessingError — signal preprocessing / ETL failure → 422 ModelInferenceError — deep learning model failure → 503 BrokerError — message broker publish/consume error → 503 """ from __future__ import annotations from src.domain.exceptions.domain_exceptions import DomainException class PreprocessingError(DomainException): """ Raised when an error occurs during signal preprocessing or ETL. Examples: - Signal duration is too short for segmenting. - Resampling or filtering fails. - Visibility Graph calculation fails or produces invalid results. HTTP Status: 422 Unprocessable Entity """ def __init__(self, step: str, reason: str, context: dict | None = None) -> None: message = f"Preprocessing failed at step '{step}': {reason}" ctx = {"step": step, "reason": reason} if context: ctx.update(context) super().__init__(message, context=ctx) self.step = step self.reason = reason class ModelInferenceError(DomainException): """ Raised when a deep learning model inference fails. Examples: - Missing weight checkpoints on disk. - Checkpoint layout does not match state_dict expectation. - PyTorch model forward pass fails. HTTP Status: 503 Service Unavailable """ def __init__(self, model_name: str, reason: str, context: dict | None = None) -> None: message = f"Model '{model_name}' inference failed: {reason}" ctx = {"model_name": model_name, "reason": reason} if context: ctx.update(context) super().__init__(message, context=ctx) self.model_name = model_name self.reason = reason class BrokerError(DomainException): """ Raised when the message broker (RabbitMQ) is unreachable or a publish/consume operation fails. Wraps aio-pika / AMQP low-level errors so that domain and application layers never import broker-specific libraries directly. HTTP Status: 503 Service Unavailable Args: operation: The broker operation that failed (e.g. ``"publish"``, ``"connect"``). reason: Human-readable description of what went wrong. """ def __init__(self, operation: str, reason: str) -> None: message = f"Broker error during '{operation}': {reason}" super().__init__(message, context={"operation": operation, "reason": reason}) self.operation = operation self.reason = reason