LIBRE / src /domain /exceptions /pipeline_exceptions.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
2.97 kB
"""
domain/exceptions/pipeline_exceptions.py
────────────────────────────────────────
Custom domain-specific exceptions for signal preprocessing, model inference,
and message broker operations.
Keeps clean architecture by remaining decoupled from infrastructure frameworks
(e.g. FastAPI, PyTorch, aio-pika).
Hierarchy (all extend DomainException):
PreprocessingError — signal preprocessing / ETL failure → 422
ModelInferenceError — deep learning model failure → 503
BrokerError — message broker publish/consume error → 503
"""
from __future__ import annotations
from src.domain.exceptions.domain_exceptions import DomainException
class PreprocessingError(DomainException):
    """
    Signals a failure in the signal preprocessing / ETL stage.

    Typical triggers:
        - The signal is too short to be segmented.
        - A resampling or filtering step fails.
        - The Visibility Graph computation fails or yields invalid results.

    Intended HTTP mapping: 422 Unprocessable Entity (the mapping itself is
    not implemented in this class).

    Args:
        step: Name of the preprocessing step that failed.
        reason: Human-readable description of the failure.
        context: Optional extra key/value pairs merged into the context that
            is handed to ``DomainException``. On a key collision the caller's
            value replaces the default ``"step"`` / ``"reason"`` entry.

    Attributes:
        step: The ``step`` argument, stored unchanged.
        reason: The ``reason`` argument, stored unchanged.
    """

    def __init__(self, step: str, reason: str, context: dict | None = None) -> None:
        details = {"step": step, "reason": reason}
        # A falsy ``context`` (None or empty) contributes nothing to the merge.
        details.update(context or ())
        super().__init__(
            f"Preprocessing failed at step '{step}': {reason}",
            context=details,
        )
        self.step, self.reason = step, reason
class ModelInferenceError(DomainException):
    """
    Signals that running a deep learning model failed.

    Typical triggers:
        - Weight checkpoints are missing on disk.
        - The checkpoint layout does not match the expected state_dict.
        - The PyTorch forward pass raises.

    Intended HTTP mapping: 503 Service Unavailable (the mapping itself is
    not implemented in this class).

    Args:
        model_name: Identifier of the model whose inference failed.
        reason: Human-readable description of the failure.
        context: Optional extra key/value pairs merged into the context that
            is handed to ``DomainException``. On a key collision the caller's
            value replaces the default ``"model_name"`` / ``"reason"`` entry.

    Attributes:
        model_name: The ``model_name`` argument, stored unchanged.
        reason: The ``reason`` argument, stored unchanged.
    """

    def __init__(self, model_name: str, reason: str, context: dict | None = None) -> None:
        payload = {"model_name": model_name, "reason": reason}
        # Only a truthy ``context`` is merged; None / empty leaves the defaults.
        payload.update(context or ())
        description = f"Model '{model_name}' inference failed: {reason}"
        super().__init__(description, context=payload)
        self.model_name = model_name
        self.reason = reason
class BrokerError(DomainException):
    """
    Raised when the message broker (RabbitMQ) is unreachable or a
    publish/consume operation fails.

    Wraps aio-pika / AMQP low-level errors so that domain and application
    layers never import broker-specific libraries directly.

    HTTP Status: 503 Service Unavailable (the mapping itself is not
    implemented in this class).

    Args:
        operation: The broker operation that failed (e.g. ``"publish"``, ``"connect"``).
        reason: Human-readable description of what went wrong.
        context: Optional extra key/value pairs (for example a queue name or
            routing key) merged into the context handed to
            ``DomainException``. Accepted for parity with the other pipeline
            exceptions; omitting it keeps the previous behaviour. On a key
            collision the caller's value replaces the default
            ``"operation"`` / ``"reason"`` entry.

    Attributes:
        operation: The ``operation`` argument, stored unchanged.
        reason: The ``reason`` argument, stored unchanged.
    """

    def __init__(self, operation: str, reason: str, context: dict | None = None) -> None:
        message = f"Broker error during '{operation}': {reason}"
        ctx = {"operation": operation, "reason": reason}
        # Merge caller-supplied diagnostics the same way the sibling
        # exceptions do; None or an empty mapping adds nothing.
        if context:
            ctx.update(context)
        super().__init__(message, context=ctx)
        self.operation = operation
        self.reason = reason