"""
KPI Cache Service — registry-based asyncio.gather() dispatch.

Reads from Redis L1 (via KPIHandler.compute) -> MongoDB L2 write-back.
"""
import asyncio
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, List, Dict, Any

from app.core.logging import get_logger
from app.nosql import get_database
from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
from app.kpi_handlers.base_handler import KPIResult
from app.kpi_cache.constants import (
    KPI_CACHE_COLLECTION,
    KPI_WIDGET_REGISTRY,
    WIDGET_REDIS_TTL,
)
from app.kpi_cache.schemas.schema import KPIResultSchema

logger = get_logger(__name__)

# Fallback TTL (seconds) for widgets whose category has no WIDGET_REDIS_TTL entry.
_DEFAULT_TTL = 600


def _ttl_for(widget_id: str) -> int:
    """Return the TTL in seconds for *widget_id*, keyed by the widget's category.

    Unknown widgets, or categories without an entry in ``WIDGET_REDIS_TTL``,
    fall back to ``_DEFAULT_TTL``.
    """
    meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
    return WIDGET_REDIS_TTL.get(meta.get("category", ""), _DEFAULT_TTL)


class KPICacheService:
    """Static entry points for computing KPIs and mirroring results to MongoDB."""

    @staticmethod
    async def get_kpi_stats(
        merchant_id: str,
        period_window: str,
        branch_id: str,
        kpi_ids: Optional[List[str]],
        use_cache: bool,
    ) -> Dict[str, Any]:
        """
        Concurrently compute all requested KPIs via asyncio.gather().

        Failing KPIs return {"error": "...", "value": null} — never HTTP 500.

        Args:
            merchant_id: Merchant the KPIs are computed for.
            period_window: Period identifier forwarded to each handler.
            branch_id: Branch identifier; ``"all"`` is forwarded to handlers
                as ``None``.
            kpi_ids: Widget ids to compute. ``None`` or an empty list selects
                every id in ``KPI_HANDLER_REGISTRY``. Duplicates are computed
                only once.
            use_cache: Forwarded unchanged to ``handler.compute``.

        Returns:
            Mapping of widget id to either a ``KPIResultSchema`` (success) or
            an ``{"error": ..., "value": None}`` dict (unknown id / exception).
        """
        if kpi_ids:
            # dict.fromkeys drops duplicates while keeping first-seen order, so
            # a repeated id does not trigger a second compute + Mongo upsert.
            target_ids = list(dict.fromkeys(kpi_ids))
        else:
            target_ids = list(KPI_HANDLER_REGISTRY.keys())

        async def compute_one(wid: str):
            try:
                # Single-KPI path: compute -> Mongo write-back -> schema.
                schema = await KPICacheService.get_individual_kpi(
                    widget_id=wid,
                    merchant_id=merchant_id,
                    period_window=period_window,
                    branch_id=branch_id,
                    use_cache=use_cache,
                )
            except Exception as exc:
                # Boundary handler: one failing KPI must not fail the batch.
                logger.error(
                    "KPI computation failed",
                    extra={
                        "event": "kpi_compute_error",
                        "widget_id": wid,
                        "merchant_id": merchant_id,
                        "error": str(exc),
                    },
                    exc_info=True,
                )
                return wid, {"error": str(exc), "value": None}

            if schema is None:
                # get_individual_kpi returns None for ids missing from the registry.
                return wid, {"error": f"KPI not found: {wid}", "value": None}
            return wid, schema

        pairs = await asyncio.gather(*(compute_one(wid) for wid in target_ids))
        return dict(pairs)

    @staticmethod
    async def get_individual_kpi(
        widget_id: str,
        merchant_id: str,
        period_window: str,
        branch_id: str,
        use_cache: bool,
    ) -> Optional[KPIResultSchema]:
        """
        Compute a single KPI.

        Returns None when widget_id is unknown (caller raises 404).
        use_cache=False bypasses cache read but writes fresh result back to
        Redis (behaviour of ``handler.compute``; not visible in this module).

        Exceptions raised by the handler or by schema conversion propagate to
        the caller. Failures of the MongoDB write-back are logged and
        swallowed inside ``_write_to_mongo``.
        """
        handler = KPI_HANDLER_REGISTRY.get(widget_id)
        if not handler:
            return None

        # Handlers receive None for the aggregate "all" branch; the Mongo
        # document keeps the original branch_id string.
        handler_branch = None if branch_id == "all" else branch_id
        result: KPIResult = await handler.compute(
            merchant_id=merchant_id,
            period_window=period_window,
            branch_id=handler_branch,
            use_cache=use_cache,
        )
        await KPICacheService._write_to_mongo(
            merchant_id, widget_id, period_window, branch_id, result
        )
        return KPICacheService._to_schema(widget_id, result)

    @staticmethod
    async def rebuild(
        merchant_id: str,
        period_window: Optional[str],
        branch_id: Optional[str],
        kpi_ids: Optional[List[str]],
    ) -> Dict[str, Any]:
        """Force recompute for all/subset of KPIs and update both caches.

        Args:
            merchant_id: Merchant whose KPIs are rebuilt.
            period_window: Single period to rebuild; when falsy, the periods
                "today", "last_7_days", "mtd" and "ytd" are all rebuilt.
            branch_id: Single branch to rebuild; when falsy, only "all".
            kpi_ids: Widget ids to rebuild; when falsy, every registered id.

        Returns:
            ``{"rebuilt": int, "failed": int, "errors": {...}}`` where the
            ``errors`` keys have the form ``"<widget_id>:<period>:<branch>"``.
        """
        target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys())
        periods = [period_window] if period_window else ["today", "last_7_days", "mtd", "ytd"]
        branches = [branch_id] if branch_id else ["all"]

        rebuilt, failed = 0, 0
        errors: Dict[str, str] = {}

        for pw in periods:
            for br in branches:
                results = await KPICacheService.get_kpi_stats(
                    merchant_id=merchant_id,
                    period_window=pw,
                    branch_id=br,
                    kpi_ids=target_ids,
                    use_cache=False,
                )
                for wid, payload in results.items():
                    # Payloads are either error dicts or KPIResultSchema
                    # objects; the latter carry result.error too, so both
                    # shapes must be inspected to count failures correctly.
                    if isinstance(payload, dict):
                        error = payload.get("error")
                    else:
                        error = getattr(payload, "error", None)

                    if error:
                        failed += 1
                        errors[f"{wid}:{pw}:{br}"] = error
                    else:
                        rebuilt += 1

        return {"rebuilt": rebuilt, "failed": failed, "errors": errors}

    @staticmethod
    async def list_cache(
        filters: Dict[str, Any],
        skip: int,
        limit: int,
        projection_list: Optional[List[str]],
    ) -> List[Any]:
        """List KPI cache documents with optional MongoDB projection.

        Args:
            filters: Optional equality filters ``merchant_id``, ``widget_id``,
                ``period_window``, ``branch_id`` plus the range bounds
                ``computed_at_from`` / ``computed_at_to`` (both inclusive).
                Falsy values are ignored.
            skip: Number of documents to skip.
            limit: Maximum number of documents to return.
            projection_list: Field names to include; when given, ``_id`` is
                excluded. When falsy, full documents are returned.
        """
        collection = get_database()[KPI_CACHE_COLLECTION]

        query: Dict[str, Any] = {
            field: filters[field]
            for field in ("merchant_id", "widget_id", "period_window", "branch_id")
            if filters.get(field)
        }

        date_filter: Dict[str, Any] = {}
        if filters.get("computed_at_from"):
            date_filter["$gte"] = filters["computed_at_from"]
        if filters.get("computed_at_to"):
            date_filter["$lte"] = filters["computed_at_to"]
        if date_filter:
            query["computed_at"] = date_filter

        projection_dict = None
        if projection_list:
            projection_dict = {f: 1 for f in projection_list}
            projection_dict["_id"] = 0

        cursor = collection.find(query, projection_dict).skip(skip).limit(limit)
        return await cursor.to_list(length=limit)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_schema(widget_id: str, result: KPIResult) -> KPIResultSchema:
        """Convert a handler ``KPIResult`` into the API-facing schema.

        Title, category and the fallback drill-down URL come from
        ``KPI_WIDGET_REGISTRY``; negative values are clamped to 0.
        """
        meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
        return KPIResultSchema(
            widget_id=widget_id,
            title=meta.get("title", widget_id),
            category=meta.get("category", ""),
            # NOTE(review): max() raises TypeError if result.value is None —
            # confirm handlers always return a numeric value.
            value=max(result.value, 0),
            unit=result.unit,
            delta=result.delta,
            delta_percentage=result.delta_percentage,
            trend=result.trend,
            secondary_values=result.secondary_values,
            drill_down_url=result.drill_down_url or meta.get("drill_down_url"),
            cached=result.cached,
            computed_at=result.computed_at,
            error=result.error,
        )

    @staticmethod
    async def _write_to_mongo(
        merchant_id: str,
        widget_id: str,
        period_window: str,
        branch_id: str,
        result: KPIResult,
    ) -> None:
        """Upsert *result* into the KPI cache collection (best effort).

        Documents are keyed by (merchant_id, widget_id, period_window,
        branch_id). Any exception is logged and swallowed so a MongoDB
        failure never breaks the KPI response.
        """
        try:
            collection = get_database()[KPI_CACHE_COLLECTION]
            now = datetime.now(timezone.utc)
            meta = KPI_WIDGET_REGISTRY.get(widget_id, {})

            cache_key = {
                "merchant_id": merchant_id,
                "widget_id": widget_id,
                "period_window": period_window,
                "branch_id": branch_id,
            }
            doc = {
                **cache_key,
                "value": max(result.value, 0),
                "unit": result.unit,
                "delta": result.delta,
                "delta_percentage": result.delta_percentage,
                "trend": result.trend,
                "secondary_values": result.secondary_values,
                "drill_down_url": result.drill_down_url or meta.get("drill_down_url"),
                "computed_at": now,
                "expires_at": now + timedelta(seconds=_ttl_for(widget_id)),
                "cached": False,
                "error": result.error,
            }
            await collection.update_one(
                cache_key,
                {
                    "$set": doc,
                    # Assign the id only when the document is first created;
                    # keeping it in $set would replace it on every refresh.
                    "$setOnInsert": {"cache_id": str(uuid.uuid4())},
                },
                upsert=True,
            )
        except Exception as exc:
            # Deliberate best-effort: log with traceback and context, then
            # continue.
            logger.error(
                "MongoDB write failed",
                extra={
                    "event": "mongo_write_error",
                    "widget_id": widget_id,
                    "merchant_id": merchant_id,
                    "error": str(exc),
                },
                exc_info=True,
            )