Spaces:
Configuration error
Configuration error
| """ | |
| KPI Cache Service — registry-based asyncio.gather() dispatch. | |
| Reads from Redis L1 (via KPIHandler.compute) -> MongoDB L2 write-back. | |
| """ | |
| import asyncio | |
| import uuid | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Optional, List, Dict, Any | |
| from app.core.logging import get_logger | |
| from app.nosql import get_database | |
| from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY | |
| from app.kpi_handlers.base_handler import KPIResult | |
| from app.kpi_cache.constants import ( | |
| KPI_CACHE_COLLECTION, | |
| KPI_WIDGET_REGISTRY, | |
| WIDGET_REDIS_TTL, | |
| ) | |
| from app.kpi_cache.schemas.schema import KPIResultSchema | |
| logger = get_logger(__name__) | |
| _DEFAULT_TTL = 600 | |
| def _ttl_for(widget_id: str) -> int: | |
| meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) | |
| return WIDGET_REDIS_TTL.get(meta.get("category", ""), _DEFAULT_TTL) | |
| class KPICacheService: | |
| async def get_kpi_stats( | |
| merchant_id: str, | |
| period_window: str, | |
| branch_id: str, | |
| kpi_ids: Optional[List[str]], | |
| use_cache: bool, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Concurrently compute all requested KPIs via asyncio.gather(). | |
| Failing KPIs return {"error": "...", "value": null} — never HTTP 500. | |
| """ | |
| target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys()) | |
| async def compute_one(wid: str): | |
| handler = KPI_HANDLER_REGISTRY.get(wid) | |
| if not handler: | |
| return wid, {"error": f"KPI not found: {wid}", "value": None} | |
| try: | |
| br = branch_id if branch_id != "all" else None | |
| result: KPIResult = await handler.compute( | |
| merchant_id=merchant_id, | |
| period_window=period_window, | |
| branch_id=br, | |
| use_cache=use_cache, | |
| ) | |
| await KPICacheService._write_to_mongo( | |
| merchant_id, wid, period_window, branch_id, result | |
| ) | |
| return wid, KPICacheService._to_schema(wid, result) | |
| except Exception as exc: | |
| logger.error( | |
| "KPI computation failed", | |
| extra={"event": "kpi_compute_error", "widget_id": wid, | |
| "merchant_id": merchant_id, "error": str(exc)}, | |
| exc_info=True, | |
| ) | |
| return wid, {"error": str(exc), "value": None} | |
| pairs = await asyncio.gather(*[compute_one(wid) for wid in target_ids]) | |
| return dict(pairs) | |
| async def get_individual_kpi( | |
| widget_id: str, | |
| merchant_id: str, | |
| period_window: str, | |
| branch_id: str, | |
| use_cache: bool, | |
| ) -> Optional[KPIResultSchema]: | |
| """ | |
| Compute a single KPI. Returns None when widget_id is unknown (caller raises 404). | |
| use_cache=False bypasses cache read but writes fresh result back to Redis. | |
| """ | |
| handler = KPI_HANDLER_REGISTRY.get(widget_id) | |
| if not handler: | |
| return None | |
| br = branch_id if branch_id != "all" else None | |
| result: KPIResult = await handler.compute( | |
| merchant_id=merchant_id, | |
| period_window=period_window, | |
| branch_id=br, | |
| use_cache=use_cache, | |
| ) | |
| await KPICacheService._write_to_mongo( | |
| merchant_id, widget_id, period_window, branch_id, result | |
| ) | |
| return KPICacheService._to_schema(widget_id, result) | |
| async def rebuild( | |
| merchant_id: str, | |
| period_window: Optional[str], | |
| branch_id: Optional[str], | |
| kpi_ids: Optional[List[str]], | |
| ) -> Dict[str, Any]: | |
| """Force recompute for all/subset of KPIs and update both caches.""" | |
| target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys()) | |
| periods = [period_window] if period_window else ["today", "last_7_days", "mtd", "ytd"] | |
| branches = [branch_id] if branch_id else ["all"] | |
| rebuilt, failed = 0, 0 | |
| errors: Dict[str, str] = {} | |
| for pw in periods: | |
| for br in branches: | |
| results = await KPICacheService.get_kpi_stats( | |
| merchant_id=merchant_id, | |
| period_window=pw, | |
| branch_id=br, | |
| kpi_ids=target_ids, | |
| use_cache=False, | |
| ) | |
| for wid, payload in results.items(): | |
| if isinstance(payload, dict) and payload.get("error"): | |
| failed += 1 | |
| errors[f"{wid}:{pw}:{br}"] = payload["error"] | |
| else: | |
| rebuilt += 1 | |
| return {"rebuilt": rebuilt, "failed": failed, "errors": errors} | |
| async def list_cache( | |
| filters: Dict[str, Any], | |
| skip: int, | |
| limit: int, | |
| projection_list: Optional[List[str]], | |
| ) -> List[Any]: | |
| """List KPI cache documents with optional MongoDB projection.""" | |
| db = get_database() | |
| collection = db[KPI_CACHE_COLLECTION] | |
| query: Dict[str, Any] = {} | |
| for field in ("merchant_id", "widget_id", "period_window", "branch_id"): | |
| if filters.get(field): | |
| query[field] = filters[field] | |
| if filters.get("computed_at_from") or filters.get("computed_at_to"): | |
| date_filter: Dict[str, Any] = {} | |
| if filters.get("computed_at_from"): | |
| date_filter["$gte"] = filters["computed_at_from"] | |
| if filters.get("computed_at_to"): | |
| date_filter["$lte"] = filters["computed_at_to"] | |
| query["computed_at"] = date_filter | |
| projection_dict = None | |
| if projection_list: | |
| projection_dict = {f: 1 for f in projection_list} | |
| projection_dict["_id"] = 0 | |
| cursor = collection.find(query, projection_dict).skip(skip).limit(limit) | |
| return await cursor.to_list(length=limit) | |
| # ------------------------------------------------------------------ | |
| # Internal helpers | |
| # ------------------------------------------------------------------ | |
| def _to_schema(widget_id: str, result: KPIResult) -> KPIResultSchema: | |
| meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) | |
| return KPIResultSchema( | |
| widget_id=widget_id, | |
| title=meta.get("title", widget_id), | |
| category=meta.get("category", ""), | |
| value=max(result.value, 0), | |
| unit=result.unit, | |
| delta=result.delta, | |
| delta_percentage=result.delta_percentage, | |
| trend=result.trend, | |
| secondary_values=result.secondary_values, | |
| drill_down_url=result.drill_down_url or meta.get("drill_down_url"), | |
| cached=result.cached, | |
| computed_at=result.computed_at, | |
| error=result.error, | |
| ) | |
| async def _write_to_mongo( | |
| merchant_id: str, | |
| widget_id: str, | |
| period_window: str, | |
| branch_id: str, | |
| result: KPIResult, | |
| ) -> None: | |
| try: | |
| db = get_database() | |
| collection = db[KPI_CACHE_COLLECTION] | |
| now = datetime.now(timezone.utc) | |
| ttl_seconds = _ttl_for(widget_id) | |
| meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) | |
| doc = { | |
| "cache_id": str(uuid.uuid4()), | |
| "merchant_id": merchant_id, | |
| "widget_id": widget_id, | |
| "period_window": period_window, | |
| "branch_id": branch_id, | |
| "value": max(result.value, 0), | |
| "unit": result.unit, | |
| "delta": result.delta, | |
| "delta_percentage": result.delta_percentage, | |
| "trend": result.trend, | |
| "secondary_values": result.secondary_values, | |
| "drill_down_url": result.drill_down_url or meta.get("drill_down_url"), | |
| "computed_at": now, | |
| "expires_at": now + timedelta(seconds=ttl_seconds), | |
| "cached": False, | |
| "error": result.error, | |
| } | |
| await collection.update_one( | |
| {"merchant_id": merchant_id, "widget_id": widget_id, | |
| "period_window": period_window, "branch_id": branch_id}, | |
| {"$set": doc}, | |
| upsert=True, | |
| ) | |
| except Exception as exc: | |
| logger.error( | |
| "MongoDB write failed", | |
| extra={"event": "mongo_write_error", "error": str(exc)}, | |
| ) | |