Spaces:
Configuration error
Configuration error
| """ | |
| Main FastAPI application for Analytics Microservice. | |
| """ | |
| import os | |
| import time | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, Request, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.exceptions import RequestValidationError | |
| from jose import JWTError | |
| from pymongo.errors import PyMongoError | |
| from app.core.config import settings | |
| from app.core.logging import get_logger, setup_logging | |
| from app.nosql import connect_to_mongo, close_mongo_connection | |
| from app.cache import connect_to_redis, close_redis_connection | |
| from app.postgres import connect_to_postgres, close_postgres_connection | |
| from app.events.controllers.router import router as events_router | |
| from app.reports.controllers.router import router as reports_router | |
| from app.dashboard.controllers.router import router as dashboard_router | |
| from app.kpi_cache.controllers.router import router as kpi_cache_router | |
| from app.widget_collection.router import router as widget_collection_router | |
# Configure logging once at import time, before any module-level logger is
# used. The configured level is normalised (surrounding whitespace removed,
# upper-cased) before being handed to setup_logging.
setup_logging(
    level=settings.LOG_LEVEL.strip().upper(),
)

# Module-level logger shared by the lifespan hook, middleware and handlers.
logger = get_logger(__name__)
async def _ensure_mongo_indexes() -> None:
    """Create the MongoDB indexes used by the KPI cache and widget collection."""
    # Imported here, as in the original code, rather than at module top level.
    # NOTE(review): presumably to avoid an import cycle - confirm before hoisting.
    from app.nosql import get_database
    from app.kpi_cache.constants import KPI_CACHE_COLLECTION

    db = get_database()

    # Ensure MongoDB indexes for kpi_cache
    kpi_col = db[KPI_CACHE_COLLECTION]
    await kpi_col.create_index(
        [("merchant_id", 1), ("widget_id", 1), ("period_window", 1), ("branch_id", 1)],
        unique=True,
        background=True,
        name="kpi_cache_natural_key",
    )
    await kpi_col.create_index([("merchant_id", 1)], background=True, name="kpi_cache_merchant_id")
    await kpi_col.create_index([("expires_at", 1)], background=True, name="kpi_cache_expires_at")

    # Ensure MongoDB index for analytics_widget_collection
    wc_col = db["analytics_widget_collection"]
    await wc_col.create_index("widget_id", unique=True, background=True, name="widget_id_unique")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the service's backing connections on startup and close them on shutdown.

    Startup connects to MongoDB, Redis and PostgreSQL (in that order) and then
    ensures the required MongoDB indexes exist. Shutdown closes the
    PostgreSQL, MongoDB and Redis connections (in that order).

    Args:
        app: The FastAPI application this lifespan is attached to. It is not
            used by the body.

    Yields:
        None. Control is handed to the running application at the yield.
    """
    logger.info("Starting Analytics Microservice", extra={"event": "service_starting"})
    await connect_to_mongo()
    await connect_to_redis()
    await connect_to_postgres()

    # From here on all three connections are open, so any failure (index
    # creation, or an error raised into the generator at the yield) must
    # still release them.
    try:
        await _ensure_mongo_indexes()
        logger.info("Analytics Microservice started", extra={"event": "service_ready"})
        yield
    finally:
        logger.info("Shutting down Analytics Microservice", extra={"event": "service_stopping"})
        await close_postgres_connection()
        await close_mongo_connection()
        await close_redis_connection()
        logger.info("Analytics Microservice stopped", extra={"event": "service_stopped"})
# The ASGI application object. ROOT_PATH is read from the environment and
# falls back to an empty string when the variable is not set.
app = FastAPI(
    lifespan=lifespan,
    root_path=os.environ.get("ROOT_PATH", ""),
    title="Analytics Microservice",
    version="1.0.0",
    description="Centralised analytics, event ingestion, metrics aggregation and reporting.",
    docs_url="/docs",
    redoc_url="/redoc",
)
# CORS: origins come from settings; every method and header is allowed and
# exposed, and credentialed requests are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=settings.CORS_ORIGINS,
    allow_headers=["*"],
    allow_methods=["*"],
    expose_headers=["*"],
)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log one structured record per HTTP request after it has been handled.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump when the system clock is adjusted mid-request.
    start = time.perf_counter()
    response = await call_next(request)
    duration_ms = round((time.perf_counter() - start) * 1000, 2)
    logger.info(
        "HTTP request",
        extra={
            "event": "http_request",
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "duration_ms": duration_ms,
            # request.client may be None, in which case no IP is recorded.
            "client_ip": request.client.host if request.client else None,
        },
    )
    return response
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Turn a request validation failure into a 422 JSON response.

    Args:
        request: The request that failed validation (unused).
        exc: The validation error raised for the request.

    Returns:
        A 422 ``JSONResponse`` with ``success`` set to False and the list of
        validation errors under ``detail``.
    """
    # Imported locally so this handler does not depend on a module-level import.
    from fastapi.encoders import jsonable_encoder

    errors = exc.errors()
    logger.warning("Validation error", extra={"event": "validation_error", "errors": errors})
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        # Error entries can carry values that json.dumps cannot serialise
        # (for example exception objects in "ctx"); encode them first so the
        # handler itself cannot fail while rendering the response.
        content={"success": False, "detail": jsonable_encoder(errors)},
    )
@app.exception_handler(JWTError)
async def jwt_exception_handler(request: Request, exc: JWTError):
    """Turn an unhandled ``JWTError`` into a 401 JSON response.

    The exception text is deliberately not included in the response body.

    Args:
        request: The request being processed (unused).
        exc: The JWT error that was raised (unused).

    Returns:
        A 401 ``JSONResponse`` with a fixed "Invalid or expired token" detail.
    """
    return JSONResponse(
        status_code=status.HTTP_401_UNAUTHORIZED,
        content={"success": False, "detail": "Invalid or expired token"},
    )
@app.exception_handler(PyMongoError)
async def mongo_exception_handler(request: Request, exc: PyMongoError):
    """Log an unhandled ``PyMongoError`` and answer with a 503 JSON response.

    Args:
        request: The request being processed (unused).
        exc: The MongoDB driver error that was raised.

    Returns:
        A 503 ``JSONResponse`` with a generic "Database error" detail; the
        exception text is logged but not sent to the client.
    """
    logger.error("MongoDB error", extra={"event": "mongo_error", "error": str(exc)}, exc_info=True)
    return JSONResponse(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        content={"success": False, "detail": "Database error"},
    )
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Log any otherwise unhandled exception and answer with a 500 JSON response.

    Args:
        request: The request being processed (unused).
        exc: The exception that was raised.

    Returns:
        A 500 ``JSONResponse`` with a generic "Internal server error" detail;
        the exception text is logged but not sent to the client.
    """
    logger.error("Unhandled exception", extra={"event": "unhandled_exception", "error": str(exc)}, exc_info=True)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"success": False, "detail": "Internal server error"},
    )
# Routers: mount each feature router on the application, in this order.
for _feature_router in (
    events_router,
    reports_router,
    dashboard_router,
    kpi_cache_router,
    widget_collection_router,
):
    app.include_router(_feature_router)
# NOTE(review): no route decorator is visible on this function, so as shown
# it is not reachable over HTTP - confirm the intended path and register it.
async def health_check():
    """Return a static health payload including the application version."""
    payload = {
        "status": "healthy",
        "service": "analytics-microservice",
        "version": app.version,
    }
    return payload
# NOTE(review): no route decorator is visible on this function, so as shown
# it is not reachable over HTTP - confirm the intended path and register it.
async def list_routes():
    """Return the path, methods and name of every registered route.

    Entries in ``app.routes`` lacking either a ``path`` or a ``methods``
    attribute are skipped. A route whose ``methods`` is empty or None is
    reported with an empty list.
    """
    summary = []
    for route in app.routes:
        if not hasattr(route, "path") or not hasattr(route, "methods"):
            continue
        if route.methods:
            methods = list(route.methods)
        else:
            methods = []
        summary.append({"path": route.path, "methods": methods, "name": route.name})
    return summary