File size: 8,348 Bytes
e391a84 77f2d58 e391a84 77f2d58 e391a84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | """
interface/api/app.py
βββββββββββββββββββββ
FastAPI application factory.
Usage:
# Direct (dev)
uvicorn src.interface.api.app:create_app --factory --port 7860 --reload
# Docker / HF Spaces
CMD ["uvicorn", "src.interface.api.app:create_app", "--factory", ...]
Design decisions:
β’ Factory function (not module-level app) so tests can create isolated instances.
β’ Lifespan context manager handles startup/shutdown cleanly.
β’ Global exception handlers registered via error_handlers.register_exception_handlers().
β’ CORS is wide-open by default β restrict in production via settings.
"""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Annotated, AsyncGenerator
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.infrastructure.database.connection import (
create_all_tables,
dispose_engine,
ping_database,
)
from src.interface.api.dependencies import get_broker, get_model_service
from src.interface.api.error_handlers import register_exception_handlers
from src.interface.api.routes import ppg_routes, prediction_routes
from src.shared.config import get_settings
from src.shared.constants import API_V1_PREFIX
from src.shared.logger import get_logger
logger = get_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan handler.
Startup:
1. Validate database connectivity (ping).
2. Create DB tables (dev/SQLite only β Supabase uses Alembic).
3. Connect to RabbitMQ.
4. Pre-load AI model.
Shutdown:
1. Disconnect from RabbitMQ.
2. Dispose the DB engine (returns all pooled connections).
"""
settings = get_settings()
logger.info("=" * 60)
logger.info("BP Monitoring Pipeline β Starting up")
logger.info("Database : %s", settings.database_url.split("@")[-1] if "@" in settings.database_url else settings.database_url)
logger.info("Supabase : %s", "yes" if settings.is_supabase else "no")
logger.info("Pooler : %s", "yes (port 6543)" if settings.uses_pooler else "no")
logger.info("Broker : %s", settings.rabbitmq_url.split("@")[-1] if "@" in settings.rabbitmq_url else settings.rabbitmq_url)
logger.info("Mock Model: %s", settings.use_mock_model)
logger.info("=" * 60)
# ββ 1. Validate database connection βββββββββββββββββββββββββββββββββββββββ
db_ok = await ping_database()
if db_ok:
logger.info("β Database connection verified.")
else:
logger.warning(
"β Database ping failed β check DATABASE_URL. "
"The API will start but DB operations will fail."
)
# ββ 2. Create tables (dev/SQLite only) ββββββββββββββββββββββββββββββββββββ
if settings.debug or settings.is_sqlite:
logger.info("Auto-creating DB tables (dev/SQLite mode)β¦")
await create_all_tables()
# ββ 3. Connect broker (best-effort on startup) ββββββββββββββββββββββββββββ
broker_provider = app.dependency_overrides.get(get_broker, get_broker)
broker = broker_provider()
try:
await broker.connect()
logger.info("β RabbitMQ broker connected.")
except Exception as exc:
logger.warning(
"β Could not connect to RabbitMQ on startup (will retry on first publish): %s", exc
)
# ββ 4. Pre-load AI model (best-effort) ββββββββββββββββββββββββββββββββββββ
model_provider = app.dependency_overrides.get(get_model_service, get_model_service)
model_service = model_provider()
try:
await model_service.load_model()
logger.info("β Model service ready: %s", model_service.model_version)
except Exception as exc:
logger.warning("β Could not pre-load model on startup: %s", exc)
logger.info("Startup complete. API is ready to serve requests.")
yield
# ββ Shutdown ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
logger.info("Shutting downβ¦")
try:
await broker.disconnect()
except Exception:
pass
await dispose_engine()
logger.info("Shutdown complete.")
def create_app() -> FastAPI:
"""
FastAPI application factory.
Returns a fully configured FastAPI instance with:
β’ Lifespan hooks (startup/shutdown)
β’ Global domain exception handlers
β’ CORS middleware
β’ API v1 routes
β’ Health check endpoint
β’ Swagger UI / ReDoc
"""
settings = get_settings()
app = FastAPI(
title="BP Monitoring Pipeline API",
description=(
"REST API for the **Blood Pressure Monitoring Pipeline**.\n\n"
"Receives PPG signals from IoT devices (ETL #1) and serves "
"AI-predicted blood pressure results to the frontend.\n\n"
"**Error Response Format** (all errors):\n"
"```json\n"
'{"error": "not_found", "message": "...", "context": {}, "timestamp": "..."}\n'
"```"
),
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan,
)
# ββ Global Exception Handlers βββββββββββββββββββββββββββββββββββββββββββββ
# Registered BEFORE routes so handlers are available for all endpoints.
register_exception_handlers(app)
# ββ CORS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ββ Routes ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
app.include_router(ppg_routes.router, prefix=API_V1_PREFIX)
app.include_router(prediction_routes.router, prefix=API_V1_PREFIX)
# ββ Health Check ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get(
"/health",
tags=["Health"],
summary="Health Check",
responses={
200: {"description": "Service is healthy."},
503: {"description": "One or more dependencies are unavailable."},
},
)
async def health(
broker = Depends(get_broker)
) -> dict:
"""
Returns service health status including database and broker connectivity.
Used by Docker HEALTHCHECK and HF Spaces monitoring.
"""
from src.domain.interfaces.services.message_broker import MessageBroker
db_healthy = await ping_database()
broker_connected = await broker.is_connected()
model_service = get_model_service()
all_healthy = db_healthy and broker_connected
return {
"status": "ok" if all_healthy else "degraded",
"database": "ok" if db_healthy else "unavailable",
"broker_connected": broker_connected,
"model_loaded": model_service.is_loaded(),
"model_version": model_service.model_version,
}
@app.get("/", tags=["Health"], include_in_schema=False)
async def root() -> dict:
return {
"service": "BP Monitoring Pipeline API",
"version": "1.0.0",
"docs": "/docs",
}
logger.info("FastAPI application created.")
return app
|