| """ |
| tests/unit/test_use_cases.py |
| ββββββββββββββββββββββββββββββ |
| Unit tests for all three use cases. |
| |
| All external dependencies (repositories, broker, model, processor) are mocked |
| via unittest.mock β no real database, no RabbitMQ, no SciPy or PyTorch required. |
| """ |
| from __future__ import annotations |
|
|
| from datetime import datetime, timezone |
| from typing import Any |
| from unittest.mock import AsyncMock, MagicMock, patch |
|
|
| import numpy as np |
| import pytest |
|
|
| from src.application.dto.ppg_dto import PPGIngestRequest |
| from src.application.use_cases.get_prediction_history import GetPredictionHistoryUseCase |
| from src.application.use_cases.ingest_ppg import IngestPPGUseCase |
| from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase |
| from src.domain.entities.prediction import BPPrediction |
| from src.domain.entities.ppg_signal import PPGSignal |
| from src.domain.exceptions.domain_exceptions import EntityNotFoundError, InvalidSignalError |
|
|
|
|
| |
|
|
| def _valid_signal() -> PPGSignal: |
| return PPGSignal( |
| id="test-signal-uuid", |
| device_id="sensor-001", |
| user_id="user-123", |
| sampling_rate=125.0, |
| ppg_values=[0.1] * 1250, |
| duration_seconds=10.0, |
| ) |
|
|
|
|
| def _valid_prediction(signal_id: str = "test-signal-uuid") -> BPPrediction: |
| return BPPrediction( |
| id="test-pred-uuid", |
| ppg_signal_id=signal_id, |
| predicted_sbp=120.0, |
| predicted_dbp=80.0, |
| model_version="mock-v1.0", |
| inference_time_ms=50.0, |
| ) |
|
|
|
|
| def _valid_request() -> PPGIngestRequest: |
| return PPGIngestRequest( |
| device_id="sensor-001", |
| user_id="user-123", |
| sampling_rate=125.0, |
| ppg_values=[0.1] * 1250, |
| duration_seconds=10.0, |
| ) |
|
|
|
|
| |
|
|
| class TestIngestPPGUseCase: |
| @pytest.fixture |
| def ppg_repo(self) -> AsyncMock: |
| repo = AsyncMock() |
| repo.add = AsyncMock(return_value=_valid_signal()) |
| return repo |
|
|
| @pytest.fixture |
| def broker(self) -> AsyncMock: |
| b = AsyncMock() |
| b.publish = AsyncMock(return_value=None) |
| return b |
|
|
| @pytest.fixture |
| def use_case(self, ppg_repo: AsyncMock, broker: AsyncMock) -> IngestPPGUseCase: |
| return IngestPPGUseCase(ppg_repo=ppg_repo, broker=broker) |
|
|
| async def test_execute_success( |
| self, use_case: IngestPPGUseCase, ppg_repo: AsyncMock, broker: AsyncMock |
| ) -> None: |
| request = _valid_request() |
| response = await use_case.execute(request) |
|
|
| assert response.signal_id == "test-signal-uuid" |
| assert response.user_id == "user-123" |
| assert response.queued is True |
| ppg_repo.add.assert_called_once() |
| broker.publish.assert_called_once() |
|
|
| async def test_execute_broker_failure_still_returns_response( |
| self, use_case: IngestPPGUseCase, ppg_repo: AsyncMock, broker: AsyncMock |
| ) -> None: |
| """Broker publish failure is non-fatal β signal is already persisted.""" |
| broker.publish.side_effect = Exception("RabbitMQ down") |
| response = await use_case.execute(_valid_request()) |
| assert response.queued is False |
| ppg_repo.add.assert_called_once() |
|
|
| async def test_execute_invalid_signal_raises( |
| self, use_case: IngestPPGUseCase |
| ) -> None: |
| """ |
| Pydantic's min_length=1 on device_id prevents an empty string from reaching |
| the use case at the DTO level. We therefore test domain-level InvalidSignalError |
| by constructing the DTO validly but giving an impossible duration/sample combo. |
| """ |
| |
| |
| from pydantic import ValidationError |
|
|
| with pytest.raises(ValidationError): |
| PPGIngestRequest( |
| device_id="", |
| user_id="user-123", |
| sampling_rate=125.0, |
| ppg_values=[0.1] * 1250, |
| duration_seconds=10.0, |
| ) |
|
|
| |
| bad_request = PPGIngestRequest( |
| device_id="sensor-001", |
| user_id="user-123", |
| sampling_rate=125.0, |
| ppg_values=[0.1] * 50, |
| duration_seconds=10.0, |
| ) |
| with pytest.raises(InvalidSignalError): |
| await use_case.execute(bad_request) |
|
|
|
|
| |
|
|
| class TestProcessAndPredictUseCase: |
| @pytest.fixture |
| def ppg_repo(self) -> AsyncMock: |
| repo = AsyncMock() |
| repo.get_by_id = AsyncMock(return_value=_valid_signal()) |
| return repo |
|
|
| @pytest.fixture |
| def prediction_repo(self) -> AsyncMock: |
| repo = AsyncMock() |
| repo.add = AsyncMock(return_value=_valid_prediction()) |
| return repo |
|
|
| @pytest.fixture |
| def signal_processor(self) -> MagicMock: |
| processor = MagicMock() |
| processor.process = MagicMock(return_value=np.zeros((5, 1000))) |
| return processor |
|
|
| @pytest.fixture |
| def model_service(self) -> AsyncMock: |
| service = AsyncMock() |
| service.is_loaded = MagicMock(return_value=True) |
| service.predict = AsyncMock(return_value=_valid_prediction()) |
| return service |
|
|
| @pytest.fixture |
| def use_case( |
| self, |
| ppg_repo: AsyncMock, |
| prediction_repo: AsyncMock, |
| signal_processor: MagicMock, |
| model_service: AsyncMock, |
| ) -> ProcessAndPredictUseCase: |
| return ProcessAndPredictUseCase( |
| ppg_repo=ppg_repo, |
| prediction_repo=prediction_repo, |
| signal_processor=signal_processor, |
| model_service=model_service, |
| ) |
|
|
| async def test_execute_success( |
| self, |
| use_case: ProcessAndPredictUseCase, |
| ppg_repo: AsyncMock, |
| prediction_repo: AsyncMock, |
| signal_processor: MagicMock, |
| model_service: AsyncMock, |
| ) -> None: |
| message = {"id": "test-signal-uuid"} |
| result = await use_case.execute(message) |
|
|
| assert result.predicted_sbp == 120.0 |
| assert result.predicted_dbp == 80.0 |
| ppg_repo.get_by_id.assert_called_once_with("test-signal-uuid") |
| signal_processor.process.assert_called_once() |
| model_service.predict.assert_called_once() |
| prediction_repo.add.assert_called_once() |
|
|
| async def test_signal_not_found_raises( |
| self, use_case: ProcessAndPredictUseCase, ppg_repo: AsyncMock |
| ) -> None: |
| ppg_repo.get_by_id = AsyncMock(return_value=None) |
| with pytest.raises(EntityNotFoundError): |
| await use_case.execute({"id": "nonexistent-id"}) |
|
|
| async def test_loads_model_if_not_loaded( |
| self, |
| use_case: ProcessAndPredictUseCase, |
| model_service: AsyncMock, |
| ) -> None: |
| model_service.is_loaded = MagicMock(return_value=False) |
| await use_case.execute({"id": "test-signal-uuid"}) |
| model_service.load_model.assert_called_once() |
|
|
|
|
| |
|
|
| class TestGetPredictionHistoryUseCase: |
| @pytest.fixture |
| def prediction_repo(self) -> AsyncMock: |
| repo = AsyncMock() |
| repo.get_by_user_latest = AsyncMock( |
| return_value=[_valid_prediction(), _valid_prediction()] |
| ) |
| repo.get_by_date_range = AsyncMock(return_value=[_valid_prediction()]) |
| return repo |
|
|
| @pytest.fixture |
| def use_case(self, prediction_repo: AsyncMock) -> GetPredictionHistoryUseCase: |
| return GetPredictionHistoryUseCase(prediction_repo=prediction_repo) |
|
|
| async def test_latest_mode( |
| self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock |
| ) -> None: |
| result = await use_case.execute("user-123", limit=10) |
| assert result.user_id == "user-123" |
| assert result.total == 2 |
| prediction_repo.get_by_user_latest.assert_called_once_with(user_id="user-123", limit=10) |
| prediction_repo.get_by_date_range.assert_not_called() |
|
|
| async def test_date_range_mode( |
| self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock |
| ) -> None: |
| start = datetime(2026, 1, 1, tzinfo=timezone.utc) |
| end = datetime(2026, 12, 31, tzinfo=timezone.utc) |
| result = await use_case.execute("user-123", start=start, end=end) |
| assert result.total == 1 |
| prediction_repo.get_by_date_range.assert_called_once() |
| prediction_repo.get_by_user_latest.assert_not_called() |
|
|
| async def test_empty_history_returns_empty_list( |
| self, use_case: GetPredictionHistoryUseCase, prediction_repo: AsyncMock |
| ) -> None: |
| prediction_repo.get_by_user_latest = AsyncMock(return_value=[]) |
| result = await use_case.execute("user-456") |
| assert result.total == 0 |
| assert result.predictions == [] |
|
|