MukeshKapoor25's picture
Initial commit
b143975
"""
KPI Cache router.
POST /kpi-cache/stats — bulk KPI fetch
POST /kpi-cache/stats/individual/{kpi_id} — single KPI (404 on unknown, 403 on no widget access)
POST /kpi-cache/rebuild — force recompute
POST /kpi-cache/list — list cache snapshots (projection supported)
Auth rules (mirrors SCM-ms widget_router.py):
- merchant_id is ALWAYS sourced from JWT — never from request body
- Bulk stats / rebuild / list: require permissions.dashboard.view
- Individual KPI: additionally checks scm_access_roles.widget_access[] for that widget_id
"""
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from app.core.logging import get_logger
from app.dependencies.auth import get_current_user, TokenUser
from app.dependencies.kpi_permissions import require_dashboard_view, check_widget_access
from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
from app.kpi_cache.schemas.schema import (
KPIStatsRequest,
KPIStatsResponse,
KPIIndividualRequest,
KPIRebuildRequest,
RebuildResponse,
KPICacheListRequest,
)
from app.kpi_cache.services.service import KPICacheService
logger = get_logger(__name__)
router = APIRouter(prefix="/kpi-cache", tags=["KPI Cache"])
@router.post("/stats", response_model=KPIStatsResponse)
async def get_kpi_stats(
payload: KPIStatsRequest,
current_user: TokenUser = Depends(require_dashboard_view),
):
"""
Return KPI widget values for the authenticated merchant.
merchant_id is taken from JWT — not from the request body.
Requires: permissions.dashboard.view
Partial failures return {"error": ..., "value": null} per KPI — never HTTP 500.
"""
branch_id = payload.branch_id or "all"
try:
kpis = await KPICacheService.get_kpi_stats(
merchant_id=current_user.merchant_id,
period_window=payload.period_window,
branch_id=branch_id,
kpi_ids=payload.kpi_ids,
use_cache=payload.use_cache,
)
logger.info(
"KPI stats retrieved",
extra={
"event": "kpi_stats_retrieved",
"user_id": current_user.user_id,
"merchant_id": current_user.merchant_id,
"period_window": payload.period_window,
"kpi_count": len(kpis),
},
)
return KPIStatsResponse(
success=True,
merchant_id=current_user.merchant_id,
period_window=payload.period_window,
branch_id=branch_id,
kpis=kpis,
generated_at=datetime.now(timezone.utc).isoformat(),
)
except Exception as exc:
logger.error("get_kpi_stats failed",
extra={"event": "kpi_stats_error", "error": str(exc)}, exc_info=True)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to fetch KPI stats")
@router.post("/stats/individual/{kpi_id}")
async def get_individual_kpi(
kpi_id: str,
payload: KPIIndividualRequest,
current_user: TokenUser = Depends(require_dashboard_view),
):
"""
Refresh a single KPI widget.
Requires: permissions.dashboard.view + widget_id in scm_access_roles.widget_access[]
Returns HTTP 404 with available_kpis list when kpi_id is unknown.
use_cache=false bypasses cache read but writes fresh result back to Redis.
"""
# 404 on unknown widget_id (exact match, not fuzzy)
if kpi_id not in KPI_HANDLER_REGISTRY:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"error": "KPI not found",
"available_kpis": list(KPI_HANDLER_REGISTRY.keys()),
},
)
# Widget-level access control
await check_widget_access(kpi_id, current_user)
branch_id = payload.branch_id or "all"
result = await KPICacheService.get_individual_kpi(
widget_id=kpi_id,
merchant_id=current_user.merchant_id,
period_window=payload.period_window,
branch_id=branch_id,
use_cache=payload.use_cache,
)
logger.info(
"Individual KPI retrieved",
extra={
"event": "kpi_individual_retrieved",
"kpi_id": kpi_id,
"user_id": current_user.user_id,
"merchant_id": current_user.merchant_id,
},
)
return {"success": True, "merchant_id": current_user.merchant_id, "kpi": result}
@router.post("/rebuild", response_model=RebuildResponse)
async def rebuild_kpi_cache(
payload: KPIRebuildRequest,
current_user: TokenUser = Depends(require_dashboard_view),
):
"""
Force recompute KPI cache for the authenticated merchant.
Requires: permissions.dashboard.view
"""
try:
result = await KPICacheService.rebuild(
merchant_id=current_user.merchant_id,
period_window=payload.period_window,
branch_id=payload.branch_id,
kpi_ids=payload.kpi_ids,
)
return RebuildResponse(
success=True,
message="KPI cache rebuilt successfully",
merchant_id=current_user.merchant_id,
rebuilt_count=result["rebuilt"],
failed_count=result["failed"],
details=result.get("errors") or None,
)
except Exception as exc:
logger.error("rebuild_kpi_cache failed",
extra={"event": "kpi_rebuild_error", "error": str(exc)}, exc_info=True)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to rebuild KPI cache")
@router.post("/list")
async def list_kpi_cache(
payload: KPICacheListRequest,
current_user: TokenUser = Depends(require_dashboard_view),
):
"""
List KPI cache snapshots with optional projection_list.
merchant_id filter is always enforced from JWT — cannot be overridden.
Requires: permissions.dashboard.view
"""
filters = payload.filters.model_dump() if payload.filters else {}
# Always scope to the authenticated merchant — ignore any merchant_id in filters
filters["merchant_id"] = current_user.merchant_id
try:
docs = await KPICacheService.list_cache(
filters=filters,
skip=payload.skip,
limit=payload.limit,
projection_list=payload.projection_list,
)
return {"success": True, "data": docs, "count": len(docs)}
except Exception as exc:
logger.error("list_kpi_cache failed",
extra={"event": "kpi_list_error", "error": str(exc)}, exc_info=True)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to list KPI cache")