MukeshKapoor25's picture
widget lib changes
88dcd96
"""
Main FastAPI application for Analytics Microservice.
"""
import os
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from jose import JWTError
from pymongo.errors import PyMongoError
from app.core.config import settings
from app.core.logging import get_logger, setup_logging
from app.nosql import connect_to_mongo, close_mongo_connection
from app.cache import connect_to_redis, close_redis_connection
from app.postgres import connect_to_postgres, close_postgres_connection
from app.events.controllers.router import router as events_router
from app.reports.controllers.router import router as reports_router
from app.dashboard.controllers.router import router as dashboard_router
from app.kpi_cache.controllers.router import router as kpi_cache_router
from app.widget_collection.router import router as widget_collection_router
# Configure logging at import time. The level name from settings is
# normalised (surrounding whitespace removed, upper-cased) before it is
# handed to setup_logging.
_log_level = settings.LOG_LEVEL.strip().upper()
setup_logging(level=_log_level)

# Module-level logger shared by the lifespan hook, middleware and handlers.
logger = get_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run service startup and shutdown around the application's lifetime.

    Startup calls ``connect_to_mongo``, ``connect_to_redis`` and
    ``connect_to_postgres`` in that order, then ensures the MongoDB indexes
    used by the KPI cache and the widget collection exist.

    Shutdown calls the matching ``close_*`` helpers. It runs in a ``finally``
    clause, so the connections are also released when index creation fails
    or when an exception/cancellation is raised at the ``yield`` point.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    logger.info("Starting Analytics Microservice", extra={"event": "service_starting"})
    await connect_to_mongo()
    await connect_to_redis()
    await connect_to_postgres()

    # Everything from here on runs with all three connections open, so any
    # exit path (normal or exceptional) must go through the cleanup below.
    try:
        # Ensure MongoDB indexes for kpi_cache
        from app.nosql import get_database
        from app.kpi_cache.constants import KPI_CACHE_COLLECTION

        db = get_database()
        kpi_col = db[KPI_CACHE_COLLECTION]
        await kpi_col.create_index(
            [("merchant_id", 1), ("widget_id", 1), ("period_window", 1), ("branch_id", 1)],
            unique=True, background=True, name="kpi_cache_natural_key"
        )
        await kpi_col.create_index([("merchant_id", 1)], background=True, name="kpi_cache_merchant_id")
        await kpi_col.create_index([("expires_at", 1)], background=True, name="kpi_cache_expires_at")

        # Ensure MongoDB index for analytics_widget_collection
        wc_col = db["analytics_widget_collection"]
        await wc_col.create_index("widget_id", unique=True, background=True, name="widget_id_unique")

        logger.info("Analytics Microservice started", extra={"event": "service_ready"})
        yield
    finally:
        logger.info("Shutting down Analytics Microservice", extra={"event": "service_stopping"})
        await close_postgres_connection()
        await close_mongo_connection()
        await close_redis_connection()
        logger.info("Analytics Microservice stopped", extra={"event": "service_stopped"})
# The ASGI application object. ``root_path`` is read from the ROOT_PATH
# environment variable and falls back to an empty string when it is unset.
app = FastAPI(
    lifespan=lifespan,
    root_path=os.getenv("ROOT_PATH", ""),
    title="Analytics Microservice",
    version="1.0.0",
    description="Centralised analytics, event ingestion, metrics aggregation and reporting.",
    docs_url="/docs",
    redoc_url="/redoc",
)
# CORS: the allowed origins come from settings; every method and header is
# allowed and exposed, and credentialed requests are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=settings.CORS_ORIGINS,
    allow_headers=["*"],
    allow_methods=["*"],
    expose_headers=["*"],
)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log one structured record for each request that yields a response.

    The record carries the HTTP method, URL path, response status code,
    elapsed time in milliseconds (rounded to two decimals) and the client
    host, or ``None`` when the client address is not available.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump when the system wall clock is adjusted mid-request.
    started = time.perf_counter()
    response = await call_next(request)
    duration_ms = round((time.perf_counter() - started) * 1000, 2)
    logger.info(
        "HTTP request",
        extra={
            "event": "http_request",
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "duration_ms": duration_ms,
            "client_ip": request.client.host if request.client else None,
        },
    )
    return response
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Return a 422 response describing request validation failures.

    Args:
        request: The request that failed validation (unused).
        exc: The validation error raised while parsing the request.

    Returns:
        A JSON response with ``success`` set to ``False`` and ``detail``
        holding the list of validation errors.
    """
    # Imported locally so this handler does not depend on the module's
    # top-level import block.
    from fastapi.encoders import jsonable_encoder

    errors = exc.errors()
    logger.warning("Validation error", extra={"event": "validation_error", "errors": errors})
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        # Error entries may hold values json.dumps cannot serialise (for
        # example exception instances under "ctx"); encode them first so the
        # handler itself cannot fail while building the response.
        content={"success": False, "detail": jsonable_encoder(errors)},
    )
@app.exception_handler(JWTError)
async def jwt_exception_handler(request: Request, exc: JWTError):
    """Answer any uncaught ``JWTError`` with a generic 401 response.

    The exception's own message is not included in the response body.
    """
    body = {"success": False, "detail": "Invalid or expired token"}
    return JSONResponse(content=body, status_code=status.HTTP_401_UNAUTHORIZED)
@app.exception_handler(PyMongoError)
async def mongo_exception_handler(request: Request, exc: PyMongoError):
    """Log an uncaught ``PyMongoError`` and answer with a generic 503.

    The error text goes to the log (with traceback); the response body only
    carries a fixed message.
    """
    log_fields = {"event": "mongo_error", "error": str(exc)}
    logger.error("MongoDB error", extra=log_fields, exc_info=True)

    body = {"success": False, "detail": "Database error"}
    return JSONResponse(content=body, status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Log any otherwise-unhandled exception and answer with a generic 500.

    The error text goes to the log (with traceback); the response body only
    carries a fixed message.
    """
    log_fields = {"event": "unhandled_exception", "error": str(exc)}
    logger.error("Unhandled exception", extra=log_fields, exc_info=True)

    body = {"success": False, "detail": "Internal server error"}
    return JSONResponse(content=body, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Routers, registered in the order listed.
for _router in (
    events_router,
    reports_router,
    dashboard_router,
    kpi_cache_router,
    widget_collection_router,
):
    app.include_router(_router)
@app.get("/health", tags=["health"])
async def health_check():
    """Report a static "healthy" status with the service name and version.

    No backing service is probed; the version is read from ``app.version``.
    """
    payload = {
        "status": "healthy",
        "service": "analytics-microservice",
        "version": app.version,
    }
    return payload
@app.get("/debug/routes", tags=["debug"])
async def list_routes():
    """List the path, methods and name of every registered route.

    Route objects lacking either a ``path`` or a ``methods`` attribute are
    skipped. A route whose ``methods`` is empty or ``None`` is reported with
    an empty list.
    """
    listing = []
    for route in app.routes:
        if not (hasattr(route, "path") and hasattr(route, "methods")):
            continue
        listing.append(
            {
                "path": route.path,
                "methods": list(route.methods or []),
                "name": route.name,
            }
        )
    return listing