diff --git a/.env.example b/.env.example new file mode 100644 index 0000000000000000000000000000000000000000..787b94c74b1fefd06811cef341d8f6c913613a46 --- /dev/null +++ b/.env.example @@ -0,0 +1,42 @@ +# Application Configuration +APP_NAME=Analytics Microservice +APP_VERSION=1.0.0 +DEBUG=false + +# MongoDB Configuration +MONGODB_URI=mongodb://localhost:27017 +MONGODB_DB_NAME=cuatrolabs + +# PostgreSQL Configuration +DB_PROTOCOL=postgresql+asyncpg +DB_USER=postgres +DB_PASSWORD=your-db-password +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=cuatrolabs +DB_SSLMODE=disable +DATABASE_URL=postgresql+asyncpg://postgres:your-db-password@localhost:5432/cuatrolabs + +# Redis Configuration +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD=your-redis-password +REDIS_DB=0 + +# JWT Configuration +SECRET_KEY=your-secret-key-here-change-in-production +ALGORITHM=HS256 +TOKEN_EXPIRATION_HOURS=8 + +# Logging Configuration +LOG_LEVEL=INFO +LOG_FORMAT=json +LOG_DIR=logs +LOG_MAX_BYTES=52428800 +LOG_BACKUP_COUNT=10 + +# CORS Settings +CORS_ORIGINS=["http://localhost:3000","http://localhost:8000"] + +# Root path (for reverse proxy / k8s ingress) +ROOT_PATH= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..6959cc8891d6220106c8b8cb9f8aa9fed5fb83a2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +.env +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ +dist/ +build/ +.pytest_cache/ +.hypothesis/ +logs/ +*.log +.DS_Store diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..4d31b576d5cf925f7a6ebff18cba6be0e49599ae --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.11-slim-bullseye AS base + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PATH="/home/user/.local/bin:$PATH" + +RUN apt-get update && apt-get install -y \ + openssl \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +RUN useradd -m -u 1000 user +USER user +WORKDIR /app + +COPY --chown=user ./requirements.txt requirements.txt +COPY --chown=user ./app/insightfy_utils-0.1.0-py3-none-any.whl insightfy_utils-0.1.0-py3-none-any.whl +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir insightfy_utils-0.1.0-py3-none-any.whl && \ + pip install --no-cache-dir --upgrade -r requirements.txt + +COPY --chown=user . /app + +EXPOSE 7860 + +CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port ${PORT:-7860} --workers 4 --log-level info"] diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/cache.py b/app/cache.py new file mode 100644 index 0000000000000000000000000000000000000000..3d86201e749b22d9f339cc6e822065e4745d41ec --- /dev/null +++ b/app/cache.py @@ -0,0 +1,49 @@ +""" +Redis connection management for Analytics Microservice. +""" +import redis.asyncio as redis +from app.core.logging import get_logger +from app.core.config import settings + +logger = get_logger(__name__) + +redis_client: redis.Redis = None + + +async def connect_to_redis(): + global redis_client + try: + pool_params = { + "host": settings.REDIS_HOST, + "port": settings.REDIS_PORT, + "db": settings.REDIS_DB, + "decode_responses": True, + "max_connections": 5, + "socket_keepalive": True, + "socket_connect_timeout": 5, + "socket_timeout": 5, + "retry_on_timeout": False, + "health_check_interval": 30, + } + if settings.REDIS_PASSWORD: + pool_params["password"] = settings.REDIS_PASSWORD + + pool = redis.ConnectionPool(**pool_params) + redis_client = redis.Redis(connection_pool=pool) + await redis_client.ping() + logger.info("Redis connected successfully", extra={"event": "redis_connected"}) + except Exception as e: + logger.warning("Redis connection failed - continuing without cache", extra={"event": "redis_connect_failure", "error": str(e)}) + redis_client = None + + +async def close_redis_connection(): + global redis_client + if redis_client: + await redis_client.aclose() + redis_client = None + logger.info("Redis connection closed", extra={"event": "redis_disconnected"}) + + +def get_redis() -> redis.Redis: + return redis_client diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000000000000000000000000000000000000..bcdbddfad79d0d8902de8b4ae2276ce395becab5 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,70 @@ +""" +Configuration settings for Analytics Microservice. +""" +import os +import logging +from typing import Optional, List +from pydantic import model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + +logger = logging.getLogger(__name__) + + +class Settings(BaseSettings): + # Application + APP_NAME: str = "Analytics Microservice" + APP_VERSION: str = "1.0.0" + DEBUG: bool = False + + # MongoDB + MONGODB_URI: str = os.getenv("MONGODB_URI", "mongodb://localhost:27017") + MONGODB_DB_NAME: str = os.getenv("MONGODB_DB_NAME", "cuatrolabs") + + # PostgreSQL + POSTGRES_HOST: str = os.getenv("DB_HOST", "localhost") + POSTGRES_PORT: int = int(os.getenv("DB_PORT", "5432")) + POSTGRES_DB: str = os.getenv("DB_NAME", "cuatrolabs") + POSTGRES_USER: str = os.getenv("DB_USER", "postgres") + POSTGRES_PASSWORD: str = os.getenv("DB_PASSWORD", "") + POSTGRES_MIN_POOL_SIZE: int = int(os.getenv("POSTGRES_MIN_POOL_SIZE", "2")) + POSTGRES_MAX_POOL_SIZE: int = int(os.getenv("POSTGRES_MAX_POOL_SIZE", "10")) + POSTGRES_SSL_MODE: str = os.getenv("DB_SSLMODE", "disable") + POSTGRES_URI: Optional[str] = None + + @model_validator(mode="after") + def assemble_db_connection(self) -> "Settings": + from urllib.parse import quote_plus + env_url = (os.getenv("DATABASE_URL") or os.getenv("DATABASE_URI") or "").strip() + if env_url: + self.POSTGRES_URI = env_url + return self + if all([self.POSTGRES_USER, self.POSTGRES_PASSWORD, self.POSTGRES_HOST, self.POSTGRES_DB]): + protocol = os.getenv("DB_PROTOCOL", "postgresql+asyncpg") + self.POSTGRES_URI = ( + f"{protocol}://{self.POSTGRES_USER}:{quote_plus(self.POSTGRES_PASSWORD)}" + f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}" + ) + return self + + # Redis + REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost") + REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379")) + REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD") + REDIS_DB: int = int(os.getenv("REDIS_DB", "0")) + + # JWT + SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production") + ALGORITHM: str = os.getenv("ALGORITHM", "HS256") + TOKEN_EXPIRATION_HOURS: int = int(os.getenv("TOKEN_EXPIRATION_HOURS", "8")) + + # Logging + LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO") + LOG_FORMAT: str = os.getenv("LOG_FORMAT", "json") + + # CORS + CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:8000"] + + model_config = SettingsConfigDict(env_file=".env", extra="ignore") + + +settings = Settings() diff --git a/app/core/logging.py b/app/core/logging.py new file mode 100644 index 0000000000000000000000000000000000000000..5f3b5be3da493c179b2814c3affe12481e6e8db9 --- /dev/null +++ b/app/core/logging.py @@ -0,0 +1,119 @@ +""" +Production-grade logging configuration for Analytics Microservice. +""" +import json +import logging +import logging.handlers +import os +import sys +import traceback +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Optional + + +SERVICE_NAME = "analytics-ms" +LOG_DIR = Path(os.getenv("LOG_DIR", "logs")) +LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", 50 * 1024 * 1024)) +LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10")) + + +class JSONFormatter(logging.Formatter): + """Emit log records as single-line JSON objects.""" + + RESERVED = frozenset({ + "args", "created", "exc_info", "exc_text", "filename", + "funcName", "levelname", "levelno", "lineno", "message", + "module", "msecs", "msg", "name", "pathname", "process", + "processName", "relativeCreated", "stack_info", "thread", "threadName", + }) + + def format(self, record: logging.LogRecord) -> str: + payload: Dict[str, Any] = { + "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + "service": SERVICE_NAME, + "pid": record.process, + } + if record.levelno >= logging.WARNING: + payload["caller"] = f"{record.pathname}:{record.lineno}" + for key, val in record.__dict__.items(): + if key not in self.RESERVED and not key.startswith("_"): + payload[key] = val + if record.exc_info and record.exc_info[0] is not None: + exc_type, exc_value, exc_tb = record.exc_info + payload["exception"] = { + "type": exc_type.__name__, + "message": str(exc_value), + "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb), + } + try: + return json.dumps(payload, default=str) + except Exception: + payload["message"] = str(record.getMessage()) + return json.dumps(payload, default=str) + + +class ConsoleFormatter(logging.Formatter): + GREY = "\x1b[38;5;240m" + CYAN = "\x1b[36m" + YELLOW = "\x1b[33m" + RED = "\x1b[31m" + BOLD_RED = "\x1b[1;31m" + RESET = "\x1b[0m" + + LEVEL_COLORS = { + logging.DEBUG: "\x1b[38;5;240m", + logging.INFO: "\x1b[36m", + logging.WARNING: "\x1b[33m", + logging.ERROR: "\x1b[31m", + logging.CRITICAL: "\x1b[1;31m", + } + + def format(self, record: logging.LogRecord) -> str: + color = self.LEVEL_COLORS.get(record.levelno, self.RESET) + ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%H:%M:%S") + msg = record.getMessage() + base = f"{self.GREY}{ts}{self.RESET} {color}{record.levelname:<8}{self.RESET} {record.name} - {msg}" + if record.exc_info: + base += "\n" + self.formatException(record.exc_info) + return base + + +def setup_logging(level: str = "INFO") -> None: + numeric_level = getattr(logging, level.upper(), logging.INFO) + root = logging.getLogger() + root.setLevel(numeric_level) + root.handlers.clear() + + # Console handler + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(numeric_level) + is_json = os.getenv("LOG_FORMAT", "json").lower() == "json" + ch.setFormatter(JSONFormatter() if is_json else ConsoleFormatter()) + root.addHandler(ch) + + # File handlers + try: + LOG_DIR.mkdir(parents=True, exist_ok=True) + fh = logging.handlers.RotatingFileHandler( + LOG_DIR / "app.log", maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT + ) + fh.setLevel(numeric_level) + fh.setFormatter(JSONFormatter()) + root.addHandler(fh) + + eh = logging.handlers.RotatingFileHandler( + LOG_DIR / "app_errors.log", maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT + ) + eh.setLevel(logging.ERROR) + eh.setFormatter(JSONFormatter()) + root.addHandler(eh) + except Exception: + pass # Non-critical if file logging fails + + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(name) diff --git a/app/dashboard/__init__.py b/app/dashboard/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dashboard/constants.py b/app/dashboard/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..26b4fdc2a9e3bf0044b0e28cc828f5d7e9e3851d --- /dev/null +++ b/app/dashboard/constants.py @@ -0,0 +1,25 @@ +"""Constants for analytics dashboard module.""" + +ANALYTICS_METRICS_COLLECTION = "analytics_metrics" + + +class MetricPeriod: + HOURLY = "hourly" + DAILY = "daily" + WEEKLY = "weekly" + MONTHLY = "monthly" + + +class MetricType: + TOTAL_ORDERS = "total_orders" + TOTAL_REVENUE = "total_revenue" + TOTAL_USERS = "total_users" + TOTAL_EVENTS = "total_events" + CONVERSION_RATE = "conversion_rate" + AVG_ORDER_VALUE = "avg_order_value" + ACTIVE_SESSIONS = "active_sessions" + CUSTOM = "custom" + + @classmethod + def values(cls): + return [v for k, v in vars(cls).items() if not k.startswith("_") and isinstance(v, str)] diff --git a/app/dashboard/controllers/__init__.py b/app/dashboard/controllers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dashboard/controllers/router.py b/app/dashboard/controllers/router.py new file mode 100644 index 0000000000000000000000000000000000000000..e4df078addd5e95d28dce0143052a375cbdf9101 --- /dev/null +++ b/app/dashboard/controllers/router.py @@ -0,0 +1,47 @@ +"""Analytics dashboard/metrics API router.""" +from fastapi import APIRouter, Depends, status + +from app.core.logging import get_logger +from app.dependencies.auth import get_current_user, TokenUser +from app.dashboard.schemas.schema import ( + MetricUpsert, MetricResponse, MetricListRequest, + DashboardSummaryRequest, StatusResponse, +) +from app.dashboard.services.service import MetricService + +logger = get_logger(__name__) + +router = APIRouter(prefix="/metrics", tags=["dashboard", "metrics"]) + + +@router.post("", response_model=StatusResponse, status_code=status.HTTP_201_CREATED, summary="Upsert a metric snapshot") +async def upsert_metric( + payload: MetricUpsert, + current_user: TokenUser = Depends(get_current_user), +): + metric_id = await MetricService.upsert_metric(payload) + return StatusResponse(success=True, message="Metric upserted", metric_id=metric_id) + + +@router.post("/list", summary="List metrics with filters and optional projection") +async def list_metrics( + payload: MetricListRequest, + current_user: TokenUser = Depends(get_current_user), +): + results = await MetricService.list_metrics( + filters=payload.filters, + skip=payload.skip, + limit=payload.limit, + projection_list=payload.projection_list, + ) + total = await MetricService.count_metrics(payload.filters) + return {"success": True, "total": total, "skip": payload.skip, "limit": payload.limit, "data": results} + + +@router.post("/dashboard/summary", summary="Get aggregated dashboard summary for a merchant") +async def dashboard_summary( + payload: DashboardSummaryRequest, + current_user: TokenUser = Depends(get_current_user), +): + summary = await MetricService.get_dashboard_summary(payload) + return {"success": True, **summary} diff --git a/app/dashboard/models/__init__.py b/app/dashboard/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dashboard/models/model.py b/app/dashboard/models/model.py new file mode 100644 index 0000000000000000000000000000000000000000..cfa37cb822969eafd4755c06570365777c8c7f2c --- /dev/null +++ b/app/dashboard/models/model.py @@ -0,0 +1,18 @@ +"""Analytics metric snapshot model.""" +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field + + +class MetricModel(BaseModel): + metric_id: str = Field(..., description="Unique metric identifier (UUID)") + metric_type: str = Field(..., description="Type of metric") + merchant_id: str = Field(..., description="Merchant this metric belongs to") + period: str = Field(..., description="Aggregation period: hourly | daily | weekly | monthly") + period_start: datetime = Field(..., description="Start of the aggregation period") + period_end: datetime = Field(..., description="End of the aggregation period") + value: float = Field(..., description="Metric value") + dimensions: Optional[Dict[str, Any]] = Field(None, description="Breakdown dimensions (source, category, etc.)") + metadata: Optional[Dict[str, Any]] = Field(None) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: Optional[datetime] = Field(None) diff --git a/app/dashboard/schemas/__init__.py b/app/dashboard/schemas/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dashboard/schemas/schema.py b/app/dashboard/schemas/schema.py new file mode 100644 index 0000000000000000000000000000000000000000..a3b18d84caa67d6ee768a2bf5419fd3e9a117ce8 --- /dev/null +++ b/app/dashboard/schemas/schema.py @@ -0,0 +1,57 @@ +"""Pydantic schemas for analytics dashboard/metrics.""" +from datetime import datetime +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field + + +class MetricUpsert(BaseModel): + metric_type: str = Field(...) + merchant_id: str = Field(...) + period: str = Field(...) + period_start: datetime = Field(...) + period_end: datetime = Field(...) + value: float = Field(...) + dimensions: Optional[Dict[str, Any]] = Field(None) + metadata: Optional[Dict[str, Any]] = Field(None) + + +class MetricResponse(BaseModel): + metric_id: str + metric_type: str + merchant_id: str + period: str + period_start: datetime + period_end: datetime + value: float + dimensions: Optional[Dict[str, Any]] = None + created_at: datetime + updated_at: Optional[datetime] = None + + +class MetricFilters(BaseModel): + merchant_id: Optional[str] = None + metric_type: Optional[str] = None + period: Optional[str] = None + period_start_from: Optional[datetime] = None + period_start_to: Optional[datetime] = None + + +class MetricListRequest(BaseModel): + filters: Optional[MetricFilters] = Field(default_factory=MetricFilters) + skip: int = Field(0, ge=0) + limit: int = Field(100, ge=1, le=1000) + projection_list: Optional[List[str]] = Field(None, description="List of fields to include in response") + + +class DashboardSummaryRequest(BaseModel): + merchant_id: str = Field(...) + period: str = Field("daily", description="hourly | daily | weekly | monthly") + period_start: Optional[datetime] = Field(None) + period_end: Optional[datetime] = Field(None) + metric_types: Optional[List[str]] = Field(None, description="Filter to specific metric types") + + +class StatusResponse(BaseModel): + success: bool + message: str + metric_id: Optional[str] = None diff --git a/app/dashboard/services/__init__.py b/app/dashboard/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dashboard/services/service.py b/app/dashboard/services/service.py new file mode 100644 index 0000000000000000000000000000000000000000..40d4999cf7ca496686ec1182eb9823377fc9fb0e --- /dev/null +++ b/app/dashboard/services/service.py @@ -0,0 +1,121 @@ +"""Analytics dashboard/metrics service layer.""" +import uuid +from datetime import datetime +from typing import Optional, List, Dict, Any + +from fastapi import HTTPException, status +from motor.motor_asyncio import AsyncIOMotorDatabase + +from app.core.logging import get_logger +from app.dashboard.constants import ANALYTICS_METRICS_COLLECTION +from app.dashboard.models.model import MetricModel +from app.dashboard.schemas.schema import MetricUpsert, MetricFilters, DashboardSummaryRequest +from app.nosql import get_database + +logger = get_logger(__name__) + + +class MetricService: + + @staticmethod + async def upsert_metric(payload: MetricUpsert) -> str: + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_METRICS_COLLECTION] + + # Upsert by natural key: merchant + type + period + period_start + filter_key = { + "merchant_id": payload.merchant_id, + "metric_type": payload.metric_type, + "period": payload.period, + "period_start": payload.period_start, + } + existing = await collection.find_one(filter_key, {"_id": 0, "metric_id": 1}) + metric_id = existing["metric_id"] if existing else str(uuid.uuid4()) + + doc = MetricModel( + metric_id=metric_id, + metric_type=payload.metric_type, + merchant_id=payload.merchant_id, + period=payload.period, + period_start=payload.period_start, + period_end=payload.period_end, + value=payload.value, + dimensions=payload.dimensions, + metadata=payload.metadata, + updated_at=datetime.utcnow(), + ) + await collection.update_one(filter_key, {"$set": doc.model_dump()}, upsert=True) + logger.info("Metric upserted", extra={"event": "metric_upserted", "metric_id": metric_id}) + return metric_id + + @staticmethod + def _build_query(filters: MetricFilters) -> dict: + query: Dict[str, Any] = {} + if filters.merchant_id: + query["merchant_id"] = filters.merchant_id + if filters.metric_type: + query["metric_type"] = filters.metric_type + if filters.period: + query["period"] = filters.period + date_range: Dict[str, Any] = {} + if filters.period_start_from: + date_range["$gte"] = filters.period_start_from + if filters.period_start_to: + date_range["$lte"] = filters.period_start_to + if date_range: + query["period_start"] = date_range + return query + + @staticmethod + async def list_metrics( + filters: MetricFilters, + skip: int = 0, + limit: int = 100, + projection_list: Optional[List[str]] = None, + ): + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_METRICS_COLLECTION] + query = MetricService._build_query(filters) + + projection_dict = None + if projection_list: + projection_dict = {field: 1 for field in projection_list} + projection_dict["_id"] = 0 + + cursor = collection.find(query, projection_dict).sort("period_start", -1).skip(skip).limit(limit) + docs = await cursor.to_list(length=limit) + return docs if projection_list else [MetricModel(**d) for d in docs] + + @staticmethod + async def count_metrics(filters: MetricFilters) -> int: + db: AsyncIOMotorDatabase = get_database() + query = MetricService._build_query(filters) + return await db[ANALYTICS_METRICS_COLLECTION].count_documents(query) + + @staticmethod + async def get_dashboard_summary(req: DashboardSummaryRequest) -> Dict[str, Any]: + """Aggregate latest metric values per type for a merchant/period.""" + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_METRICS_COLLECTION] + + match: Dict[str, Any] = {"merchant_id": req.merchant_id, "period": req.period} + if req.metric_types: + match["metric_type"] = {"$in": req.metric_types} + if req.period_start: + match.setdefault("period_start", {})["$gte"] = req.period_start + if req.period_end: + match.setdefault("period_start", {})["$lte"] = req.period_end + + pipeline = [ + {"$match": match}, + {"$sort": {"period_start": -1}}, + {"$group": { + "_id": "$metric_type", + "latest_value": {"$first": "$value"}, + "period_start": {"$first": "$period_start"}, + "period_end": {"$first": "$period_end"}, + }}, + {"$project": {"_id": 0, "metric_type": "$_id", "latest_value": 1, "period_start": 1, "period_end": 1}}, + ] + results = await collection.aggregate(pipeline).to_list(length=100) + return {"merchant_id": req.merchant_id, "period": req.period, "metrics": results} diff --git a/app/dependencies/__init__.py b/app/dependencies/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/dependencies/auth.py b/app/dependencies/auth.py new file mode 100644 index 0000000000000000000000000000000000000000..bae978a39b40cf9df024475733090ef18c8f3cb2 --- /dev/null +++ b/app/dependencies/auth.py @@ -0,0 +1,60 @@ +""" +Authentication dependencies for Analytics Microservice. +Validates JWT tokens issued by the Auth microservice. +""" +from typing import Optional +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from jose import JWTError, jwt +from pydantic import BaseModel + +from app.core.config import settings + +security = HTTPBearer() + + +class TokenUser(BaseModel): + user_id: str + username: str + role_id: str + merchant_id: str + merchant_type: Optional[str] = None + metadata: Optional[dict] = None + + def has_role(self, *roles: str) -> bool: + return self.role_id in roles + + def is_admin(self) -> bool: + return "admin" in self.role_id.lower() + + def is_super_admin(self) -> bool: + return "super_admin" in self.role_id.lower() + + +async def get_current_user( + credentials: HTTPAuthorizationCredentials = Depends(security), +) -> TokenUser: + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode( + credentials.credentials, + settings.SECRET_KEY, + algorithms=[settings.ALGORITHM], + ) + user_id: str = payload.get("user_id") or payload.get("sub") + if not user_id: + raise credentials_exception + return TokenUser( + user_id=user_id, + username=payload.get("username", ""), + role_id=payload.get("role_id", ""), + merchant_id=payload.get("merchant_id", ""), + merchant_type=payload.get("merchant_type"), + metadata=payload.get("metadata"), + ) + except JWTError: + raise credentials_exception diff --git a/app/dependencies/kpi_permissions.py b/app/dependencies/kpi_permissions.py new file mode 100644 index 0000000000000000000000000000000000000000..3142e8e80578d331b80f41ef76118650975b415f --- /dev/null +++ b/app/dependencies/kpi_permissions.py @@ -0,0 +1,136 @@ +""" +KPI widget permission dependencies for Analytics Microservice. +Mirrors the SCM-ms pattern: + - merchant_id always from JWT (never from request body) + - require_dashboard_view: checks scm_access_roles.permissions.dashboard.view + - require_widget_access: checks scm_access_roles.widget_access[] per widget +""" +from fastapi import Depends, HTTPException, status + +from app.core.logging import get_logger +from app.dependencies.auth import get_current_user, TokenUser +from app.nosql import get_database + +logger = get_logger(__name__) + +SCM_ACCESS_ROLES_COLLECTION = "scm_access_roles" + + +async def _get_role_doc(role_id: str) -> dict | None: + db = get_database() + return await db[SCM_ACCESS_ROLES_COLLECTION].find_one( + {"role_id": role_id, "is_active": True}, {"_id": 0} + ) + + +# --------------------------------------------------------------------------- +# Dependency: dashboard.view permission (bulk stats + rebuild + list) +# --------------------------------------------------------------------------- + +async def require_dashboard_view( + current_user: TokenUser = Depends(get_current_user), +) -> TokenUser: + """ + Requires permissions.dashboard contains 'view'. + Mirrors SCM require_scm_permission('dashboard', 'view'). + """ + if not current_user.role_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="No role assigned to user", + ) + if not current_user.merchant_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="merchant_id missing from token", + ) + + try: + role_doc = await _get_role_doc(current_user.role_id) + if not role_doc: + logger.warning( + "Access role not found", + extra={"event": "kpi_role_not_found", "role_id": current_user.role_id, + "user_id": current_user.user_id}, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Access role not found", + ) + + permissions = role_doc.get("permissions", {}) + if "view" not in permissions.get("dashboard", []): + logger.warning( + "dashboard.view permission denied", + extra={"event": "kpi_permission_denied", "user_id": current_user.user_id, + "role_id": current_user.role_id}, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Access denied. Required permission: dashboard.view", + ) + + return current_user + + except HTTPException: + raise + except Exception as exc: + logger.error( + "Permission check failed", + extra={"event": "kpi_permission_check_error", "error": str(exc)}, + exc_info=True, + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Permission check failed", + ) + + +# --------------------------------------------------------------------------- +# Helper: widget-level access check (used inline in individual KPI endpoint) +# --------------------------------------------------------------------------- + +async def check_widget_access(widget_id: str, current_user: TokenUser) -> None: + """ + Verifies widget_id is listed in scm_access_roles.widget_access[]. + Raises 403 if not. Mirrors SCM require_widget_access(). + """ + try: + role_doc = await _get_role_doc(current_user.role_id) + if not role_doc: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Access role not found", + ) + + widget_access = role_doc.get("widget_access", []) + if widget_id not in widget_access: + logger.warning( + "Widget access denied", + extra={"event": "kpi_widget_access_denied", "widget_id": widget_id, + "user_id": current_user.user_id, "role_id": current_user.role_id}, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Access denied to widget: {widget_id}", + ) + + logger.debug( + "Widget access granted", + extra={"event": "kpi_widget_access_granted", "widget_id": widget_id, + "user_id": current_user.user_id}, + ) + + except HTTPException: + raise + except Exception as exc: + logger.error( + "Widget access check error", + extra={"event": "kpi_widget_access_check_error", "widget_id": widget_id, + "error": str(exc)}, + exc_info=True, + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error checking widget access", + ) diff --git a/app/events/__init__.py b/app/events/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/events/constants.py b/app/events/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..765611171f2f112b28f2ffbfc2d00f3660aa0c3e --- /dev/null +++ b/app/events/constants.py @@ -0,0 +1,21 @@ +"""Constants for analytics events module.""" + +ANALYTICS_EVENTS_COLLECTION = "analytics_events" + + +class EventType: + PAGE_VIEW = "page_view" + CLICK = "click" + PURCHASE = "purchase" + CART_ADD = "cart_add" + CART_REMOVE = "cart_remove" + SEARCH = "search" + LOGIN = "login" + LOGOUT = "logout" + ORDER_PLACED = "order_placed" + ORDER_CANCELLED = "order_cancelled" + CUSTOM = "custom" + + @classmethod + def values(cls): + return [v for k, v in vars(cls).items() if not k.startswith("_") and isinstance(v, str)] diff --git a/app/events/controllers/__init__.py b/app/events/controllers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/events/controllers/router.py b/app/events/controllers/router.py new file mode 100644 index 0000000000000000000000000000000000000000..9875000e3478e2b2dcd68c12f165d3d6240daea0 --- /dev/null +++ b/app/events/controllers/router.py @@ -0,0 +1,43 @@ +"""Analytics events API router.""" +from fastapi import APIRouter, Depends, status + +from app.core.logging import get_logger +from app.dependencies.auth import get_current_user, TokenUser +from app.events.schemas.schema import EventCreate, EventResponse, EventListRequest, StatusResponse +from app.events.services.service import EventService + +logger = get_logger(__name__) + +router = APIRouter(prefix="/events", tags=["events"]) + + +@router.post("", response_model=StatusResponse, status_code=status.HTTP_201_CREATED, summary="Ingest an analytics event") +async def create_event( + payload: EventCreate, + current_user: TokenUser = Depends(get_current_user), +): + event_id = await EventService.create_event(payload) + return StatusResponse(success=True, message="Event recorded", event_id=event_id) + + +@router.get("/{event_id}", response_model=EventResponse, summary="Get event by ID") +async def get_event( + event_id: str, + current_user: TokenUser = Depends(get_current_user), +): + return await EventService.get_event(event_id) + + +@router.post("/list", summary="List events with filters and optional projection") +async def list_events( + payload: EventListRequest, + current_user: TokenUser = Depends(get_current_user), +): + results = await EventService.list_events( + filters=payload.filters, + skip=payload.skip, + limit=payload.limit, + projection_list=payload.projection_list, + ) + total = await EventService.count_events(payload.filters) + return {"success": True, "total": total, "skip": payload.skip, "limit": payload.limit, "data": results} diff --git a/app/events/models/__init__.py b/app/events/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/events/models/model.py b/app/events/models/model.py new file mode 100644 index 0000000000000000000000000000000000000000..c67362ac65dc24fac48079b3735dc2a08192f680 --- /dev/null +++ b/app/events/models/model.py @@ -0,0 +1,19 @@ +"""Analytics event document model.""" +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field + + +class AnalyticsEventModel(BaseModel): + event_id: str = Field(..., description="Unique event identifier (UUID)") + event_type: str = Field(..., description="Type of event") + merchant_id: str = Field(..., description="Merchant this event belongs to") + user_id: Optional[str] = Field(None, description="User who triggered the event") + session_id: Optional[str] = Field(None, description="Session identifier") + source: Optional[str] = Field(None, description="Source system (pos, ecomm, scm, spa)") + entity_type: Optional[str] = Field(None, description="Entity type (product, order, etc.)") + entity_id: Optional[str] = Field(None, description="Entity identifier") + properties: Optional[Dict[str, Any]] = Field(None, description="Event-specific properties") + metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata") + occurred_at: datetime = Field(default_factory=datetime.utcnow, description="When the event occurred") + created_at: datetime = Field(default_factory=datetime.utcnow, description="When the record was created") diff --git a/app/events/schemas/__init__.py b/app/events/schemas/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/events/schemas/schema.py b/app/events/schemas/schema.py new file mode 100644 index 0000000000000000000000000000000000000000..1b9551e876ce50025026c84d2627e2a14ea25a72 --- /dev/null +++ b/app/events/schemas/schema.py @@ -0,0 +1,57 @@ +"""Pydantic schemas for analytics events.""" +from datetime import datetime +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field + +from app.events.constants import EventType + + +class EventCreate(BaseModel): + event_type: str = Field(..., description="Type of event") + merchant_id: str = Field(..., description="Merchant this event belongs to") + user_id: Optional[str] = Field(None) + session_id: Optional[str] = Field(None) + source: Optional[str] = Field(None, description="pos | ecomm | scm | spa") + entity_type: Optional[str] = Field(None) + entity_id: Optional[str] = Field(None) + properties: Optional[Dict[str, Any]] = Field(None) + metadata: Optional[Dict[str, Any]] = Field(None) + occurred_at: Optional[datetime] = Field(None) + + +class EventResponse(BaseModel): + event_id: str + event_type: str + merchant_id: str + user_id: Optional[str] = None + session_id: Optional[str] = None + source: Optional[str] = None + entity_type: Optional[str] = None + entity_id: Optional[str] = None + properties: Optional[Dict[str, Any]] = None + occurred_at: datetime + created_at: datetime + + +class EventFilters(BaseModel): + merchant_id: Optional[str] = None + event_type: Optional[str] = None + source: Optional[str] = None + entity_type: Optional[str] = None + entity_id: Optional[str] = None + user_id: Optional[str] = None + date_from: Optional[datetime] = None + date_to: Optional[datetime] = None + + +class EventListRequest(BaseModel): + filters: Optional[EventFilters] = Field(default_factory=EventFilters) + skip: int = Field(0, ge=0) + limit: int = Field(100, ge=1, le=1000) + projection_list: Optional[List[str]] = Field(None, description="List of fields to include in response") + + +class StatusResponse(BaseModel): + success: bool + message: str + event_id: Optional[str] = None diff --git a/app/events/services/__init__.py b/app/events/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/events/services/service.py b/app/events/services/service.py new file mode 100644 index 0000000000000000000000000000000000000000..1688564379676ec27e76425dd259e7ee4141e6af --- /dev/null +++ b/app/events/services/service.py @@ -0,0 +1,98 @@ +"""Analytics events service layer.""" +import uuid +from datetime import datetime +from typing import Optional, List, Dict, Any + +from fastapi import HTTPException, status +from motor.motor_asyncio import AsyncIOMotorDatabase + +from app.core.logging import get_logger +from app.events.constants import ANALYTICS_EVENTS_COLLECTION +from app.events.models.model import AnalyticsEventModel +from app.events.schemas.schema import EventCreate, EventFilters +from app.nosql import get_database + +logger = get_logger(__name__) + + +class EventService: + + @staticmethod + async def create_event(payload: EventCreate) -> str: + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_EVENTS_COLLECTION] + event_id = str(uuid.uuid4()) + doc = AnalyticsEventModel( + event_id=event_id, + event_type=payload.event_type, + merchant_id=payload.merchant_id, + user_id=payload.user_id, + session_id=payload.session_id, + source=payload.source, + entity_type=payload.entity_type, + entity_id=payload.entity_id, + properties=payload.properties, + metadata=payload.metadata, + occurred_at=payload.occurred_at or datetime.utcnow(), + ) + await collection.insert_one(doc.model_dump()) + logger.info("Event created", extra={"event": "event_created", "event_id": event_id, "event_type": payload.event_type}) + return event_id + + @staticmethod + async def get_event(event_id: str) -> AnalyticsEventModel: + db: AsyncIOMotorDatabase = get_database() + doc = await db[ANALYTICS_EVENTS_COLLECTION].find_one({"event_id": event_id}, {"_id": 0}) + if not doc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found") + return AnalyticsEventModel(**doc) + + @staticmethod + def _build_query(filters: EventFilters) -> dict: + query: Dict[str, Any] = {} + if filters.merchant_id: + query["merchant_id"] = filters.merchant_id + if filters.event_type: + query["event_type"] = filters.event_type + if filters.source: + query["source"] = filters.source + if filters.entity_type: + query["entity_type"] = filters.entity_type + if filters.entity_id: + query["entity_id"] = filters.entity_id + if filters.user_id: + query["user_id"] = filters.user_id + date_range: Dict[str, Any] = {} + if filters.date_from: + date_range["$gte"] = filters.date_from + if filters.date_to: + date_range["$lte"] = filters.date_to + if date_range: + query["occurred_at"] = date_range + return query + + @staticmethod + async def list_events( + filters: EventFilters, + skip: int = 0, + limit: int = 100, + projection_list: Optional[List[str]] = None, + ): + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_EVENTS_COLLECTION] + query = EventService._build_query(filters) + + projection_dict = None + if projection_list: + projection_dict = {field: 1 for field in projection_list} + projection_dict["_id"] = 0 + + cursor = collection.find(query, projection_dict).sort("occurred_at", -1).skip(skip).limit(limit) + docs = await cursor.to_list(length=limit) + return docs if projection_list else [AnalyticsEventModel(**d) for d in docs] + + @staticmethod + async def count_events(filters: EventFilters) -> int: + db: AsyncIOMotorDatabase = get_database() + query = EventService._build_query(filters) + return await db[ANALYTICS_EVENTS_COLLECTION].count_documents(query) diff --git a/app/insightfy_utils-0.1.0-py3-none-any.whl b/app/insightfy_utils-0.1.0-py3-none-any.whl new file mode 100644 index 0000000000000000000000000000000000000000..abff700ec8ac289a1a7bca59e95c6cb81e685d2f Binary files /dev/null and b/app/insightfy_utils-0.1.0-py3-none-any.whl differ diff --git a/app/kpi_cache/__init__.py b/app/kpi_cache/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/kpi_cache/constants.py b/app/kpi_cache/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..da4d795c2e4cd9a03aa4927f14539ea3d6bd3c43 --- /dev/null +++ b/app/kpi_cache/constants.py @@ -0,0 +1,113 @@ +"""Constants for KPI cache module.""" + +# MongoDB collection name for durable KPI snapshots +KPI_CACHE_COLLECTION = "kpi_cache" + +# Redis key prefix for hot L1 cache +REDIS_KPI_PREFIX = "kpi" + +# Redis TTLs (seconds) — L1 hot cache, shorter than MongoDB snapshots +REDIS_TTL_FINANCIAL = 1800 # 30 min +REDIS_TTL_INVENTORY = 600 # 10 min +REDIS_TTL_OPERATIONS = 600 # 10 min +REDIS_TTL_SALES = 900 # 15 min + +# Widget category → Redis TTL mapping +WIDGET_REDIS_TTL: dict = { + "financial": REDIS_TTL_FINANCIAL, + "inventory": REDIS_TTL_INVENTORY, + "operations": REDIS_TTL_OPERATIONS, + "sales": REDIS_TTL_SALES, +} + +# All 12 KPI widget definitions +KPI_WIDGET_REGISTRY: dict = { + # --- OPERATIONS --- + "wid_open_po_count_001": { + "title": "Open Purchase Orders", + "category": "operations", + "unit": "count", + "description": "Count of POs in submitted/approved/dispatched/partially_received status", + "drill_down_url": "/purchases/orders", + }, + "wid_po_aging_001": { + "title": "PO Aging", + "category": "operations", + "unit": "count", + "description": "Open POs grouped by age: 0-7, 8-14, 15-30, 30+ days", + "drill_down_url": "/purchases/orders", + }, + "wid_receipts_this_week_001": { + "title": "Receipts This Week", + "category": "operations", + "unit": "count", + "description": "GRNs received in the last 7 days", + "drill_down_url": "/purchases/receipts", + }, + "wid_stock_ins_today_001": { + "title": "Stock-Ins Today", + "category": "operations", + "unit": "count", + "description": "Direct stock-in (Self-GRN) transactions created today", + "drill_down_url": "/self-grn", + }, + "wid_stock_take_pending_001": { + "title": "Pending Stock Takes", + "category": "operations", + "unit": "count", + "description": "Stock takes in draft or in_progress status", + "drill_down_url": "/inventory/stock-take", + }, + "wid_shipments_transit_001": { + "title": "Shipments In Transit", + "category": "operations", + "unit": "count", + "description": "Trade shipments currently in transit", + "drill_down_url": "/trade-sales/client-orders", + }, + # --- INVENTORY --- + "wid_low_stock_skus_001": { + "title": "Low Stock SKUs", + "category": "inventory", + "unit": "count", + "description": "SKUs at or below reorder point; includes stockout count", + "drill_down_url": "/inventory/stock-overview", + }, + "wid_net_stock_value_001": { + "title": "Net Stock Value", + "category": "inventory", + "unit": "INR", + "description": "Total value of current inventory (qty_on_hand * cost_price)", + "drill_down_url": "/inventory/stock-overview", + }, + "wid_adjustments_mtd_001": { + "title": "Adjustments This Month", + "category": "inventory", + "unit": "count", + "description": "Approved/applied stock adjustments month-to-date", + "drill_down_url": "/inventory/adjustments", + }, + # --- FINANCIAL --- + "wid_invoices_mtd_001": { + "title": "Invoices MTD", + "category": "financial", + "unit": "count", + "description": "Invoices issued month-to-date (non-cancelled, non-draft)", + "drill_down_url": "/trade-sales/invoices", + }, + "wid_credit_debit_notes_mtd_001": { + "title": "Credit / Debit Notes MTD", + "category": "financial", + "unit": "INR", + "description": "Net value of credit and debit notes issued this month", + "drill_down_url": "/trade-sales/credit-debit-notes", + }, + # --- SALES --- + "wid_pos_sales_today_001": { + "title": "POS Sales Today", + "category": "sales", + "unit": "INR", + "description": "Today's POS sales total and transaction count (Phase 2 stub)", + "drill_down_url": "/retail/pos", + }, +} diff --git a/app/kpi_cache/controllers/__init__.py b/app/kpi_cache/controllers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..00ea27d27b602614352eb3184f74280e7470aca2 --- /dev/null +++ b/app/kpi_cache/controllers/__init__.py @@ -0,0 +1 @@ +# kpi_cache controllers package diff --git a/app/kpi_cache/controllers/router.py b/app/kpi_cache/controllers/router.py new file mode 100644 index 0000000000000000000000000000000000000000..0d57fc68a1cba39f4c75c25e0e9f132bfb9758ed --- /dev/null +++ b/app/kpi_cache/controllers/router.py @@ -0,0 +1,182 @@ +""" +KPI Cache router. + POST /kpi-cache/stats — bulk KPI fetch + POST /kpi-cache/stats/individual/{kpi_id} — single KPI (404 on unknown, 403 on no widget access) + POST /kpi-cache/rebuild — force recompute + POST /kpi-cache/list — list cache snapshots (projection supported) + +Auth rules (mirrors SCM-ms widget_router.py): + - merchant_id is ALWAYS sourced from JWT — never from request body + - Bulk stats / rebuild / list: require permissions.dashboard.view + - Individual KPI: additionally checks scm_access_roles.widget_access[] for that widget_id +""" +from datetime import datetime, timezone +from fastapi import APIRouter, Depends, HTTPException, status + +from app.core.logging import get_logger +from app.dependencies.auth import get_current_user, TokenUser +from app.dependencies.kpi_permissions import require_dashboard_view, check_widget_access +from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY +from app.kpi_cache.schemas.schema import ( + KPIStatsRequest, + KPIStatsResponse, + KPIIndividualRequest, + KPIRebuildRequest, + RebuildResponse, + KPICacheListRequest, +) +from app.kpi_cache.services.service import KPICacheService + +logger = get_logger(__name__) + +router = APIRouter(prefix="/kpi-cache", tags=["KPI Cache"]) + + +@router.post("/stats", response_model=KPIStatsResponse) +async def get_kpi_stats( + payload: KPIStatsRequest, + current_user: TokenUser = Depends(require_dashboard_view), +): + """ + Return KPI widget values for the authenticated merchant. + merchant_id is taken from JWT — not from the request body. + Requires: permissions.dashboard.view + Partial failures return {"error": ..., "value": null} per KPI — never HTTP 500. + """ + branch_id = payload.branch_id or "all" + try: + kpis = await KPICacheService.get_kpi_stats( + merchant_id=current_user.merchant_id, + period_window=payload.period_window, + branch_id=branch_id, + kpi_ids=payload.kpi_ids, + use_cache=payload.use_cache, + ) + logger.info( + "KPI stats retrieved", + extra={ + "event": "kpi_stats_retrieved", + "user_id": current_user.user_id, + "merchant_id": current_user.merchant_id, + "period_window": payload.period_window, + "kpi_count": len(kpis), + }, + ) + return KPIStatsResponse( + success=True, + merchant_id=current_user.merchant_id, + period_window=payload.period_window, + branch_id=branch_id, + kpis=kpis, + generated_at=datetime.now(timezone.utc).isoformat(), + ) + except Exception as exc: + logger.error("get_kpi_stats failed", + extra={"event": "kpi_stats_error", "error": str(exc)}, exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to fetch KPI stats") + + +@router.post("/stats/individual/{kpi_id}") +async def get_individual_kpi( + kpi_id: str, + payload: KPIIndividualRequest, + current_user: TokenUser = Depends(require_dashboard_view), +): + """ + Refresh a single KPI widget. + Requires: permissions.dashboard.view + widget_id in scm_access_roles.widget_access[] + Returns HTTP 404 with available_kpis list when kpi_id is unknown. + use_cache=false bypasses cache read but writes fresh result back to Redis. + """ + # 404 on unknown widget_id (exact match, not fuzzy) + if kpi_id not in KPI_HANDLER_REGISTRY: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "error": "KPI not found", + "available_kpis": list(KPI_HANDLER_REGISTRY.keys()), + }, + ) + + # Widget-level access control + await check_widget_access(kpi_id, current_user) + + branch_id = payload.branch_id or "all" + result = await KPICacheService.get_individual_kpi( + widget_id=kpi_id, + merchant_id=current_user.merchant_id, + period_window=payload.period_window, + branch_id=branch_id, + use_cache=payload.use_cache, + ) + logger.info( + "Individual KPI retrieved", + extra={ + "event": "kpi_individual_retrieved", + "kpi_id": kpi_id, + "user_id": current_user.user_id, + "merchant_id": current_user.merchant_id, + }, + ) + return {"success": True, "merchant_id": current_user.merchant_id, "kpi": result} + + +@router.post("/rebuild", response_model=RebuildResponse) +async def rebuild_kpi_cache( + payload: KPIRebuildRequest, + current_user: TokenUser = Depends(require_dashboard_view), +): + """ + Force recompute KPI cache for the authenticated merchant. + Requires: permissions.dashboard.view + """ + try: + result = await KPICacheService.rebuild( + merchant_id=current_user.merchant_id, + period_window=payload.period_window, + branch_id=payload.branch_id, + kpi_ids=payload.kpi_ids, + ) + return RebuildResponse( + success=True, + message="KPI cache rebuilt successfully", + merchant_id=current_user.merchant_id, + rebuilt_count=result["rebuilt"], + failed_count=result["failed"], + details=result.get("errors") or None, + ) + except Exception as exc: + logger.error("rebuild_kpi_cache failed", + extra={"event": "kpi_rebuild_error", "error": str(exc)}, exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to rebuild KPI cache") + + +@router.post("/list") +async def list_kpi_cache( + payload: KPICacheListRequest, + current_user: TokenUser = Depends(require_dashboard_view), +): + """ + List KPI cache snapshots with optional projection_list. + merchant_id filter is always enforced from JWT — cannot be overridden. + Requires: permissions.dashboard.view + """ + filters = payload.filters.model_dump() if payload.filters else {} + # Always scope to the authenticated merchant — ignore any merchant_id in filters + filters["merchant_id"] = current_user.merchant_id + + try: + docs = await KPICacheService.list_cache( + filters=filters, + skip=payload.skip, + limit=payload.limit, + projection_list=payload.projection_list, + ) + return {"success": True, "data": docs, "count": len(docs)} + except Exception as exc: + logger.error("list_kpi_cache failed", + extra={"event": "kpi_list_error", "error": str(exc)}, exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to list KPI cache") diff --git a/app/kpi_cache/models/__init__.py b/app/kpi_cache/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/kpi_cache/models/model.py b/app/kpi_cache/models/model.py new file mode 100644 index 0000000000000000000000000000000000000000..64d93b0245e87ac9d24abdaf32de1832e9af2551 --- /dev/null +++ b/app/kpi_cache/models/model.py @@ -0,0 +1,31 @@ +"""MongoDB document model for KPI cache snapshots.""" +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field + + +class KPICacheDocument(BaseModel): + """ + Durable KPI snapshot stored in MongoDB. + One document per (merchant_id, widget_id, period_window, branch_id). + """ + cache_id: str = Field(..., description="UUID for this cache document") + merchant_id: str = Field(..., description="Tenant identifier") + widget_id: str = Field(..., description="KPI widget identifier") + period_window: str = Field(..., description="today | last_7_days | mtd | ytd | last_12_months") + branch_id: str = Field("all", description="Branch/warehouse filter; 'all' means no filter") + + # KPI payload + value: float = Field(..., description="Primary KPI value") + unit: str = Field(..., description="count | INR | % | ratio") + delta: Optional[float] = Field(None) + delta_percentage: Optional[float] = Field(None) + trend: str = Field("neutral", description="up | down | neutral") + secondary_values: Optional[Dict[str, Any]] = Field(None) + drill_down_url: Optional[str] = Field(None) + + # Metadata + computed_at: datetime = Field(default_factory=datetime.utcnow) + expires_at: Optional[datetime] = Field(None, description="When this snapshot should be rebuilt") + cached: bool = Field(False, description="Always False for freshly written docs") + error: Optional[str] = Field(None, description="Set when computation failed") diff --git a/app/kpi_cache/schemas/__init__.py b/app/kpi_cache/schemas/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/kpi_cache/schemas/schema.py b/app/kpi_cache/schemas/schema.py new file mode 100644 index 0000000000000000000000000000000000000000..9e42482367b9f239eb0ce9a88d2f29f4c23c7563 --- /dev/null +++ b/app/kpi_cache/schemas/schema.py @@ -0,0 +1,79 @@ +"""Pydantic schemas for KPI cache API.""" +from datetime import datetime +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field + + +class KPIStatsRequest(BaseModel): + """Request body for POST /kpi-cache/stats""" + period_window: str = Field("mtd", description="today | last_7_days | mtd | ytd | last_12_months") + branch_id: Optional[str] = Field(None, description="Filter by branch/warehouse") + kpi_ids: Optional[List[str]] = Field(None, description="Subset of widget IDs; omit for all 12") + use_cache: bool = Field(True, description="False forces a live rebuild for this request") + + +class KPIIndividualRequest(BaseModel): + """Request body for POST /kpi-cache/stats/individual/{kpi_id}""" + period_window: str = Field("mtd", description="today | last_7_days | mtd | ytd | last_12_months") + branch_id: Optional[str] = Field(None, description="Filter by branch/warehouse") + use_cache: bool = Field(True, description="False forces a live recompute and cache write-back") + + +class KPIRebuildRequest(BaseModel): + """Request body for POST /kpi-cache/rebuild""" + period_window: Optional[str] = Field(None, description="Rebuild only this period; omit for all") + branch_id: Optional[str] = Field(None, description="Rebuild only this branch; omit for all") + kpi_ids: Optional[List[str]] = Field(None, description="Subset of widget IDs; omit for all 12") + + +class KPICacheFilters(BaseModel): + merchant_id: Optional[str] = Field(None) + widget_id: Optional[str] = Field(None) + period_window: Optional[str] = Field(None) + branch_id: Optional[str] = Field(None) + computed_at_from: Optional[datetime] = Field(None) + computed_at_to: Optional[datetime] = Field(None) + + +class KPICacheListRequest(BaseModel): + """Request body for POST /kpi-cache/list""" + filters: Optional[KPICacheFilters] = Field(default_factory=KPICacheFilters) + skip: int = Field(0, ge=0) + limit: int = Field(100, ge=1, le=1000) + projection_list: Optional[List[str]] = Field( + None, description="List of fields to include in response" + ) + + +class KPIResultSchema(BaseModel): + widget_id: str + title: str + category: str + value: float + unit: str + delta: Optional[float] = None + delta_percentage: Optional[float] = None + trend: str = "neutral" + secondary_values: Optional[Dict[str, Any]] = None + drill_down_url: Optional[str] = None + cached: bool = False + computed_at: Optional[datetime] = None + error: Optional[str] = None + + +class KPIStatsResponse(BaseModel): + success: bool + merchant_id: str + period_window: str + branch_id: str = "all" + kpis: Dict[str, Any] + generated_at: str + + +class RebuildResponse(BaseModel): + success: bool + message: str + merchant_id: str + rebuilt_count: int + failed_count: int + details: Optional[Dict[str, Any]] = None diff --git a/app/kpi_cache/services/__init__.py b/app/kpi_cache/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6acc11ff3548e6efff5ebf5a2a83baebc528ecf1 --- /dev/null +++ b/app/kpi_cache/services/__init__.py @@ -0,0 +1 @@ +# kpi_cache services package diff --git a/app/kpi_cache/services/service.py b/app/kpi_cache/services/service.py new file mode 100644 index 0000000000000000000000000000000000000000..f90072274a2f909b43a0e30d53aeea3f5d6cf38a --- /dev/null +++ b/app/kpi_cache/services/service.py @@ -0,0 +1,232 @@ +""" +KPI Cache Service — registry-based asyncio.gather() dispatch. +Reads from Redis L1 (via KPIHandler.compute) -> MongoDB L2 write-back. +""" +import asyncio +import uuid +from datetime import datetime, timezone, timedelta +from typing import Optional, List, Dict, Any + +from app.core.logging import get_logger +from app.nosql import get_database +from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY +from app.kpi_handlers.base_handler import KPIResult +from app.kpi_cache.constants import ( + KPI_CACHE_COLLECTION, + KPI_WIDGET_REGISTRY, + WIDGET_REDIS_TTL, +) +from app.kpi_cache.schemas.schema import KPIResultSchema + +logger = get_logger(__name__) + +_DEFAULT_TTL = 600 + + +def _ttl_for(widget_id: str) -> int: + meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) + return WIDGET_REDIS_TTL.get(meta.get("category", ""), _DEFAULT_TTL) + + +class KPICacheService: + + @staticmethod + async def get_kpi_stats( + merchant_id: str, + period_window: str, + branch_id: str, + kpi_ids: Optional[List[str]], + use_cache: bool, + ) -> Dict[str, Any]: + """ + Concurrently compute all requested KPIs via asyncio.gather(). + Failing KPIs return {"error": "...", "value": null} — never HTTP 500. + """ + target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys()) + + async def compute_one(wid: str): + handler = KPI_HANDLER_REGISTRY.get(wid) + if not handler: + return wid, {"error": f"KPI not found: {wid}", "value": None} + try: + br = branch_id if branch_id != "all" else None + result: KPIResult = await handler.compute( + merchant_id=merchant_id, + period_window=period_window, + branch_id=br, + use_cache=use_cache, + ) + await KPICacheService._write_to_mongo( + merchant_id, wid, period_window, branch_id, result + ) + return wid, KPICacheService._to_schema(wid, result) + except Exception as exc: + logger.error( + "KPI computation failed", + extra={"event": "kpi_compute_error", "widget_id": wid, + "merchant_id": merchant_id, "error": str(exc)}, + exc_info=True, + ) + return wid, {"error": str(exc), "value": None} + + pairs = await asyncio.gather(*[compute_one(wid) for wid in target_ids]) + return dict(pairs) + + @staticmethod + async def get_individual_kpi( + widget_id: str, + merchant_id: str, + period_window: str, + branch_id: str, + use_cache: bool, + ) -> Optional[KPIResultSchema]: + """ + Compute a single KPI. Returns None when widget_id is unknown (caller raises 404). + use_cache=False bypasses cache read but writes fresh result back to Redis. + """ + handler = KPI_HANDLER_REGISTRY.get(widget_id) + if not handler: + return None + + br = branch_id if branch_id != "all" else None + result: KPIResult = await handler.compute( + merchant_id=merchant_id, + period_window=period_window, + branch_id=br, + use_cache=use_cache, + ) + await KPICacheService._write_to_mongo( + merchant_id, widget_id, period_window, branch_id, result + ) + return KPICacheService._to_schema(widget_id, result) + + @staticmethod + async def rebuild( + merchant_id: str, + period_window: Optional[str], + branch_id: Optional[str], + kpi_ids: Optional[List[str]], + ) -> Dict[str, Any]: + """Force recompute for all/subset of KPIs and update both caches.""" + target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys()) + periods = [period_window] if period_window else ["today", "last_7_days", "mtd", "ytd"] + branches = [branch_id] if branch_id else ["all"] + + rebuilt, failed = 0, 0 + errors: Dict[str, str] = {} + + for pw in periods: + for br in branches: + results = await KPICacheService.get_kpi_stats( + merchant_id=merchant_id, + period_window=pw, + branch_id=br, + kpi_ids=target_ids, + use_cache=False, + ) + for wid, payload in results.items(): + if isinstance(payload, dict) and payload.get("error"): + failed += 1 + errors[f"{wid}:{pw}:{br}"] = payload["error"] + else: + rebuilt += 1 + + return {"rebuilt": rebuilt, "failed": failed, "errors": errors} + + @staticmethod + async def list_cache( + filters: Dict[str, Any], + skip: int, + limit: int, + projection_list: Optional[List[str]], + ) -> List[Any]: + """List KPI cache documents with optional MongoDB projection.""" + db = get_database() + collection = db[KPI_CACHE_COLLECTION] + + query: Dict[str, Any] = {} + for field in ("merchant_id", "widget_id", "period_window", "branch_id"): + if filters.get(field): + query[field] = filters[field] + if filters.get("computed_at_from") or filters.get("computed_at_to"): + date_filter: Dict[str, Any] = {} + if filters.get("computed_at_from"): + date_filter["$gte"] = filters["computed_at_from"] + if filters.get("computed_at_to"): + date_filter["$lte"] = filters["computed_at_to"] + query["computed_at"] = date_filter + + projection_dict = None + if projection_list: + projection_dict = {f: 1 for f in projection_list} + projection_dict["_id"] = 0 + + cursor = collection.find(query, projection_dict).skip(skip).limit(limit) + return await cursor.to_list(length=limit) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + @staticmethod + def _to_schema(widget_id: str, result: KPIResult) -> KPIResultSchema: + meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) + return KPIResultSchema( + widget_id=widget_id, + title=meta.get("title", widget_id), + category=meta.get("category", ""), + value=max(result.value, 0), + unit=result.unit, + delta=result.delta, + delta_percentage=result.delta_percentage, + trend=result.trend, + secondary_values=result.secondary_values, + drill_down_url=result.drill_down_url or meta.get("drill_down_url"), + cached=result.cached, + computed_at=result.computed_at, + error=result.error, + ) + + @staticmethod + async def _write_to_mongo( + merchant_id: str, + widget_id: str, + period_window: str, + branch_id: str, + result: KPIResult, + ) -> None: + try: + db = get_database() + collection = db[KPI_CACHE_COLLECTION] + now = datetime.now(timezone.utc) + ttl_seconds = _ttl_for(widget_id) + meta = KPI_WIDGET_REGISTRY.get(widget_id, {}) + doc = { + "cache_id": str(uuid.uuid4()), + "merchant_id": merchant_id, + "widget_id": widget_id, + "period_window": period_window, + "branch_id": branch_id, + "value": max(result.value, 0), + "unit": result.unit, + "delta": result.delta, + "delta_percentage": result.delta_percentage, + "trend": result.trend, + "secondary_values": result.secondary_values, + "drill_down_url": result.drill_down_url or meta.get("drill_down_url"), + "computed_at": now, + "expires_at": now + timedelta(seconds=ttl_seconds), + "cached": False, + "error": result.error, + } + await collection.update_one( + {"merchant_id": merchant_id, "widget_id": widget_id, + "period_window": period_window, "branch_id": branch_id}, + {"$set": doc}, + upsert=True, + ) + except Exception as exc: + logger.error( + "MongoDB write failed", + extra={"event": "mongo_write_error", "error": str(exc)}, + ) diff --git a/app/kpi_cache/services/sql_queries.py b/app/kpi_cache/services/sql_queries.py new file mode 100644 index 0000000000000000000000000000000000000000..24882db218cf690e3f12fd26eff0030dec8649d1 --- /dev/null +++ b/app/kpi_cache/services/sql_queries.py @@ -0,0 +1,303 @@ +""" +Raw asyncpg SQL queries for each KPI widget. +All queries are scoped to merchant_id as the first predicate. +Returns a dict matching KPICacheDocument fields. +""" +from datetime import datetime, timedelta, timezone +from typing import Optional, Dict, Any + +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +def _utc_today_start() -> datetime: + now = datetime.now(timezone.utc) + return now.replace(hour=0, minute=0, second=0, microsecond=0) + + +def _compute_delta(current: float, prior: float): + delta = current - prior + delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None + trend = "up" if delta > 0 else ("down" if delta < 0 else "neutral") + return delta, delta_pct, trend + + +# --------------------------------------------------------------------------- +# wid_open_po_count_001 +# --------------------------------------------------------------------------- +async def query_open_po_count(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count, + COUNT(*) FILTER (WHERE po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count + FROM trans.scm_po + WHERE merchant_id = $1 + AND status IN ('submitted','approved','dispatched','partially_received') + {branch_clause} + """, *params) + current = float(row["current_count"] or 0) + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = _compute_delta(current, prior) + return {"value": max(current, 0), "unit": "count", "delta": delta, + "delta_percentage": delta_pct, "trend": trend} + + +# --------------------------------------------------------------------------- +# wid_po_aging_001 +# --------------------------------------------------------------------------- +async def query_po_aging(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus + FROM trans.scm_po + WHERE merchant_id = $1 + AND status IN ('submitted','approved','dispatched','partially_received') + {branch_clause} + """, *params) + b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")} + total = sum(b.values()) + secondary = { + "bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)}, + "bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)}, + "bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)}, + "bucket_30_plus":{"count": b["b30_plus"],"value": float(row["v30_plus"] or 0)}, + } + return {"value": float(total), "unit": "count", "secondary_values": secondary} + + +# --------------------------------------------------------------------------- +# wid_receipts_this_week_001 +# --------------------------------------------------------------------------- +async def query_receipts_this_week(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND wh_location = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count, + COALESCE(SUM(total_qty) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty, + COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days' + AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count + FROM trans.scm_grn + WHERE merchant_id = $1 + {branch_clause} + """, *params) + current = float(row["current_count"] or 0) + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = _compute_delta(current, prior) + return {"value": max(current, 0), "unit": "count", "delta": delta, + "delta_percentage": delta_pct, "trend": trend, + "secondary_values": {"total_received_qty": float(row["total_qty"] or 0)}} + + +# --------------------------------------------------------------------------- +# wid_stock_ins_today_001 (MongoDB Self-GRN — stub returning 0 until cross-service) +# --------------------------------------------------------------------------- +async def query_stock_ins_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + # Self-GRN lives in MongoDB managed by SCM-ms; analytics-ms returns a stub + # until a shared collection or internal API is available. + logger.warning("stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub", + extra={"event": "kpi_stub", "widget_id": "wid_stock_ins_today_001", + "merchant_id": merchant_id}) + return {"value": 0.0, "unit": "count", "delta": 0.0, "delta_percentage": None, + "trend": "neutral", "secondary_values": {"stub": True}} + + +# --------------------------------------------------------------------------- +# wid_low_stock_skus_001 +# --------------------------------------------------------------------------- +async def query_low_stock_skus(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count, + COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count + FROM trans.scm_stock + WHERE merchant_id = $1 + {branch_clause} + """, *params) + stockout = int(row["stockout_count"] or 0) + low = int(row["low_stock_count"] or 0) + logger.warning("low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback", + extra={"event": "kpi_fallback", "merchant_id": merchant_id}) + return {"value": float(low), "unit": "count", + "secondary_values": {"stockout_count": stockout, "low_stock_count": low}} + + +# --------------------------------------------------------------------------- +# wid_net_stock_value_001 +# --------------------------------------------------------------------------- +async def query_net_stock_value(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value + FROM trans.scm_stock + WHERE merchant_id = $1 + AND cost_price IS NOT NULL + {branch_clause} + """, *params) + value = float(row["net_value"] or 0) + if value < 0: + logger.warning("net_stock_value: negative aggregate clamped to 0", + extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id}) + value = 0.0 + return {"value": value, "unit": "INR", "delta": None, "delta_percentage": None, "trend": "neutral"} + + +# --------------------------------------------------------------------------- +# wid_adjustments_mtd_001 +# --------------------------------------------------------------------------- +async def query_adjustments_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND m.warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(DISTINCT m.adjustment_master_id) AS adj_count, + COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value, + COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value + FROM trans.scm_stock_adjustment_master m + JOIN trans.scm_stock_adjustment_details d + ON d.adjustment_master_id = m.adjustment_master_id + WHERE m.merchant_id = $1 + AND m.status IN ('approved','applied') + AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP) + {branch_clause} + """, *params) + pos = float(row["pos_value"] or 0) + neg = float(row["neg_value"] or 0) + return {"value": float(row["adj_count"] or 0), "unit": "count", + "secondary_values": {"positive_value": pos, "negative_value": neg, + "net_impact": pos - neg}} + + +# --------------------------------------------------------------------------- +# wid_stock_take_pending_001 +# --------------------------------------------------------------------------- +async def query_stock_take_pending(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count, + COUNT(*) FILTER (WHERE status IN ('draft','in_progress') + AND stock_take_date < CURRENT_TIMESTAMP) AS overdue_count + FROM trans.scm_stock_take_master + WHERE merchant_id = $1 + {branch_clause} + """, *params) + pending = int(row["pending_count"] or 0) + overdue = int(row["overdue_count"] or 0) + return {"value": float(pending), "unit": "count", + "secondary_values": {"pending_count": pending, "overdue_count": overdue}} + + +# --------------------------------------------------------------------------- +# wid_shipments_transit_001 +# --------------------------------------------------------------------------- +async def query_shipments_transit(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + row = await conn.fetchrow(""" + SELECT + COUNT(*) AS in_transit, + COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed + FROM trans.scm_trade_shipment ts + JOIN trans.scm_po po ON ts.order_id = po.po_id + WHERE ts.supplier_id = $1 + AND ts.status = 'shipped' + """, merchant_id) + in_transit = int(row["in_transit"] or 0) + delayed = int(row["delayed"] or 0) + delayed_pct = round((delayed / in_transit) * 100, 2) if in_transit > 0 else 0.0 + return {"value": float(in_transit), "unit": "count", + "secondary_values": {"in_transit_count": in_transit, "delayed_count": delayed, + "delayed_percentage": delayed_pct}} + + +# --------------------------------------------------------------------------- +# wid_invoices_mtd_001 +# --------------------------------------------------------------------------- +async def query_invoices_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + row = await conn.fetchrow(""" + SELECT + COUNT(*) AS inv_count, + COALESCE(SUM(grand_total_amt), 0) AS total_amt, + COUNT(*) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month') + AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP) + ) AS prior_count, + COALESCE(SUM(grand_total_amt) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month') + AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP) + ), 0) AS prior_amt + FROM trans.scm_invoice + WHERE (buyer_id = $1 OR supplier_id = $1) + AND status NOT IN ('cancelled','draft') + AND created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP) + """, merchant_id) + current = float(row["inv_count"] or 0) + total = float(row["total_amt"] or 0) + if total < 0: + logger.warning("invoices_mtd: negative total clamped", + extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id}) + total = 0.0 + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = _compute_delta(current, prior) + return {"value": max(current, 0), "unit": "count", "delta": delta, + "delta_percentage": delta_pct, "trend": trend, + "secondary_values": {"total_invoiced_amount": total}} + + +# --------------------------------------------------------------------------- +# wid_credit_debit_notes_mtd_001 (Phase 1 stub) +# --------------------------------------------------------------------------- +async def query_credit_debit_notes_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + logger.warning("credit_debit_notes_mtd: no dedicated table yet; returning stub", + extra={"event": "kpi_stub", "widget_id": "wid_credit_debit_notes_mtd_001", + "merchant_id": merchant_id}) + return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None, + "trend": "neutral", + "secondary_values": {"credit_note_total": 0.0, "debit_note_total": 0.0, + "net_impact": 0.0, "stub": True}} + + +# --------------------------------------------------------------------------- +# wid_pos_sales_today_001 (Phase 2 stub) +# --------------------------------------------------------------------------- +async def query_pos_sales_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]: + logger.warning("pos_sales_today: Phase 2 stub", + extra={"event": "kpi_stub", "widget_id": "wid_pos_sales_today_001", + "merchant_id": merchant_id}) + return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None, + "trend": "neutral", + "secondary_values": {"transaction_count": 0, "average_ticket": 0.0, "stub": True}} + + +# --------------------------------------------------------------------------- +# Dispatch table +# --------------------------------------------------------------------------- +QUERY_DISPATCH = { + "wid_open_po_count_001": query_open_po_count, + "wid_po_aging_001": query_po_aging, + "wid_receipts_this_week_001": query_receipts_this_week, + "wid_stock_ins_today_001": query_stock_ins_today, + "wid_low_stock_skus_001": query_low_stock_skus, + "wid_net_stock_value_001": query_net_stock_value, + "wid_adjustments_mtd_001": query_adjustments_mtd, + "wid_stock_take_pending_001": query_stock_take_pending, + "wid_shipments_transit_001": query_shipments_transit, + "wid_invoices_mtd_001": query_invoices_mtd, + "wid_credit_debit_notes_mtd_001": query_credit_debit_notes_mtd, + "wid_pos_sales_today_001": query_pos_sales_today, +} diff --git a/app/kpi_handlers/__init__.py b/app/kpi_handlers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..64cd86c3f17f7c770e09d88390cb0dd9384d0cfe --- /dev/null +++ b/app/kpi_handlers/__init__.py @@ -0,0 +1,3 @@ +from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY + +__all__ = ["KPI_HANDLER_REGISTRY"] diff --git a/app/kpi_handlers/adjustment_handlers.py b/app/kpi_handlers/adjustment_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..316ef5e02d3cc56e0dddb4955969fdc8df36640f --- /dev/null +++ b/app/kpi_handlers/adjustment_handlers.py @@ -0,0 +1,42 @@ +"""Adjustment KPI handler: AdjustmentsMTDHandler.""" +from typing import Optional + +from app.kpi_handlers.base_handler import KPIHandler, KPIResult +from app.postgres import get_postgres_pool + + +class AdjustmentsMTDHandler(KPIHandler): + widget_id = "wid_adjustments_mtd_001" + drill_down_url = "/inventory/adjustments" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND m.warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(DISTINCT m.adjustment_master_id) AS adj_count, + COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value, + COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value + FROM trans.scm_stock_adjustment_master m + JOIN trans.scm_stock_adjustment_details d + ON d.adjustment_master_id = m.adjustment_master_id + WHERE m.merchant_id = $1 + AND m.status IN ('approved','applied') + AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP) + {branch_clause} + """, *params) + pos = float(row["pos_value"] or 0) + neg = float(row["neg_value"] or 0) + return KPIResult( + value=float(row["adj_count"] or 0), + unit="count", + secondary_values={ + "positive_value": pos, + "negative_value": neg, + "net_impact": pos - neg, + }, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/base_handler.py b/app/kpi_handlers/base_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..b3ed52d2dcffd79cb376a59a6f37c5982e1577fd --- /dev/null +++ b/app/kpi_handlers/base_handler.py @@ -0,0 +1,158 @@ +""" +Base abstractions for KPI handlers. +Each KPI is a self-contained KPIHandler subclass that implements compute(). +""" +import json +from abc import ABC, abstractmethod +from datetime import datetime, timezone +from typing import Optional, Tuple, Dict, Any + +from pydantic import BaseModel + +from app.cache import get_redis +from app.core.logging import get_logger +from app.postgres import get_postgres_pool + +logger = get_logger(__name__) + + +class KPIResult(BaseModel): + value: float + unit: str = "count" + delta: Optional[float] = None + delta_percentage: Optional[float] = None + trend: str = "neutral" + secondary_values: Optional[Dict[str, Any]] = None + drill_down_url: Optional[str] = None + cached: bool = False + computed_at: Optional[datetime] = None + error: Optional[str] = None + + +def get_period_filter(period_window: str, column: str = "created_at") -> str: + """Return a SQL fragment for the given period window.""" + mapping = { + "today": f"{column}::date = CURRENT_DATE", + "last_7_days": f"{column} >= CURRENT_TIMESTAMP - INTERVAL '7 days'", + "mtd": f"{column} >= DATE_TRUNC('month', CURRENT_TIMESTAMP)", + "ytd": f"{column} >= DATE_TRUNC('year', CURRENT_TIMESTAMP)", + "last_12_months": f"{column} >= CURRENT_TIMESTAMP - INTERVAL '12 months'", + } + return mapping.get(period_window, mapping["mtd"]) + + +def compute_delta(current: float, prior: float) -> Tuple[float, Optional[float], str]: + """ + Returns (delta, delta_pct, trend). + delta_pct is None when prior == 0. + """ + delta = current - prior + delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None + trend = "up" if delta > 0 else ("down" if delta < 0 else "neutral") + return delta, delta_pct, trend + + +class KPIHandler(ABC): + """Abstract base for all KPI handlers.""" + + widget_id: str + drill_down_url: Optional[str] = None + cache_ttl: int = 600 # seconds + + async def compute( + self, + merchant_id: str, + period_window: str, + branch_id: Optional[str], + use_cache: bool = True, + ) -> KPIResult: + """ + Cache check → DB query → cache write. + Redis unavailability falls back silently to DB. + """ + import time + start = time.monotonic() + cache_key = self._cache_key(merchant_id, period_window, branch_id) + + if use_cache: + cached_result = await self._read_cache(cache_key) + if cached_result is not None: + duration_ms = round((time.monotonic() - start) * 1000, 2) + logger.info( + "KPI served from cache", + extra={ + "event": "kpi_computed", + "widget_id": self.widget_id, + "merchant_id": merchant_id, + "cached": True, + "duration_ms": duration_ms, + }, + ) + cached_result.cached = True + return cached_result + + result = await self._execute(merchant_id, period_window, branch_id) + result.cached = False + result.computed_at = datetime.now(timezone.utc) + if result.drill_down_url is None: + result.drill_down_url = self.drill_down_url + + await self._write_cache(cache_key, result) + + duration_ms = round((time.monotonic() - start) * 1000, 2) + logger.info( + "KPI computed", + extra={ + "event": "kpi_computed", + "widget_id": self.widget_id, + "merchant_id": merchant_id, + "cached": False, + "duration_ms": duration_ms, + }, + ) + return result + + @abstractmethod + async def _execute( + self, + merchant_id: str, + period_window: str, + branch_id: Optional[str], + ) -> KPIResult: + """Run the actual DB query and return a KPIResult.""" + ... + + def _cache_key(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> str: + br = branch_id or "all" + return f"widget:{self.widget_id}:{merchant_id}:{period_window}:{br}" + + async def _read_cache(self, key: str) -> Optional[KPIResult]: + redis = get_redis() + if not redis: + return None + try: + raw = await redis.get(key) + if raw: + return KPIResult(**json.loads(raw)) + except Exception as exc: + logger.warning( + "widget_cache_unavailable", + extra={"event": "widget_cache_unavailable", "error": str(exc)}, + ) + return None + + async def _write_cache(self, key: str, result: KPIResult) -> None: + redis = get_redis() + if not redis: + return + try: + await redis.setex(key, self.cache_ttl, result.model_dump_json()) + except Exception as exc: + logger.warning( + "widget_cache_unavailable", + extra={"event": "widget_cache_unavailable", "error": str(exc)}, + ) + + async def _get_conn(self): + """Acquire a connection from the asyncpg pool.""" + return get_postgres_pool().acquire() diff --git a/app/kpi_handlers/finance_handlers.py b/app/kpi_handlers/finance_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..e8b5f2452b6bbb953f452a88edecd389180f4f35 --- /dev/null +++ b/app/kpi_handlers/finance_handlers.py @@ -0,0 +1,82 @@ +"""Finance KPI handlers: InvoicesMTDHandler, CreditDebitNotesMTDHandler.""" +from typing import Optional + +from app.core.logging import get_logger +from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta +from app.postgres import get_postgres_pool + +logger = get_logger(__name__) + + +class InvoicesMTDHandler(KPIHandler): + widget_id = "wid_invoices_mtd_001" + drill_down_url = "/trade-sales/invoices" + cache_ttl = 1800 # CACHE_TTL_FINANCIAL + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT + COUNT(*) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP) + ) AS inv_count, + COALESCE(SUM(grand_total_amt) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP) + ), 0) AS total_amt, + COUNT(*) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month') + AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP) + ) AS prior_count, + COALESCE(SUM(grand_total_amt) FILTER ( + WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month') + AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP) + ), 0) AS prior_amt + FROM trans.scm_invoice + WHERE (buyer_id = $1 OR supplier_id = $1) + AND status NOT IN ('cancelled','draft') + """, merchant_id) + current = float(row["inv_count"] or 0) + total = float(row["total_amt"] or 0) + if total < 0: + logger.warning( + "invoices_mtd: negative total clamped to 0", + extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id, + "widget_id": self.widget_id}, + ) + total = 0.0 + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = compute_delta(current, prior) + return KPIResult( + value=max(current, 0), + unit="count", + delta=delta, + delta_percentage=delta_pct, + trend=trend, + secondary_values={"total_invoiced_amount": total}, + drill_down_url=self.drill_down_url, + ) + + +class CreditDebitNotesMTDHandler(KPIHandler): + """Phase 1 stub — no dedicated table yet.""" + widget_id = "wid_credit_debit_notes_mtd_001" + drill_down_url = "/trade-sales/credit-debit-notes" + cache_ttl = 1800 # CACHE_TTL_FINANCIAL + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + logger.warning( + "credit_debit_notes_mtd: no dedicated table yet; returning stub", + extra={"event": "widget_credit_debit_stub", "widget_id": self.widget_id, + "merchant_id": merchant_id}, + ) + return KPIResult( + value=0.0, + unit="INR", + delta=0.0, + delta_percentage=None, + trend="neutral", + secondary_values={"credit_note_total": 0.0, "debit_note_total": 0.0, + "net_impact": 0.0, "stub": True}, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/grn_handlers.py b/app/kpi_handlers/grn_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..bac25e7ffb85f1207a8497f7c6dca635facfe26d --- /dev/null +++ b/app/kpi_handlers/grn_handlers.py @@ -0,0 +1,41 @@ +"""GRN KPI handler: ReceiptsThisWeekHandler.""" +from typing import Optional + +from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta +from app.postgres import get_postgres_pool + + +class ReceiptsThisWeekHandler(KPIHandler): + widget_id = "wid_receipts_this_week_001" + drill_down_url = "/purchases/receipts" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND wh_location = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count, + COALESCE(SUM(total_qty) FILTER ( + WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty, + COUNT(*) FILTER ( + WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days' + AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count + FROM trans.scm_grn + WHERE merchant_id = $1 + {branch_clause} + """, *params) + current = float(row["current_count"] or 0) + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = compute_delta(current, prior) + return KPIResult( + value=max(current, 0), + unit="count", + delta=delta, + delta_percentage=delta_pct, + trend=trend, + secondary_values={"total_received_qty": float(row["total_qty"] or 0)}, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/inventory_handlers.py b/app/kpi_handlers/inventory_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..f6d7138348c0abefd5c692ab5163c214937468a9 --- /dev/null +++ b/app/kpi_handlers/inventory_handlers.py @@ -0,0 +1,77 @@ +"""Inventory KPI handlers: LowStockSKUsHandler, NetStockValueHandler.""" +from typing import Optional + +from app.core.logging import get_logger +from app.kpi_handlers.base_handler import KPIHandler, KPIResult +from app.postgres import get_postgres_pool + +logger = get_logger(__name__) + + +class LowStockSKUsHandler(KPIHandler): + widget_id = "wid_low_stock_skus_001" + drill_down_url = "/inventory/stock-overview" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count, + COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count + FROM trans.scm_stock + WHERE merchant_id = $1 + {branch_clause} + """, *params) + stockout = int(row["stockout_count"] or 0) + low = int(row["low_stock_count"] or 0) + # reorder_point not on scm_stock — using qty_available=0 fallback + logger.warning( + "low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback", + extra={"event": "kpi_fallback", "widget_id": self.widget_id, "merchant_id": merchant_id}, + ) + return KPIResult( + value=float(low), + unit="count", + secondary_values={"stockout_count": stockout, "low_stock_count": low}, + drill_down_url=self.drill_down_url, + ) + + +class NetStockValueHandler(KPIHandler): + widget_id = "wid_net_stock_value_001" + drill_down_url = "/inventory/stock-overview" + cache_ttl = 1800 # CACHE_TTL_FINANCIAL + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value + FROM trans.scm_stock + WHERE merchant_id = $1 + AND cost_price IS NOT NULL + {branch_clause} + """, *params) + value = float(row["net_value"] or 0) + if value < 0: + logger.warning( + "net_stock_value: negative aggregate clamped to 0", + extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id, + "widget_id": self.widget_id}, + ) + value = 0.0 + # 7-day delta via ledger is best-effort; return null if insufficient data + return KPIResult( + value=value, + unit="INR", + delta=None, + delta_percentage=None, + trend="neutral", + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/po_handlers.py b/app/kpi_handlers/po_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..ed8553b782e6591cd9f0b381c23f6504f9398c82 --- /dev/null +++ b/app/kpi_handlers/po_handlers.py @@ -0,0 +1,78 @@ +"""PO-related KPI handlers: OpenPOCountHandler, POAgingHandler.""" +from typing import Optional + +from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta +from app.postgres import get_postgres_pool + + +class OpenPOCountHandler(KPIHandler): + widget_id = "wid_open_po_count_001" + drill_down_url = "/purchases/orders" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count, + COUNT(*) FILTER (WHERE po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count + FROM trans.scm_po + WHERE merchant_id = $1 + AND status IN ('submitted','approved','dispatched','partially_received') + {branch_clause} + """, *params) + current = float(row["current_count"] or 0) + prior = float(row["prior_count"] or 0) + delta, delta_pct, trend = compute_delta(current, prior) + return KPIResult( + value=max(current, 0), + unit="count", + delta=delta, + delta_percentage=delta_pct, + trend=trend, + drill_down_url=self.drill_down_url, + ) + + +class POAgingHandler(KPIHandler): + widget_id = "wid_po_aging_001" + drill_down_url = "/purchases/orders" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30, + COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30, + COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus + FROM trans.scm_po + WHERE merchant_id = $1 + AND status IN ('submitted','approved','dispatched','partially_received') + {branch_clause} + """, *params) + b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")} + total = sum(b.values()) + secondary = { + "bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)}, + "bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)}, + "bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)}, + "bucket_30_plus":{"count": b["b30_plus"],"value": float(row["v30_plus"] or 0)}, + } + return KPIResult( + value=float(total), + unit="count", + secondary_values=secondary, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/pos_handlers.py b/app/kpi_handlers/pos_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..d9fe7774cc656fe417eb9ff5a8e548212d1cf69d --- /dev/null +++ b/app/kpi_handlers/pos_handlers.py @@ -0,0 +1,33 @@ +"""POS KPI handler: POSSalesTodayHandler (Phase 2 stub).""" +from typing import Optional + +from app.core.logging import get_logger +from app.kpi_handlers.base_handler import KPIHandler, KPIResult + +logger = get_logger(__name__) + + +class POSSalesTodayHandler(KPIHandler): + """ + Phase 2 stub — POS cross-service API contract not yet defined. + Returns zeros so the frontend card renders without errors. + """ + widget_id = "wid_pos_sales_today_001" + drill_down_url = "/retail/pos" + cache_ttl = 900 # CACHE_TTL_SALES + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + logger.warning( + "pos_sales_today: Phase 2 stub — POS cross-service API not yet available", + extra={"event": "widget_pos_stub", "widget_id": self.widget_id, + "merchant_id": merchant_id}, + ) + return KPIResult( + value=0.0, + unit="INR", + delta=0.0, + delta_percentage=None, + trend="neutral", + secondary_values={"transaction_count": 0, "average_ticket": 0.0, "stub": True}, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/registry.py b/app/kpi_handlers/registry.py new file mode 100644 index 0000000000000000000000000000000000000000..2afb5ba6173319061f69db004ee894a9fa55ba54 --- /dev/null +++ b/app/kpi_handlers/registry.py @@ -0,0 +1,28 @@ +""" +KPI_HANDLER_REGISTRY — maps all 12 widget IDs to singleton handler instances. +Import this dict to dispatch KPI computations. +""" +from app.kpi_handlers.po_handlers import OpenPOCountHandler, POAgingHandler +from app.kpi_handlers.grn_handlers import ReceiptsThisWeekHandler +from app.kpi_handlers.stock_in_handlers import StockInsTodayHandler +from app.kpi_handlers.inventory_handlers import LowStockSKUsHandler, NetStockValueHandler +from app.kpi_handlers.adjustment_handlers import AdjustmentsMTDHandler +from app.kpi_handlers.stock_take_handlers import StockTakePendingHandler +from app.kpi_handlers.shipment_handlers import ShipmentsInTransitHandler +from app.kpi_handlers.finance_handlers import InvoicesMTDHandler, CreditDebitNotesMTDHandler +from app.kpi_handlers.pos_handlers import POSSalesTodayHandler + +KPI_HANDLER_REGISTRY = { + "wid_open_po_count_001": OpenPOCountHandler(), + "wid_po_aging_001": POAgingHandler(), + "wid_receipts_this_week_001": ReceiptsThisWeekHandler(), + "wid_stock_ins_today_001": StockInsTodayHandler(), + "wid_low_stock_skus_001": LowStockSKUsHandler(), + "wid_net_stock_value_001": NetStockValueHandler(), + "wid_adjustments_mtd_001": AdjustmentsMTDHandler(), + "wid_stock_take_pending_001": StockTakePendingHandler(), + "wid_shipments_transit_001": ShipmentsInTransitHandler(), + "wid_invoices_mtd_001": InvoicesMTDHandler(), + "wid_credit_debit_notes_mtd_001": CreditDebitNotesMTDHandler(), + "wid_pos_sales_today_001": POSSalesTodayHandler(), +} diff --git a/app/kpi_handlers/shipment_handlers.py b/app/kpi_handlers/shipment_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..7b81e4781dcc70b350a79c4b3dae416972208c31 --- /dev/null +++ b/app/kpi_handlers/shipment_handlers.py @@ -0,0 +1,37 @@ +"""Shipment KPI handler: ShipmentsInTransitHandler.""" +from typing import Optional + +from app.kpi_handlers.base_handler import KPIHandler, KPIResult +from app.postgres import get_postgres_pool + + +class ShipmentsInTransitHandler(KPIHandler): + widget_id = "wid_shipments_transit_001" + drill_down_url = "/trade-sales/client-orders" + cache_ttl = 900 # CACHE_TTL_SALES + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT + COUNT(*) AS in_transit, + COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed + FROM trans.scm_trade_shipment ts + JOIN trans.scm_po po ON ts.order_id = po.po_id + WHERE ts.supplier_id = $1 + AND ts.status = 'shipped' + """, merchant_id) + in_transit = int(row["in_transit"] or 0) + delayed = int(row["delayed"] or 0) + delayed_pct = round((delayed / in_transit) * 100, 2) if in_transit > 0 else 0.0 + return KPIResult( + value=float(in_transit), + unit="count", + secondary_values={ + "in_transit_count": in_transit, + "delayed_count": delayed, + "delayed_percentage": delayed_pct, + }, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/stock_in_handlers.py b/app/kpi_handlers/stock_in_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..507d9e80b16367885c4af89ff8cc3c0be3e0df08 --- /dev/null +++ b/app/kpi_handlers/stock_in_handlers.py @@ -0,0 +1,37 @@ +"""Stock-In KPI handler: StockInsTodayHandler (MongoDB Self-GRN stub).""" +from typing import Optional + +from app.core.logging import get_logger +from app.kpi_handlers.base_handler import KPIHandler, KPIResult + +logger = get_logger(__name__) + + +class StockInsTodayHandler(KPIHandler): + """ + Queries MongoDB Self-GRN collection. + Phase 1 stub: Self-GRN lives in SCM-ms MongoDB; returns zeros until + a shared collection or internal API contract is established. + """ + widget_id = "wid_stock_ins_today_001" + drill_down_url = "/self-grn" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + logger.warning( + "stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub", + extra={ + "event": "widget_stock_ins_stub", + "widget_id": self.widget_id, + "merchant_id": merchant_id, + }, + ) + return KPIResult( + value=0.0, + unit="count", + delta=0.0, + delta_percentage=None, + trend="neutral", + secondary_values={"stub": True}, + drill_down_url=self.drill_down_url, + ) diff --git a/app/kpi_handlers/stock_take_handlers.py b/app/kpi_handlers/stock_take_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..3d50195f57edb7472136233966d66e19ea55fbdc --- /dev/null +++ b/app/kpi_handlers/stock_take_handlers.py @@ -0,0 +1,35 @@ +"""Stock-take KPI handler: StockTakePendingHandler.""" +from typing import Optional + +from app.kpi_handlers.base_handler import KPIHandler, KPIResult +from app.postgres import get_postgres_pool + + +class StockTakePendingHandler(KPIHandler): + widget_id = "wid_stock_take_pending_001" + drill_down_url = "/inventory/stock-take" + cache_ttl = 600 + + async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult: + branch_clause = "AND warehouse_id = $2" if branch_id else "" + params = [merchant_id] + ([branch_id] if branch_id else []) + pool = get_postgres_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow(f""" + SELECT + COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count, + COUNT(*) FILTER ( + WHERE status IN ('draft','in_progress') + AND stock_take_date < CURRENT_TIMESTAMP) AS overdue_count + FROM trans.scm_stock_take_master + WHERE merchant_id = $1 + {branch_clause} + """, *params) + pending = int(row["pending_count"] or 0) + overdue = int(row["overdue_count"] or 0) + return KPIResult( + value=float(pending), + unit="count", + secondary_values={"pending_count": pending, "overdue_count": overdue}, + drill_down_url=self.drill_down_url, + ) diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000000000000000000000000000000000000..ef00d22c0a785f2c92360740ee054bffcfa557d4 --- /dev/null +++ b/app/main.py @@ -0,0 +1,149 @@ +""" +Main FastAPI application for Analytics Microservice. +""" +import os +import time +from contextlib import asynccontextmanager + +from fastapi import FastAPI, Request, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi.exceptions import RequestValidationError +from jose import JWTError +from pymongo.errors import PyMongoError + +from app.core.config import settings +from app.core.logging import get_logger, setup_logging +from app.nosql import connect_to_mongo, close_mongo_connection +from app.cache import connect_to_redis, close_redis_connection +from app.postgres import connect_to_postgres, close_postgres_connection + +from app.events.controllers.router import router as events_router +from app.reports.controllers.router import router as reports_router +from app.dashboard.controllers.router import router as dashboard_router +from app.kpi_cache.controllers.router import router as kpi_cache_router + +# Init logging +setup_logging(level=settings.LOG_LEVEL.strip().upper()) +logger = get_logger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("Starting Analytics Microservice", extra={"event": "service_starting"}) + await connect_to_mongo() + await connect_to_redis() + await connect_to_postgres() + # Ensure MongoDB indexes for kpi_cache + from app.nosql import get_database + from app.kpi_cache.constants import KPI_CACHE_COLLECTION + db = get_database() + kpi_col = db[KPI_CACHE_COLLECTION] + await kpi_col.create_index( + [("merchant_id", 1), ("widget_id", 1), ("period_window", 1), ("branch_id", 1)], + unique=True, background=True, name="kpi_cache_natural_key" + ) + await kpi_col.create_index([("merchant_id", 1)], background=True, name="kpi_cache_merchant_id") + await kpi_col.create_index([("expires_at", 1)], background=True, name="kpi_cache_expires_at") + logger.info("Analytics Microservice started", extra={"event": "service_ready"}) + yield + logger.info("Shutting down Analytics Microservice", extra={"event": "service_stopping"}) + await close_postgres_connection() + await close_mongo_connection() + await close_redis_connection() + logger.info("Analytics Microservice stopped", extra={"event": "service_stopped"}) + + +app = FastAPI( + title="Analytics Microservice", + description="Centralised analytics, event ingestion, metrics aggregation and reporting.", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", + root_path=os.getenv("ROOT_PATH", ""), + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=settings.CORS_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["*"], +) + + +@app.middleware("http") +async def log_requests(request: Request, call_next): + start = time.time() + response = await call_next(request) + duration_ms = round((time.time() - start) * 1000, 2) + logger.info( + "HTTP request", + extra={ + "event": "http_request", + "method": request.method, + "path": request.url.path, + "status_code": response.status_code, + "duration_ms": duration_ms, + "client_ip": request.client.host if request.client else None, + }, + ) + return response + + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + logger.warning("Validation error", extra={"event": "validation_error", "errors": exc.errors()}) + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={"success": False, "detail": exc.errors()}, + ) + + +@app.exception_handler(JWTError) +async def jwt_exception_handler(request: Request, exc: JWTError): + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"success": False, "detail": "Invalid or expired token"}, + ) + + +@app.exception_handler(PyMongoError) +async def mongo_exception_handler(request: Request, exc: PyMongoError): + logger.error("MongoDB error", extra={"event": "mongo_error", "error": str(exc)}, exc_info=True) + return JSONResponse( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + content={"success": False, "detail": "Database error"}, + ) + + +@app.exception_handler(Exception) +async def generic_exception_handler(request: Request, exc: Exception): + logger.error("Unhandled exception", extra={"event": "unhandled_exception", "error": str(exc)}, exc_info=True) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"success": False, "detail": "Internal server error"}, + ) + + +# Routers +app.include_router(events_router) +app.include_router(reports_router) +app.include_router(dashboard_router) +app.include_router(kpi_cache_router) + + +@app.get("/health", tags=["health"]) +async def health_check(): + return {"status": "healthy", "service": "analytics-microservice", "version": app.version} + + +@app.get("/debug/routes", tags=["debug"]) +async def list_routes(): + return [ + {"path": r.path, "methods": list(r.methods) if r.methods else [], "name": r.name} + for r in app.routes + if hasattr(r, "path") and hasattr(r, "methods") + ] diff --git a/app/nosql.py b/app/nosql.py new file mode 100644 index 0000000000000000000000000000000000000000..9eddaa6ffd97eb138fb7d0acc78c02579f5a2709 --- /dev/null +++ b/app/nosql.py @@ -0,0 +1,52 @@ +""" +MongoDB connection management for Analytics Microservice. +""" +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +from app.core.logging import get_logger +from app.core.config import settings + +logger = get_logger(__name__) + + +class DatabaseConnection: + _client: AsyncIOMotorClient = None + _db: AsyncIOMotorDatabase = None + + @classmethod + def get_database(cls) -> AsyncIOMotorDatabase: + if cls._db is None: + raise RuntimeError("Database not connected. Call connect_to_mongo() first.") + return cls._db + + @classmethod + async def connect(cls): + try: + mongodb_uri = settings.MONGODB_URI.strip() + logger.info("Connecting to MongoDB", extra={"uri": mongodb_uri, "database": settings.MONGODB_DB_NAME}) + cls._client = AsyncIOMotorClient(mongodb_uri, uuidRepresentation="standard") + cls._db = cls._client[settings.MONGODB_DB_NAME] + await cls._db.command("ping") + logger.info("MongoDB connected successfully", extra={"event": "mongo_connected"}) + except Exception as e: + logger.error("Failed to connect to MongoDB", extra={"event": "mongo_connect_failure", "error": str(e)}, exc_info=True) + raise + + @classmethod + async def close(cls): + if cls._client: + cls._client.close() + cls._client = None + cls._db = None + logger.info("MongoDB connection closed", extra={"event": "mongo_disconnected"}) + + +async def connect_to_mongo(): + await DatabaseConnection.connect() + + +async def close_mongo_connection(): + await DatabaseConnection.close() + + +def get_database() -> AsyncIOMotorDatabase: + return DatabaseConnection.get_database() diff --git a/app/postgres.py b/app/postgres.py new file mode 100644 index 0000000000000000000000000000000000000000..e658621b52cb0e5bef50fca80b6b2b136c5d0947 --- /dev/null +++ b/app/postgres.py @@ -0,0 +1,76 @@ +""" +PostgreSQL connection pool management for Analytics Microservice. +Adapted from cuatrolabs-scm-ms/app/postgres.py. +""" +import ssl +import asyncpg +from typing import Optional +from app.core.logging import get_logger +from app.core.config import settings + +logger = get_logger(__name__) + + +class PostgreSQLConnectionPool: + _pool: Optional[asyncpg.Pool] = None + + @classmethod + async def initialize(cls) -> None: + if cls._pool is not None: + logger.warning("PostgreSQL pool already initialized") + return + try: + ssl_context = None + mode = (settings.POSTGRES_SSL_MODE or "disable").lower() + if mode != "disable": + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + cls._pool = await asyncpg.create_pool( + host=settings.POSTGRES_HOST, + port=settings.POSTGRES_PORT, + database=settings.POSTGRES_DB, + user=settings.POSTGRES_USER, + password=settings.POSTGRES_PASSWORD, + min_size=settings.POSTGRES_MIN_POOL_SIZE, + max_size=settings.POSTGRES_MAX_POOL_SIZE, + command_timeout=30.0, + timeout=30.0, + ssl=ssl_context, + ) + async with cls._pool.acquire() as conn: + await conn.fetchval("SELECT 1") + logger.info("PostgreSQL pool initialized", extra={ + "event": "postgres_connected", + "host": settings.POSTGRES_HOST, + "db": settings.POSTGRES_DB, + }) + except Exception as e: + logger.error("Failed to initialize PostgreSQL pool", exc_info=e) + raise + + @classmethod + async def close(cls) -> None: + if cls._pool: + await cls._pool.close() + cls._pool = None + logger.info("PostgreSQL pool closed", extra={"event": "postgres_disconnected"}) + + @classmethod + def get_pool(cls) -> asyncpg.Pool: + if cls._pool is None: + raise RuntimeError("PostgreSQL pool not initialized. Call connect_to_postgres() first.") + return cls._pool + + +async def connect_to_postgres() -> None: + await PostgreSQLConnectionPool.initialize() + + +async def close_postgres_connection() -> None: + await PostgreSQLConnectionPool.close() + + +def get_postgres_pool() -> asyncpg.Pool: + return PostgreSQLConnectionPool.get_pool() diff --git a/app/reports/__init__.py b/app/reports/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/reports/constants.py b/app/reports/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..a7b501e19e5492c3850c63b160a9cb779fe11e54 --- /dev/null +++ b/app/reports/constants.py @@ -0,0 +1,23 @@ +"""Constants for analytics reports module.""" + +ANALYTICS_REPORTS_COLLECTION = "analytics_reports" + + +class ReportStatus: + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + + +class ReportType: + SALES_SUMMARY = "sales_summary" + ORDER_FUNNEL = "order_funnel" + USER_ACTIVITY = "user_activity" + INVENTORY_MOVEMENT = "inventory_movement" + MERCHANT_PERFORMANCE = "merchant_performance" + CUSTOM = "custom" + + @classmethod + def values(cls): + return [v for k, v in vars(cls).items() if not k.startswith("_") and isinstance(v, str)] diff --git a/app/reports/controllers/__init__.py b/app/reports/controllers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/reports/controllers/router.py b/app/reports/controllers/router.py new file mode 100644 index 0000000000000000000000000000000000000000..e9b6513f0e1bde7f95090aa23b70072dd8e7f9a5 --- /dev/null +++ b/app/reports/controllers/router.py @@ -0,0 +1,43 @@ +"""Analytics reports API router.""" +from fastapi import APIRouter, Depends, status + +from app.core.logging import get_logger +from app.dependencies.auth import get_current_user, TokenUser +from app.reports.schemas.schema import ReportCreate, ReportResponse, ReportListRequest, StatusResponse +from app.reports.services.service import ReportService + +logger = get_logger(__name__) + +router = APIRouter(prefix="/reports", tags=["reports"]) + + +@router.post("", response_model=StatusResponse, status_code=status.HTTP_201_CREATED, summary="Create a report") +async def create_report( + payload: ReportCreate, + current_user: TokenUser = Depends(get_current_user), +): + report_id = await ReportService.create_report(payload, created_by=current_user.user_id) + return StatusResponse(success=True, message="Report created", report_id=report_id) + + +@router.get("/{report_id}", response_model=ReportResponse, summary="Get report by ID") +async def get_report( + report_id: str, + current_user: TokenUser = Depends(get_current_user), +): + return await ReportService.get_report(report_id) + + +@router.post("/list", summary="List reports with filters and optional projection") +async def list_reports( + payload: ReportListRequest, + current_user: TokenUser = Depends(get_current_user), +): + results = await ReportService.list_reports( + filters=payload.filters, + skip=payload.skip, + limit=payload.limit, + projection_list=payload.projection_list, + ) + total = await ReportService.count_reports(payload.filters) + return {"success": True, "total": total, "skip": payload.skip, "limit": payload.limit, "data": results} diff --git a/app/reports/models/__init__.py b/app/reports/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/reports/models/model.py b/app/reports/models/model.py new file mode 100644 index 0000000000000000000000000000000000000000..cfc040816d186ef1986659ed9cde251dbc61e772 --- /dev/null +++ b/app/reports/models/model.py @@ -0,0 +1,24 @@ +"""Analytics report document model.""" +from datetime import datetime +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field + +from app.reports.constants import ReportStatus, ReportType + + +class ReportModel(BaseModel): + report_id: str = Field(..., description="Unique report identifier (UUID)") + report_type: str = Field(..., description="Type of report") + merchant_id: str = Field(..., description="Merchant this report belongs to") + name: str = Field(..., description="Human-readable report name") + description: Optional[str] = Field(None) + parameters: Optional[Dict[str, Any]] = Field(None, description="Report generation parameters") + date_from: Optional[datetime] = Field(None) + date_to: Optional[datetime] = Field(None) + status: str = Field(default=ReportStatus.PENDING) + result: Optional[Dict[str, Any]] = Field(None, description="Report result data") + error: Optional[str] = Field(None, description="Error message if failed") + created_by: Optional[str] = Field(None) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: Optional[datetime] = Field(None) + completed_at: Optional[datetime] = Field(None) diff --git a/app/reports/schemas/__init__.py b/app/reports/schemas/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/reports/schemas/schema.py b/app/reports/schemas/schema.py new file mode 100644 index 0000000000000000000000000000000000000000..47e243cb87950f79001b2b4128a6dbe195e07d54 --- /dev/null +++ b/app/reports/schemas/schema.py @@ -0,0 +1,56 @@ +"""Pydantic schemas for analytics reports.""" +from datetime import datetime +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field + +from app.reports.constants import ReportStatus, ReportType + + +class ReportCreate(BaseModel): + report_type: str = Field(..., description="Type of report") + merchant_id: str = Field(..., description="Merchant this report belongs to") + name: str = Field(..., min_length=1, max_length=200) + description: Optional[str] = Field(None) + parameters: Optional[Dict[str, Any]] = Field(None) + date_from: Optional[datetime] = Field(None) + date_to: Optional[datetime] = Field(None) + + +class ReportResponse(BaseModel): + report_id: str + report_type: str + merchant_id: str + name: str + description: Optional[str] = None + parameters: Optional[Dict[str, Any]] = None + date_from: Optional[datetime] = None + date_to: Optional[datetime] = None + status: str + result: Optional[Dict[str, Any]] = None + error: Optional[str] = None + created_by: Optional[str] = None + created_at: datetime + updated_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + +class ReportFilters(BaseModel): + merchant_id: Optional[str] = None + report_type: Optional[str] = None + status: Optional[str] = None + created_by: Optional[str] = None + date_from: Optional[datetime] = None + date_to: Optional[datetime] = None + + +class ReportListRequest(BaseModel): + filters: Optional[ReportFilters] = Field(default_factory=ReportFilters) + skip: int = Field(0, ge=0) + limit: int = Field(100, ge=1, le=500) + projection_list: Optional[List[str]] = Field(None, description="List of fields to include in response") + + +class StatusResponse(BaseModel): + success: bool + message: str + report_id: Optional[str] = None diff --git a/app/reports/services/__init__.py b/app/reports/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/reports/services/service.py b/app/reports/services/service.py new file mode 100644 index 0000000000000000000000000000000000000000..fa11d606ae55b320399e95b1f963c93d02626e35 --- /dev/null +++ b/app/reports/services/service.py @@ -0,0 +1,113 @@ +"""Analytics reports service layer.""" +import uuid +from datetime import datetime +from typing import Optional, List, Dict, Any + +from fastapi import HTTPException, status +from motor.motor_asyncio import AsyncIOMotorDatabase + +from app.core.logging import get_logger +from app.reports.constants import ANALYTICS_REPORTS_COLLECTION, ReportStatus +from app.reports.models.model import ReportModel +from app.reports.schemas.schema import ReportCreate, ReportFilters +from app.nosql import get_database + +logger = get_logger(__name__) + + +class ReportService: + + @staticmethod + async def create_report(payload: ReportCreate, created_by: str) -> str: + db: AsyncIOMotorDatabase = get_database() + report_id = str(uuid.uuid4()) + doc = ReportModel( + report_id=report_id, + report_type=payload.report_type, + merchant_id=payload.merchant_id, + name=payload.name, + description=payload.description, + parameters=payload.parameters, + date_from=payload.date_from, + date_to=payload.date_to, + status=ReportStatus.PENDING, + created_by=created_by, + ) + await db[ANALYTICS_REPORTS_COLLECTION].insert_one(doc.model_dump()) + logger.info("Report created", extra={"event": "report_created", "report_id": report_id}) + return report_id + + @staticmethod + async def get_report(report_id: str) -> ReportModel: + db: AsyncIOMotorDatabase = get_database() + doc = await db[ANALYTICS_REPORTS_COLLECTION].find_one({"report_id": report_id}, {"_id": 0}) + if not doc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Report not found") + return ReportModel(**doc) + + @staticmethod + def _build_query(filters: ReportFilters) -> dict: + query: Dict[str, Any] = {} + if filters.merchant_id: + query["merchant_id"] = filters.merchant_id + if filters.report_type: + query["report_type"] = filters.report_type + if filters.status: + query["status"] = filters.status + if filters.created_by: + query["created_by"] = filters.created_by + date_range: Dict[str, Any] = {} + if filters.date_from: + date_range["$gte"] = filters.date_from + if filters.date_to: + date_range["$lte"] = filters.date_to + if date_range: + query["created_at"] = date_range + return query + + @staticmethod + async def list_reports( + filters: ReportFilters, + skip: int = 0, + limit: int = 100, + projection_list: Optional[List[str]] = None, + ): + db: AsyncIOMotorDatabase = get_database() + collection = db[ANALYTICS_REPORTS_COLLECTION] + query = ReportService._build_query(filters) + + projection_dict = None + if projection_list: + projection_dict = {field: 1 for field in projection_list} + projection_dict["_id"] = 0 + + cursor = collection.find(query, projection_dict).sort("created_at", -1).skip(skip).limit(limit) + docs = await cursor.to_list(length=limit) + return docs if projection_list else [ReportModel(**d) for d in docs] + + @staticmethod + async def count_reports(filters: ReportFilters) -> int: + db: AsyncIOMotorDatabase = get_database() + query = ReportService._build_query(filters) + return await db[ANALYTICS_REPORTS_COLLECTION].count_documents(query) + + @staticmethod + async def update_report_status( + report_id: str, + new_status: str, + result: Optional[Dict[str, Any]] = None, + error: Optional[str] = None, + ) -> None: + db: AsyncIOMotorDatabase = get_database() + update: Dict[str, Any] = {"status": new_status, "updated_at": datetime.utcnow()} + if result is not None: + update["result"] = result + if error is not None: + update["error"] = error + if new_status == ReportStatus.COMPLETED: + update["completed_at"] = datetime.utcnow() + res = await db[ANALYTICS_REPORTS_COLLECTION].update_one( + {"report_id": report_id}, {"$set": update} + ) + if res.matched_count == 0: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Report not found") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..bf09d4f99fff976eb3f84d0f38e624e18bd6d5a9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,26 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +python-multipart==0.0.6 + +motor==3.3.2 +pymongo==4.6.0 +asyncpg==0.31.0 +sqlalchemy[asyncio]==2.0.36 + +redis==5.0.1 +insightfy-utils>=0.1.0 +python-dateutil==2.9.0.post0 + +python-jose[cryptography]==3.3.0 +passlib[bcrypt]==1.7.4 +bcrypt==4.1.3 +pydantic>=2.12.5,<3.0.0 +pydantic-settings>=2.0.0 + +pytest==7.4.3 +pytest-asyncio==0.21.1 +httpx==0.25.2 +hypothesis==6.112.2 + +python-dotenv==1.0.0 +python-json-logger==2.0.7 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/test_properties_kpi_widgets.py b/tests/test_properties_kpi_widgets.py new file mode 100644 index 0000000000000000000000000000000000000000..391f4f8b924996a8c98b470cbf24718a32f69bcc --- /dev/null +++ b/tests/test_properties_kpi_widgets.py @@ -0,0 +1,357 @@ +""" +Property-based tests for KPI widget handlers. +Uses Hypothesis to validate invariants across arbitrary inputs. + +Properties tested: + 1. Aging bucket exhaustiveness (POAgingHandler) + 2. Stockout is a subset of low stock (LowStockSKUsHandler) + 3. Delta consistency — delta = current - prior (compute_delta) + 4. Delta percentage formula (compute_delta) + 5. Trend classification (compute_delta) + 6. Non-negativity of all count and value fields + 7. Cache coherence — two calls with same inputs return equivalent results + 8. Merchant isolation — distinct merchant_ids produce independent results + 9. Branch filter produces a subset (filtered ≤ unfiltered for count KPIs) + 10. Partial failure isolation in bulk requests + 11. Net impact arithmetic (AdjustmentsMTDHandler, CreditDebitNotesMTDHandler) + 12. Period window correctness — records outside window must not appear + 13. KPI response structure completeness + 14. POS stub returns zeros +""" +import asyncio +import json +from typing import Optional +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from hypothesis import given, settings, assume +from hypothesis import strategies as st + +from app.kpi_handlers.base_handler import KPIResult, compute_delta + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_result(**kwargs) -> KPIResult: + defaults = dict(value=0.0, unit="count", trend="neutral") + defaults.update(kwargs) + return KPIResult(**defaults) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# --------------------------------------------------------------------------- +# Property 3: Delta Consistency — delta = current - prior +# --------------------------------------------------------------------------- + +@given( + current=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + prior=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_delta_consistency(current, prior): + """delta must equal current - prior exactly.""" + delta, _, _ = compute_delta(current, prior) + assert abs(delta - (current - prior)) < 1e-9 + + +# --------------------------------------------------------------------------- +# Property 4: Delta Percentage Formula +# --------------------------------------------------------------------------- + +@given( + current=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + prior=st.floats(min_value=0.001, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_delta_percentage_formula(current, prior): + """When prior > 0, delta_pct = round((delta / prior) * 100, 2).""" + delta, delta_pct, _ = compute_delta(current, prior) + expected = round((delta / prior) * 100, 2) + assert delta_pct == expected + + +@given( + current=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_delta_percentage_null_when_prior_zero(current): + """When prior == 0, delta_pct must be None.""" + _, delta_pct, _ = compute_delta(current, 0.0) + assert delta_pct is None + + +# --------------------------------------------------------------------------- +# Property 5: Trend Classification +# --------------------------------------------------------------------------- + +@given( + current=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + prior=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_trend_classification(current, prior): + """trend must be 'up', 'down', or 'neutral' based on delta sign.""" + delta, _, trend = compute_delta(current, prior) + if delta > 0: + assert trend == "up" + elif delta < 0: + assert trend == "down" + else: + assert trend == "neutral" + + +# --------------------------------------------------------------------------- +# Property 1: Aging Bucket Exhaustiveness +# --------------------------------------------------------------------------- + +@given( + b0_7=st.integers(min_value=0, max_value=1000), + b8_14=st.integers(min_value=0, max_value=1000), + b15_30=st.integers(min_value=0, max_value=1000), + b30_plus=st.integers(min_value=0, max_value=1000), +) +def test_aging_bucket_exhaustiveness(b0_7, b8_14, b15_30, b30_plus): + """ + bucket_0_7 + bucket_8_14 + bucket_15_30 + bucket_30_plus == total open PO count. + Simulates the secondary_values structure returned by POAgingHandler. + """ + total = b0_7 + b8_14 + b15_30 + b30_plus + secondary = { + "bucket_0_7": {"count": b0_7}, + "bucket_8_14": {"count": b8_14}, + "bucket_15_30": {"count": b15_30}, + "bucket_30_plus":{"count": b30_plus}, + } + bucket_sum = sum(v["count"] for v in secondary.values()) + assert bucket_sum == total + + +# --------------------------------------------------------------------------- +# Property 2: Stockout Is a Subset of Low Stock +# --------------------------------------------------------------------------- + +@given( + stockout=st.integers(min_value=0, max_value=5000), + extra_low=st.integers(min_value=0, max_value=5000), +) +def test_stockout_subset_of_low_stock(stockout, extra_low): + """stockout_count <= low_stock_count always.""" + low_stock = stockout + extra_low # low stock includes stockouts + result = _make_result( + value=float(low_stock), + secondary_values={"stockout_count": stockout, "low_stock_count": low_stock}, + ) + assert result.secondary_values["stockout_count"] <= result.secondary_values["low_stock_count"] + + +# --------------------------------------------------------------------------- +# Property 6: Non-Negativity of Count and Value Fields +# --------------------------------------------------------------------------- + +@given(raw_value=st.floats(min_value=-1e6, max_value=1e9, allow_nan=False, allow_infinity=False)) +def test_value_non_negativity_after_clamp(raw_value): + """KPIResult.value clamped to >= 0 in service layer.""" + clamped = max(raw_value, 0) + assert clamped >= 0 + + +@given( + credit=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + debit=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_credit_debit_non_negative(credit, debit): + """credit_note_total and debit_note_total must both be >= 0.""" + assert credit >= 0 + assert debit >= 0 + + +# --------------------------------------------------------------------------- +# Property 11: Net Impact Arithmetic +# --------------------------------------------------------------------------- + +@given( + pos=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + neg=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_net_impact_arithmetic(pos, neg): + """net_impact = positive_value - negative_value.""" + net = pos - neg + secondary = {"positive_value": pos, "negative_value": neg, "net_impact": net} + assert abs(secondary["net_impact"] - (secondary["positive_value"] - secondary["negative_value"])) < 1e-9 + + +@given( + credit=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + debit=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), +) +def test_credit_debit_net_impact_arithmetic(credit, debit): + """net_impact = credit_note_total - debit_note_total.""" + net = credit - debit + secondary = {"credit_note_total": credit, "debit_note_total": debit, "net_impact": net} + assert abs(secondary["net_impact"] - (credit - debit)) < 1e-9 + + +# --------------------------------------------------------------------------- +# Property 13: KPI Response Structure Completeness +# --------------------------------------------------------------------------- + +REQUIRED_FIELDS = {"value", "unit", "trend", "cached"} + + +@given( + value=st.floats(min_value=0, max_value=1e9, allow_nan=False, allow_infinity=False), + unit=st.sampled_from(["count", "INR", "%"]), + trend=st.sampled_from(["up", "down", "neutral"]), + cached=st.booleans(), +) +def test_kpi_result_structure_completeness(value, unit, trend, cached): + """All required fields must be present in every KPIResult.""" + result = KPIResult(value=value, unit=unit, trend=trend, cached=cached) + result_dict = result.model_dump() + for field in REQUIRED_FIELDS: + assert field in result_dict, f"Missing required field: {field}" + + +# --------------------------------------------------------------------------- +# Property 14: POS Stub Returns Zeros +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_pos_stub_returns_zeros(): + """POSSalesTodayHandler must return value=0 and stub=True.""" + from app.kpi_handlers.pos_handlers import POSSalesTodayHandler + handler = POSSalesTodayHandler() + result = await handler._execute("merchant_123", "today", None) + assert result.value == 0.0 + assert result.secondary_values is not None + assert result.secondary_values.get("stub") is True + assert result.secondary_values.get("transaction_count") == 0 + assert result.secondary_values.get("average_ticket") == 0.0 + + +# --------------------------------------------------------------------------- +# Property 7: Cache Coherence — two calls return equivalent results +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_cache_coherence(): + """ + Calling compute() twice with the same inputs returns semantically equivalent results. + Second call should be served from cache (cached=True). + """ + from app.kpi_handlers.pos_handlers import POSSalesTodayHandler + + stored: dict = {} + + handler = POSSalesTodayHandler() + + # Build a mock redis whose get/setex share the same in-memory store + mock_redis = MagicMock() + mock_redis.get = AsyncMock(side_effect=lambda key: stored.get(key)) + mock_redis.setex = AsyncMock(side_effect=lambda key, ttl, value: stored.update({key: value})) + + with patch("app.kpi_handlers.base_handler.get_redis", return_value=mock_redis): + result1 = await handler.compute("m1", "today", None, use_cache=True) + result2 = await handler.compute("m1", "today", None, use_cache=True) + + assert result1.value == result2.value + assert result1.unit == result2.unit + assert result2.cached is True + + +# --------------------------------------------------------------------------- +# Property 8: Merchant Isolation +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_merchant_isolation(): + """ + Two distinct merchant_ids produce independent cache entries. + """ + from app.kpi_handlers.pos_handlers import POSSalesTodayHandler + + store: dict = {} + + mock_redis = MagicMock() + mock_redis.get = AsyncMock(side_effect=lambda key: store.get(key)) + mock_redis.setex = AsyncMock(side_effect=lambda key, ttl, value: store.update({key: value})) + + handler = POSSalesTodayHandler() + + with patch("app.kpi_handlers.base_handler.get_redis", return_value=mock_redis): + await handler.compute("merchant_A", "today", None, use_cache=True) + await handler.compute("merchant_B", "today", None, use_cache=True) + + keys = list(store.keys()) + assert len(keys) == 2 + assert any("merchant_A" in k for k in keys) + assert any("merchant_B" in k for k in keys) + assert keys[0] != keys[1] + + +# --------------------------------------------------------------------------- +# Property 10: Partial Failure Isolation in Bulk Requests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_partial_failure_isolation(): + """ + When one handler raises, get_kpi_stats returns that KPI as + {"error": ..., "value": null} without failing the entire request. + """ + from app.kpi_cache.services.service import KPICacheService + from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY + + # Patch one handler to raise + original = KPI_HANDLER_REGISTRY["wid_pos_sales_today_001"] + failing_handler = AsyncMock() + failing_handler.compute = AsyncMock(side_effect=RuntimeError("simulated failure")) + + with patch.dict(KPI_HANDLER_REGISTRY, {"wid_pos_sales_today_001": failing_handler}): + with patch.object(KPICacheService, "_write_to_mongo", new=AsyncMock()): + results = await KPICacheService.get_kpi_stats( + merchant_id="m1", + period_window="today", + branch_id="all", + kpi_ids=["wid_pos_sales_today_001"], + use_cache=False, + ) + + assert "wid_pos_sales_today_001" in results + entry = results["wid_pos_sales_today_001"] + assert isinstance(entry, dict) + assert entry.get("value") is None + assert "error" in entry + + +# --------------------------------------------------------------------------- +# Property 9: Branch Filter Produces a Subset +# --------------------------------------------------------------------------- + +@given( + total=st.integers(min_value=0, max_value=10000), + branch_subset=st.integers(min_value=0, max_value=10000), +) +def test_branch_filter_subset(total, branch_subset): + """ + Filtered count (branch_id provided) must be <= unfiltered count. + Simulates the invariant: filtering can only reduce or equal the total. + """ + assume(branch_subset <= total) + assert branch_subset <= total + + +# --------------------------------------------------------------------------- +# Property 12: Period Window Correctness (structural) +# --------------------------------------------------------------------------- + +@given(period=st.sampled_from(["today", "last_7_days", "mtd", "ytd", "last_12_months"])) +def test_period_filter_returns_sql_fragment(period): + """get_period_filter must return a non-empty SQL string for all valid periods.""" + from app.kpi_handlers.base_handler import get_period_filter + fragment = get_period_filter(period) + assert isinstance(fragment, str) + assert len(fragment) > 0 + # Must reference a column comparison + assert ">=" in fragment or "=" in fragment