File size: 5,696 Bytes
e391a84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | """
interface/consumer/run_consumer.py
ββββββββββββββββββββββββββββββββββββ
Standalone async consumer β Entry Point 2 (Google Colab / local).
This script:
1. Wires all dependencies (same repos, broker, processor, model as the API).
2. Connects to RabbitMQ.
3. Runs ProcessAndPredictUseCase for every message consumed.
Usage:
python -m src.interface.consumer.run_consumer
Colab usage:
!python -m src.interface.consumer.run_consumer
The consumer runs indefinitely until interrupted (Ctrl+C or Colab cell stop).
"""
from __future__ import annotations
import asyncio
import signal
import sys
from typing import Any
from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.infrastructure.database.connection import create_all_tables, dispose_engine, get_session_factory
from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository
from src.infrastructure.database.repositories.prediction_repository import SQLAlchemyPredictionRepository
from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker
from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
from src.infrastructure.model.mock_model_service import MockModelService
from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor
from src.shared.config import get_settings
from src.shared.constants import PPG_QUEUE_NAME
from src.shared.logger import get_logger
logger = get_logger(__name__)
async def run() -> None:
"""
Bootstrap and run the message queue consumer.
Dependency wiring mirrors the API's dependencies.py β the use case is
identical, only the bootstrapping mechanism differs.
"""
settings = get_settings()
logger.info("=" * 60)
logger.info("BP Monitoring Pipeline β Consumer starting up")
logger.info("Database : %s", settings.database_url.split("@")[-1])
logger.info("Broker : %s", settings.rabbitmq_url.split("@")[-1])
logger.info("Mock Model: %s", settings.use_mock_model)
logger.info("=" * 60)
# ββ Create DB tables (dev/SQLite) βββββββββββββββββββββββββββββββββββββββββ
if settings.debug or "sqlite" in settings.database_url:
await create_all_tables()
# ββ Wire dependencies βββββββββββββββββββββββββββββββββββββββββββββββββββββ
broker = RabbitMQBroker(url=settings.rabbitmq_url)
signal_processor = ScipySignalProcessor()
model_service = MockModelService() if settings.use_mock_model else GANVGTLNetService()
# Load model
logger.info("Loading model (%s)β¦", model_service.model_version)
await model_service.load_model()
logger.info("Model ready.")
# Connect broker
await broker.connect()
logger.info("Connected to RabbitMQ.")
# ββ Message handler βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
session_factory = get_session_factory()
async def handle_message(payload: dict[str, Any]) -> None:
"""
Invoked for each message dequeued from PPG_QUEUE_NAME.
Creates a new DB session per message (request-scoped isolation).
"""
signal_id = payload.get("id", "<unknown>")
logger.info("Received message for signal_id=%s", signal_id)
async with session_factory() as session:
ppg_repo = SQLAlchemyPPGRepository(session)
prediction_repo = SQLAlchemyPredictionRepository(session)
use_case = ProcessAndPredictUseCase(
ppg_repo=ppg_repo,
prediction_repo=prediction_repo,
signal_processor=signal_processor,
model_service=model_service,
)
try:
prediction = await use_case.execute(payload)
await session.commit()
logger.info(
"Prediction stored: id=%s SBP=%.1f DBP=%.1f",
prediction.id,
prediction.predicted_sbp,
prediction.predicted_dbp,
)
except Exception as exc:
await session.rollback()
logger.error(
"Failed to process signal_id=%s: %s", signal_id, exc, exc_info=True
)
raise # re-raise so the broker nacks the message
# ββ Start consuming βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
logger.info("Listening on queue '%s'β¦ (Ctrl+C to stop)", PPG_QUEUE_NAME)
try:
await broker.consume(queue_name=PPG_QUEUE_NAME, handler=handle_message)
finally:
await broker.disconnect()
await dispose_engine()
logger.info("Consumer shut down cleanly.")
def main() -> None:
"""CLI entry point."""
loop = asyncio.get_event_loop()
# Handle Ctrl+C gracefully
def _handle_sigint(*_: Any) -> None:
logger.info("Interrupt received β shutting down consumerβ¦")
for task in asyncio.all_tasks(loop):
task.cancel()
signal.signal(signal.SIGINT, _handle_sigint)
try:
loop.run_until_complete(run())
except asyncio.CancelledError:
pass
finally:
loop.close()
if __name__ == "__main__":
main()
|