LIBRE / tests /unit /test_use_cases.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
9.67 kB
"""
tests/unit/test_use_cases.py
──────────────────────────────
Unit tests for all three use cases.
All external dependencies (repositories, broker, model, processor) are mocked
via unittest.mock β€” no real database, no RabbitMQ, no SciPy or PyTorch required.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
import pytest
from src.application.dto.ppg_dto import PPGIngestRequest
from src.application.use_cases.get_prediction_history import GetPredictionHistoryUseCase
from src.application.use_cases.ingest_ppg import IngestPPGUseCase
from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.domain.entities.prediction import BPPrediction
from src.domain.entities.ppg_signal import PPGSignal
from src.domain.exceptions.domain_exceptions import EntityNotFoundError, InvalidSignalError
# ── Shared helpers ────────────────────────────────────────────────────────────
def _valid_signal() -> PPGSignal:
return PPGSignal(
id="test-signal-uuid",
device_id="sensor-001",
user_id="user-123",
sampling_rate=125.0,
ppg_values=[0.1] * 1250,
duration_seconds=10.0,
)
def _valid_prediction(signal_id: str = "test-signal-uuid") -> BPPrediction:
return BPPrediction(
id="test-pred-uuid",
ppg_signal_id=signal_id,
predicted_sbp=120.0,
predicted_dbp=80.0,
model_version="mock-v1.0",
inference_time_ms=50.0,
)
def _valid_request() -> PPGIngestRequest:
return PPGIngestRequest(
device_id="sensor-001",
user_id="user-123",
sampling_rate=125.0,
ppg_values=[0.1] * 1250,
duration_seconds=10.0,
)
# ── IngestPPGUseCase Tests ────────────────────────────────────────────────────
class TestIngestPPGUseCase:
@pytest.fixture
def ppg_repo(self) -> AsyncMock:
repo = AsyncMock()
repo.add = AsyncMock(return_value=_valid_signal())
return repo
@pytest.fixture
def broker(self) -> AsyncMock:
b = AsyncMock()
b.publish = AsyncMock(return_value=None)
return b
@pytest.fixture
def use_case(self, ppg_repo: AsyncMock, broker: AsyncMock) -> IngestPPGUseCase:
return IngestPPGUseCase(ppg_repo=ppg_repo, broker=broker)
async def test_execute_success(
self, use_case: IngestPPGUseCase, ppg_repo: AsyncMock, broker: AsyncMock
) -> None:
request = _valid_request()
response = await use_case.execute(request)
assert response.signal_id == "test-signal-uuid"
assert response.user_id == "user-123"
assert response.queued is True
ppg_repo.add.assert_called_once()
broker.publish.assert_called_once()
async def test_execute_broker_failure_still_returns_response(
self, use_case: IngestPPGUseCase, ppg_repo: AsyncMock, broker: AsyncMock
) -> None:
"""Broker publish failure is non-fatal β€” signal is already persisted."""
broker.publish.side_effect = Exception("RabbitMQ down")
response = await use_case.execute(_valid_request())
assert response.queued is False # publish failed but signal was stored
ppg_repo.add.assert_called_once()
async def test_execute_invalid_signal_raises(
self, use_case: IngestPPGUseCase
) -> None:
"""
Pydantic's min_length=1 on device_id prevents an empty string from reaching
the use case at the DTO level. We therefore test domain-level InvalidSignalError
by constructing the DTO validly but giving an impossible duration/sample combo.
"""
# Build a DTO that passes Pydantic but fails domain validation:
# 1000 samples declared as 10 seconds at 125Hz β†’ expected 1250, far out of 10% tolerance
from pydantic import ValidationError
with pytest.raises(ValidationError):
PPGIngestRequest(
device_id="", # Pydantic rejects this at DTO boundary βœ…
user_id="user-123",
sampling_rate=125.0,
ppg_values=[0.1] * 1250,
duration_seconds=10.0,
)
# Domain-level: valid DTO, but sample count inconsistent β†’ InvalidSignalError
bad_request = PPGIngestRequest(
device_id="sensor-001",
user_id="user-123",
sampling_rate=125.0,
ppg_values=[0.1] * 50, # too few samples for 10s at 125Hz
duration_seconds=10.0,
)
with pytest.raises(InvalidSignalError):
await use_case.execute(bad_request)
# ── ProcessAndPredictUseCase Tests ────────────────────────────────────────────
class TestProcessAndPredictUseCase:
@pytest.fixture
def ppg_repo(self) -> AsyncMock:
repo = AsyncMock()
repo.get_by_id = AsyncMock(return_value=_valid_signal())
return repo
@pytest.fixture
def prediction_repo(self) -> AsyncMock:
repo = AsyncMock()
repo.add = AsyncMock(return_value=_valid_prediction())
return repo
@pytest.fixture
def signal_processor(self) -> MagicMock:
processor = MagicMock()
processor.process = MagicMock(return_value=np.zeros((5, 1000)))
return processor
@pytest.fixture
def model_service(self) -> AsyncMock:
service = AsyncMock()
service.is_loaded = MagicMock(return_value=True)
service.predict = AsyncMock(return_value=_valid_prediction())
return service
@pytest.fixture
def use_case(
self,
ppg_repo: AsyncMock,
prediction_repo: AsyncMock,
signal_processor: MagicMock,
model_service: AsyncMock,
) -> ProcessAndPredictUseCase:
return ProcessAndPredictUseCase(
ppg_repo=ppg_repo,
prediction_repo=prediction_repo,
signal_processor=signal_processor,
model_service=model_service,
)
async def test_execute_success(
self,
use_case: ProcessAndPredictUseCase,
ppg_repo: AsyncMock,
prediction_repo: AsyncMock,
signal_processor: MagicMock,
model_service: AsyncMock,
) -> None:
message = {"id": "test-signal-uuid"}
result = await use_case.execute(message)
assert result.predicted_sbp == 120.0
assert result.predicted_dbp == 80.0
ppg_repo.get_by_id.assert_called_once_with("test-signal-uuid")
signal_processor.process.assert_called_once()
model_service.predict.assert_called_once()
prediction_repo.add.assert_called_once()
async def test_signal_not_found_raises(
self, use_case: ProcessAndPredictUseCase, ppg_repo: AsyncMock
) -> None:
ppg_repo.get_by_id = AsyncMock(return_value=None)
with pytest.raises(EntityNotFoundError):
await use_case.execute({"id": "nonexistent-id"})
async def test_loads_model_if_not_loaded(
self,
use_case: ProcessAndPredictUseCase,
model_service: AsyncMock,
) -> None:
model_service.is_loaded = MagicMock(return_value=False)
await use_case.execute({"id": "test-signal-uuid"})
model_service.load_model.assert_called_once()
# ── GetPredictionHistoryUseCase Tests ─────────────────────────────────────────
class TestGetPredictionHistoryUseCase:
@pytest.fixture
def prediction_repo(self) -> AsyncMock:
repo = AsyncMock()
repo.get_by_user_latest = AsyncMock(
return_value=[_valid_prediction(), _valid_prediction()]
)
repo.get_by_date_range = AsyncMock(return_value=[_valid_prediction()])
return repo
@pytest.fixture
def use_case(self, prediction_repo: AsyncMock) -> GetPredictionHistoryUseCase:
return GetPredictionHistoryUseCase(prediction_repo=prediction_repo)
async def test_latest_mode(
self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock
) -> None:
result = await use_case.execute("user-123", limit=10)
assert result.user_id == "user-123"
assert result.total == 2
prediction_repo.get_by_user_latest.assert_called_once_with(user_id="user-123", limit=10)
prediction_repo.get_by_date_range.assert_not_called()
async def test_date_range_mode(
self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock
) -> None:
start = datetime(2026, 1, 1, tzinfo=timezone.utc)
end = datetime(2026, 12, 31, tzinfo=timezone.utc)
result = await use_case.execute("user-123", start=start, end=end)
assert result.total == 1
prediction_repo.get_by_date_range.assert_called_once()
prediction_repo.get_by_user_latest.assert_not_called()
async def test_empty_history_returns_empty_list(
self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock
) -> None:
prediction_repo.get_by_user_latest = AsyncMock(return_value=[])
result = await use_case.execute("user-456")
assert result.total == 0
assert result.predictions == []