| """ |
| domain/exceptions/pipeline_exceptions.py |
| ──────────────────────────────────────── |
| Custom domain-specific exceptions for signal preprocessing, model inference, |
| and message broker operations. |
| |
| Keeps clean architecture by remaining decoupled from infrastructure frameworks |
| (e.g. FastAPI, PyTorch, aio-pika). |
| |
| Hierarchy (all extend DomainException): |
| PreprocessingError — signal preprocessing / ETL failure → 422 |
| ModelInferenceError — deep learning model failure → 503 |
| BrokerError — message broker publish/consume error → 503 |
| """ |
| from __future__ import annotations |
|
|
| from src.domain.exceptions.domain_exceptions import DomainException |
|
|
|
|
class PreprocessingError(DomainException):
    """
    Domain error for a failed signal preprocessing / ETL step.

    Typical triggers:
        - The signal is too short to be segmented.
        - A resampling or filtering stage fails.
        - The Visibility Graph computation fails or yields invalid results.

    Associated HTTP status: 422 Unprocessable Entity. This class only carries
    the failure details; it performs no HTTP mapping itself.

    Args:
        step: Name of the preprocessing step that failed.
        reason: Human-readable description of what went wrong.
        context: Optional extra key/value pairs merged into the context that
            is handed to ``DomainException``. They are applied after ``step``
            and ``reason``, so they win on a key clash.
    """

    def __init__(self, step: str, reason: str, context: dict | None = None) -> None:
        text = f"Preprocessing failed at step '{step}': {reason}"
        # Seed with the canonical fields, then layer caller-supplied entries
        # on top (a falsy ``context`` contributes nothing).
        details = dict(step=step, reason=reason)
        details.update(context or {})
        super().__init__(text, context=details)
        self.step, self.reason = step, reason
|
|
|
|
class ModelInferenceError(DomainException):
    """
    Domain error for a failed deep learning model inference.

    Typical triggers:
        - Weight checkpoints are missing on disk.
        - The checkpoint layout does not match the expected state_dict.
        - The PyTorch forward pass fails.

    Associated HTTP status: 503 Service Unavailable. This class only carries
    the failure details; it performs no HTTP mapping itself.

    Args:
        model_name: Name of the model whose inference failed.
        reason: Human-readable description of what went wrong.
        context: Optional extra key/value pairs merged into the context that
            is handed to ``DomainException``. They are applied after
            ``model_name`` and ``reason``, so they win on a key clash.
    """

    def __init__(self, model_name: str, reason: str, context: dict | None = None) -> None:
        text = f"Model '{model_name}' inference failed: {reason}"
        # Seed with the canonical fields, then layer caller-supplied entries
        # on top (a falsy ``context`` contributes nothing).
        details = dict(model_name=model_name, reason=reason)
        details.update(context or {})
        super().__init__(text, context=details)
        self.model_name, self.reason = model_name, reason
|
|
|
|
class BrokerError(DomainException):
    """
    Raised when the message broker (RabbitMQ) is unreachable or a
    publish/consume operation fails.

    Wraps aio-pika / AMQP low-level errors so that domain and application
    layers never import broker-specific libraries directly.

    HTTP Status: 503 Service Unavailable (this class only carries the failure
    details; it performs no HTTP mapping itself).

    Args:
        operation: The broker operation that failed (e.g. ``"publish"``, ``"connect"``).
        reason: Human-readable description of what went wrong.
        context: Optional extra key/value pairs (for example a queue or
            exchange name) merged into the context handed to
            ``DomainException``. Mirrors the ``context`` parameter of
            ``PreprocessingError`` and ``ModelInferenceError``; entries are
            applied after ``operation`` and ``reason``, so they win on a key
            clash. Omitting it keeps the previous two-argument behaviour.
    """

    def __init__(self, operation: str, reason: str, context: dict | None = None) -> None:
        message = f"Broker error during '{operation}': {reason}"
        ctx = {"operation": operation, "reason": reason}
        # Same merge rule as the sibling exceptions: caller-supplied entries
        # are layered on top of the canonical fields.
        if context:
            ctx.update(context)
        super().__init__(message, context=ctx)
        self.operation = operation
        self.reason = reason
|
|