File size: 5,696 Bytes
e391a84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
interface/consumer/run_consumer.py
────────────────────────────────────
Standalone async consumer β€” Entry Point 2 (Google Colab / local).

This script:
    1. Wires all dependencies (same repos, broker, processor, model as the API).
    2. Connects to RabbitMQ.
    3. Runs ProcessAndPredictUseCase for every message consumed.

Usage:
    python -m src.interface.consumer.run_consumer

Colab usage:
    !python -m src.interface.consumer.run_consumer

The consumer runs indefinitely until interrupted (Ctrl+C or Colab cell stop).
"""
from __future__ import annotations

import asyncio
import signal
import sys
from typing import Any

from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.infrastructure.database.connection import create_all_tables, dispose_engine, get_session_factory
from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository
from src.infrastructure.database.repositories.prediction_repository import SQLAlchemyPredictionRepository
from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker
from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
from src.infrastructure.model.mock_model_service import MockModelService
from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor
from src.shared.config import get_settings
from src.shared.constants import PPG_QUEUE_NAME
from src.shared.logger import get_logger

logger = get_logger(__name__)


async def run() -> None:
    """
    Bootstrap and run the message queue consumer.

    Dependency wiring mirrors the API's dependencies.py β€” the use case is
    identical, only the bootstrapping mechanism differs.
    """
    settings = get_settings()

    logger.info("=" * 60)
    logger.info("BP Monitoring Pipeline β€” Consumer starting up")
    logger.info("Database : %s", settings.database_url.split("@")[-1])
    logger.info("Broker   : %s", settings.rabbitmq_url.split("@")[-1])
    logger.info("Mock Model: %s", settings.use_mock_model)
    logger.info("=" * 60)

    # ── Create DB tables (dev/SQLite) ─────────────────────────────────────────
    if settings.debug or "sqlite" in settings.database_url:
        await create_all_tables()

    # ── Wire dependencies ─────────────────────────────────────────────────────
    broker = RabbitMQBroker(url=settings.rabbitmq_url)
    signal_processor = ScipySignalProcessor()
    model_service = MockModelService() if settings.use_mock_model else GANVGTLNetService()

    # Load model
    logger.info("Loading model (%s)…", model_service.model_version)
    await model_service.load_model()
    logger.info("Model ready.")

    # Connect broker
    await broker.connect()
    logger.info("Connected to RabbitMQ.")

    # ── Message handler ───────────────────────────────────────────────────────
    session_factory = get_session_factory()

    async def handle_message(payload: dict[str, Any]) -> None:
        """
        Invoked for each message dequeued from PPG_QUEUE_NAME.

        Creates a new DB session per message (request-scoped isolation).
        """
        signal_id = payload.get("id", "<unknown>")
        logger.info("Received message for signal_id=%s", signal_id)

        async with session_factory() as session:
            ppg_repo = SQLAlchemyPPGRepository(session)
            prediction_repo = SQLAlchemyPredictionRepository(session)

            use_case = ProcessAndPredictUseCase(
                ppg_repo=ppg_repo,
                prediction_repo=prediction_repo,
                signal_processor=signal_processor,
                model_service=model_service,
            )

            try:
                prediction = await use_case.execute(payload)
                await session.commit()
                logger.info(
                    "Prediction stored: id=%s SBP=%.1f DBP=%.1f",
                    prediction.id,
                    prediction.predicted_sbp,
                    prediction.predicted_dbp,
                )
            except Exception as exc:
                await session.rollback()
                logger.error(
                    "Failed to process signal_id=%s: %s", signal_id, exc, exc_info=True
                )
                raise  # re-raise so the broker nacks the message

    # ── Start consuming ───────────────────────────────────────────────────────
    logger.info("Listening on queue '%s'… (Ctrl+C to stop)", PPG_QUEUE_NAME)
    try:
        await broker.consume(queue_name=PPG_QUEUE_NAME, handler=handle_message)
    finally:
        await broker.disconnect()
        await dispose_engine()
        logger.info("Consumer shut down cleanly.")


def main() -> None:
    """CLI entry point."""
    loop = asyncio.get_event_loop()

    # Handle Ctrl+C gracefully
    def _handle_sigint(*_: Any) -> None:
        logger.info("Interrupt received β€” shutting down consumer…")
        for task in asyncio.all_tasks(loop):
            task.cancel()

    signal.signal(signal.SIGINT, _handle_sigint)

    try:
        loop.run_until_complete(run())
    except asyncio.CancelledError:
        pass
    finally:
        loop.close()


if __name__ == "__main__":
    main()