# Page-header residue from the source listing, kept as a comment:
# MukeshKapoor25 - "Initial commit" - b143975
"""
KPI Cache Service — registry-based asyncio.gather() dispatch.
Reads from Redis L1 (via KPIHandler.compute) -> MongoDB L2 write-back.
"""
import asyncio
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, List, Dict, Any
from app.core.logging import get_logger
from app.nosql import get_database
from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
from app.kpi_handlers.base_handler import KPIResult
from app.kpi_cache.constants import (
KPI_CACHE_COLLECTION,
KPI_WIDGET_REGISTRY,
WIDGET_REDIS_TTL,
)
from app.kpi_cache.schemas.schema import KPIResultSchema
# Module-level logger, obtained from the project's logging helper.
logger = get_logger(__name__)
# Fallback TTL in seconds, used by _ttl_for() when a widget's category has no
# entry in WIDGET_REDIS_TTL (or the widget is not in KPI_WIDGET_REGISTRY).
_DEFAULT_TTL = 600
def _ttl_for(widget_id: str) -> int:
    """Return the cache TTL, in seconds, that applies to *widget_id*.

    The widget's ``category`` is read from ``KPI_WIDGET_REGISTRY`` and used as
    the key into ``WIDGET_REDIS_TTL``. Unknown widgets, widgets without a
    category, and categories without a configured TTL all resolve to
    ``_DEFAULT_TTL``.
    """
    widget_meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
    category = widget_meta.get("category", "")
    return WIDGET_REDIS_TTL.get(category, _DEFAULT_TTL)
class KPICacheService:
    """Stateless facade that computes KPI widgets and mirrors them to MongoDB.

    All methods are static. Handlers are looked up in ``KPI_HANDLER_REGISTRY``;
    every computed result is upserted into ``KPI_CACHE_COLLECTION`` on a
    best-effort basis (a failed write is logged, never raised).
    """

    @staticmethod
    async def get_kpi_stats(
        merchant_id: str,
        period_window: str,
        branch_id: str,
        kpi_ids: Optional[List[str]],
        use_cache: bool,
    ) -> Dict[str, Any]:
        """
        Concurrently compute all requested KPIs via asyncio.gather().

        Failing KPIs return {"error": "...", "value": null} — never HTTP 500.

        Args:
            merchant_id: Merchant whose KPIs are computed.
            period_window: Period identifier forwarded to each handler.
            branch_id: Branch identifier; the literal ``"all"`` is forwarded to
                handlers as ``None`` but stored as ``"all"`` in MongoDB.
            kpi_ids: Widget ids to compute. ``None`` or an empty list selects
                every id in ``KPI_HANDLER_REGISTRY``.
            use_cache: Forwarded unchanged to ``handler.compute``.

        Returns:
            Mapping of widget id to either a ``KPIResultSchema`` or an error
            dict of the shape shown above.
        """
        # De-duplicate (order preserved): a repeated id would otherwise be
        # computed twice concurrently and race on the same MongoDB upsert,
        # while the returned dict could only ever hold one entry for it.
        target_ids = list(dict.fromkeys(kpi_ids or KPI_HANDLER_REGISTRY.keys()))
        handler_branch = KPICacheService._branch_filter(branch_id)

        async def compute_one(wid: str):
            handler = KPI_HANDLER_REGISTRY.get(wid)
            if not handler:
                return wid, {"error": f"KPI not found: {wid}", "value": None}
            try:
                result: KPIResult = await handler.compute(
                    merchant_id=merchant_id,
                    period_window=period_window,
                    branch_id=handler_branch,
                    use_cache=use_cache,
                )
                await KPICacheService._write_to_mongo(
                    merchant_id, wid, period_window, branch_id, result
                )
                return wid, KPICacheService._to_schema(wid, result)
            except Exception as exc:
                # Deliberate boundary: one broken widget must not fail the
                # whole batch, so the failure is reported in-band.
                logger.error(
                    "KPI computation failed",
                    extra={"event": "kpi_compute_error", "widget_id": wid,
                           "merchant_id": merchant_id, "error": str(exc)},
                    exc_info=True,
                )
                return wid, {"error": str(exc), "value": None}

        pairs = await asyncio.gather(*(compute_one(wid) for wid in target_ids))
        return dict(pairs)

    @staticmethod
    async def get_individual_kpi(
        widget_id: str,
        merchant_id: str,
        period_window: str,
        branch_id: str,
        use_cache: bool,
    ) -> Optional[KPIResultSchema]:
        """
        Compute a single KPI. Returns None when widget_id is unknown (caller raises 404).

        ``use_cache`` is forwarded unchanged to the handler. Per the original
        note, ``False`` bypasses the cache read while the fresh result is still
        written back to Redis; that behaviour lives in the handler, not here.

        Unlike ``get_kpi_stats``, exceptions raised by the handler propagate
        to the caller.
        """
        handler = KPI_HANDLER_REGISTRY.get(widget_id)
        if not handler:
            return None
        result: KPIResult = await handler.compute(
            merchant_id=merchant_id,
            period_window=period_window,
            branch_id=KPICacheService._branch_filter(branch_id),
            use_cache=use_cache,
        )
        await KPICacheService._write_to_mongo(
            merchant_id, widget_id, period_window, branch_id, result
        )
        return KPICacheService._to_schema(widget_id, result)

    @staticmethod
    async def rebuild(
        merchant_id: str,
        period_window: Optional[str],
        branch_id: Optional[str],
        kpi_ids: Optional[List[str]],
    ) -> Dict[str, Any]:
        """Force recompute for all/subset of KPIs and update both caches.

        Args:
            merchant_id: Merchant whose KPIs are rebuilt.
            period_window: Single period to rebuild; when falsy, the periods
                ``today``, ``last_7_days``, ``mtd`` and ``ytd`` are rebuilt.
            branch_id: Single branch to rebuild; when falsy, ``"all"`` is used.
            kpi_ids: Widget ids to rebuild; when falsy, every registered id.

        Returns:
            ``{"rebuilt": int, "failed": int, "errors": {...}}`` where the
            ``errors`` keys have the form ``"<widget>:<period>:<branch>"``.
        """
        target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys())
        periods = [period_window] if period_window else ["today", "last_7_days", "mtd", "ytd"]
        branches = [branch_id] if branch_id else ["all"]
        rebuilt, failed = 0, 0
        errors: Dict[str, str] = {}
        for pw in periods:
            for br in branches:
                results = await KPICacheService.get_kpi_stats(
                    merchant_id=merchant_id,
                    period_window=pw,
                    branch_id=br,
                    kpi_ids=target_ids,
                    use_cache=False,
                )
                for wid, payload in results.items():
                    # A failure arrives either as an error dict (exception in
                    # get_kpi_stats) or as a schema object whose ``error``
                    # field the handler filled in; both count as failed.
                    if isinstance(payload, dict):
                        error = payload.get("error")
                    else:
                        error = getattr(payload, "error", None)
                    if error:
                        failed += 1
                        errors[f"{wid}:{pw}:{br}"] = str(error)
                    else:
                        rebuilt += 1
        return {"rebuilt": rebuilt, "failed": failed, "errors": errors}

    @staticmethod
    async def list_cache(
        filters: Dict[str, Any],
        skip: int,
        limit: int,
        projection_list: Optional[List[str]],
    ) -> List[Any]:
        """List KPI cache documents with optional MongoDB projection.

        Args:
            filters: Optional equality filters on ``merchant_id``,
                ``widget_id``, ``period_window`` and ``branch_id``, plus an
                optional inclusive ``computed_at_from`` / ``computed_at_to``
                range. Falsy values are ignored.
            skip: Number of matching documents to skip.
            limit: Maximum number of documents to return.
            projection_list: Field names to include. When given, ``_id`` is
                excluded from the result.

        Returns:
            The matching documents as returned by the driver.
        """
        db = get_database()
        collection = db[KPI_CACHE_COLLECTION]

        query: Dict[str, Any] = {
            field: filters[field]
            for field in ("merchant_id", "widget_id", "period_window", "branch_id")
            if filters.get(field)
        }

        date_filter: Dict[str, Any] = {}
        if filters.get("computed_at_from"):
            date_filter["$gte"] = filters["computed_at_from"]
        if filters.get("computed_at_to"):
            date_filter["$lte"] = filters["computed_at_to"]
        if date_filter:
            query["computed_at"] = date_filter

        # NOTE(review): without a projection the raw ``_id`` is returned as
        # well — confirm callers can serialize it.
        projection_dict = None
        if projection_list:
            projection_dict = {f: 1 for f in projection_list}
            projection_dict["_id"] = 0

        cursor = collection.find(query, projection_dict).skip(skip).limit(limit)
        return await cursor.to_list(length=limit)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _branch_filter(branch_id: Optional[str]) -> Optional[str]:
        """Translate the ``"all"`` sentinel into ``None`` for handler calls."""
        return branch_id if branch_id != "all" else None

    @staticmethod
    def _non_negative(value: Any) -> Any:
        """Clamp *value* to a minimum of 0, passing ``None`` through untouched.

        A bare ``max(None, 0)`` raises ``TypeError``, which would mask the
        handler's own ``error`` for results that carry no value.
        """
        return None if value is None else max(value, 0)

    @staticmethod
    def _to_schema(widget_id: str, result: KPIResult) -> KPIResultSchema:
        """Build the response schema for *widget_id* from a handler result.

        Title, category and the fallback drill-down URL come from
        ``KPI_WIDGET_REGISTRY``; the title defaults to the widget id and the
        category to an empty string when the widget is not registered.
        """
        meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
        return KPIResultSchema(
            widget_id=widget_id,
            title=meta.get("title", widget_id),
            category=meta.get("category", ""),
            value=KPICacheService._non_negative(result.value),
            unit=result.unit,
            delta=result.delta,
            delta_percentage=result.delta_percentage,
            trend=result.trend,
            secondary_values=result.secondary_values,
            drill_down_url=result.drill_down_url or meta.get("drill_down_url"),
            cached=result.cached,
            computed_at=result.computed_at,
            error=result.error,
        )

    @staticmethod
    async def _write_to_mongo(
        merchant_id: str,
        widget_id: str,
        period_window: str,
        branch_id: str,
        result: KPIResult,
    ) -> None:
        """Upsert *result* into the KPI cache collection (best effort).

        The document is keyed by (merchant_id, widget_id, period_window,
        branch_id). ``expires_at`` is set to now plus the widget's TTL.
        Any exception is logged and swallowed so that a MongoDB problem never
        fails the KPI request itself.
        """
        try:
            db = get_database()
            collection = db[KPI_CACHE_COLLECTION]
            now = datetime.now(timezone.utc)
            ttl_seconds = _ttl_for(widget_id)
            meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
            doc = {
                "merchant_id": merchant_id,
                "widget_id": widget_id,
                "period_window": period_window,
                "branch_id": branch_id,
                "value": KPICacheService._non_negative(result.value),
                "unit": result.unit,
                "delta": result.delta,
                "delta_percentage": result.delta_percentage,
                "trend": result.trend,
                "secondary_values": result.secondary_values,
                "drill_down_url": result.drill_down_url or meta.get("drill_down_url"),
                "computed_at": now,
                "expires_at": now + timedelta(seconds=ttl_seconds),
                "cached": False,
                "error": result.error,
            }
            await collection.update_one(
                {"merchant_id": merchant_id, "widget_id": widget_id,
                 "period_window": period_window, "branch_id": branch_id},
                {
                    "$set": doc,
                    # Assign the identifier only when the document is first
                    # created so it stays stable across refreshes.
                    "$setOnInsert": {"cache_id": str(uuid.uuid4())},
                },
                upsert=True,
            )
        except Exception as exc:
            # Best-effort write-back: log with enough context to find the
            # affected entry, but do not propagate.
            logger.error(
                "MongoDB write failed",
                extra={"event": "mongo_write_error", "widget_id": widget_id,
                       "merchant_id": merchant_id, "period_window": period_window,
                       "branch_id": branch_id, "error": str(exc)},
                exc_info=True,
            )