RyZ
fix: lazy import GANVGTLNetService and NumbaVGTLNetPreprocessor to avoid module-level torch import
6ace378 | """ | |
| interface/api/dependencies.py | |
| ───────────────────────────── | |
| FastAPI dependency injection wiring (Composition Root for the API). | |
| This is the ONLY place in the codebase that selects concrete implementations | |
| and wires them to use cases. Change infra providers here — nothing else changes. | |
| DI hierarchy: | |
| AsyncSession ─► PPGRepository, PredictionRepository | |
| Settings ─► MessageBroker (singleton), ModelService (singleton) | |
| Repositories + Broker ─► Use Cases | |
| """ | |
| from __future__ import annotations | |
| from functools import lru_cache | |
| from typing import Annotated | |
| from fastapi import Depends | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.application.use_cases.get_prediction_history import GetPredictionHistoryUseCase | |
| from src.application.use_cases.ingest_ppg import IngestPPGUseCase | |
| from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase | |
| from src.domain.interfaces.repositories.ppg_repository import PPGRepository | |
| from src.domain.interfaces.repositories.prediction_repository import PredictionRepository | |
| from src.domain.interfaces.services.message_broker import MessageBroker | |
| from src.domain.interfaces.services.model_service import ModelService | |
| from src.domain.interfaces.services.signal_processor import SignalProcessor | |
| from src.infrastructure.database.connection import get_async_session | |
| from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository | |
| from src.infrastructure.database.repositories.prediction_repository import ( | |
| SQLAlchemyPredictionRepository, | |
| ) | |
| from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker | |
| from src.infrastructure.model.mock_model_service import MockModelService | |
| from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor | |
| from src.infrastructure.processing.scipy_cardiogan_preprocessor import SciPyCardioGANPreprocessor | |
| from src.shared.config import get_settings | |
| from src.shared.logger import get_logger | |
| logger = get_logger(__name__) | |
| # ── Singleton Dependencies (created once per process) ───────────────────────── | |
@lru_cache(maxsize=1)
def get_broker() -> MessageBroker:
    """
    Return the process-wide MessageBroker instance.

    The result is memoised with ``lru_cache`` so that every caller, including
    each FastAPI request that resolves ``Depends(get_broker)``, receives the
    same object instead of constructing a fresh broker per call.

    The concrete implementation is RabbitMQBroker, built from
    ``settings.rabbitmq_url``. Swapping to another broker (Kafka, Redis, ...)
    means replacing the constructor call below.

    NOTE(review): connection timing (e.g. connecting lazily on first use) is
    decided inside RabbitMQBroker and is not visible from this module.

    Returns:
        The cached MessageBroker.
    """
    settings = get_settings()
    # Logged only when the instance is actually built, i.e. on a cache miss.
    logger.info("Creating RabbitMQBroker singleton")
    return RabbitMQBroker(url=settings.rabbitmq_url)
@lru_cache(maxsize=1)
def get_signal_processor() -> SignalProcessor:
    """
    Return the process-wide SignalProcessor instance.

    Memoised with ``lru_cache`` so the ScipySignalProcessor is constructed
    once and then shared by every caller, rather than re-created on each
    dependency resolution.

    Returns:
        The cached ScipySignalProcessor.
    """
    return ScipySignalProcessor()
@lru_cache(maxsize=1)
def get_model_service() -> ModelService:
    """
    Return the process-wide ModelService instance.

    Selects MockModelService when ``settings.use_mock_model`` is truthy and
    GANVGTLNetService otherwise. The result is memoised with ``lru_cache`` so
    the service (and its preprocessors) is built once per process instead of
    on every dependency resolution.

    Returns:
        The cached ModelService implementation.
    """
    settings = get_settings()
    if settings.use_mock_model:
        logger.info("ModelService: using MockModelService (USE_MOCK_MODEL=true)")
        return MockModelService()

    logger.info("ModelService: using GANVGTLNetService (USE_MOCK_MODEL=false)")
    # Imported here rather than at module level so that importing this module
    # does not pull in torch when the mock model is in use.
    from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
    from src.infrastructure.processing.numba_vgtlnet_preprocessor import (
        NumbaVGTLNetPreprocessor,
    )

    return GANVGTLNetService(
        gan_preprocessor=SciPyCardioGANPreprocessor(),
        vgtlnet_preprocessor=NumbaVGTLNetPreprocessor(),
    )
| # ── Per-Request Dependencies ────────────────────────────────────────────────── | |
async def get_ppg_repository(
    session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PPGRepository:
    """
    Build a PPGRepository bound to the current request's DB session.

    Args:
        session: AsyncSession injected by FastAPI through ``get_async_session``.

    Returns:
        A SQLAlchemyPPGRepository wrapping ``session``.
    """
    repository = SQLAlchemyPPGRepository(session)
    return repository
async def get_prediction_repository(
    session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PredictionRepository:
    """
    Build a PredictionRepository bound to the current request's DB session.

    Args:
        session: AsyncSession injected by FastAPI through ``get_async_session``.

    Returns:
        A SQLAlchemyPredictionRepository wrapping ``session``.
    """
    repository = SQLAlchemyPredictionRepository(session)
    return repository
| # ── Use Case Dependencies ───────────────────────────────────────────────────── | |
async def get_ingest_ppg_use_case(
    ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
    broker: Annotated[MessageBroker, Depends(get_broker)],
) -> IngestPPGUseCase:
    """
    Assemble an IngestPPGUseCase from its injected collaborators.

    Args:
        ppg_repo: PPGRepository resolved via ``get_ppg_repository``.
        broker: MessageBroker resolved via ``get_broker``.

    Returns:
        An IngestPPGUseCase wired with ``ppg_repo`` and ``broker``.
    """
    use_case = IngestPPGUseCase(ppg_repo=ppg_repo, broker=broker)
    return use_case
async def get_process_and_predict_use_case(
    ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
    prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
    signal_processor: Annotated[SignalProcessor, Depends(get_signal_processor)],
    model_service: Annotated[ModelService, Depends(get_model_service)],
) -> ProcessAndPredictUseCase:
    """
    Assemble a ProcessAndPredictUseCase from its injected collaborators.

    Args:
        ppg_repo: PPGRepository resolved via ``get_ppg_repository``.
        prediction_repo: PredictionRepository resolved via
            ``get_prediction_repository``.
        signal_processor: SignalProcessor resolved via ``get_signal_processor``.
        model_service: ModelService resolved via ``get_model_service``.

    Returns:
        A ProcessAndPredictUseCase wired with all four dependencies.
    """
    # Collect the collaborators by keyword, then hand them over in one go.
    wiring = {
        "ppg_repo": ppg_repo,
        "prediction_repo": prediction_repo,
        "signal_processor": signal_processor,
        "model_service": model_service,
    }
    return ProcessAndPredictUseCase(**wiring)
async def get_prediction_history_use_case(
    prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
) -> GetPredictionHistoryUseCase:
    """
    Assemble a GetPredictionHistoryUseCase from its injected repository.

    Args:
        prediction_repo: PredictionRepository resolved via
            ``get_prediction_repository``.

    Returns:
        A GetPredictionHistoryUseCase wired with ``prediction_repo``.
    """
    use_case = GetPredictionHistoryUseCase(prediction_repo=prediction_repo)
    return use_case