MukeshKapoor25 commited on
Commit
b143975
·
0 Parent(s):

Initial commit

Browse files
This view is limited to 50 files because it contains too many changes.   See raw diff
Files changed (50) hide show
  1. .env.example +42 -0
  2. .gitignore +13 -0
  3. Dockerfile +26 -0
  4. app/__init__.py +0 -0
  5. app/cache.py +49 -0
  6. app/core/__init__.py +0 -0
  7. app/core/config.py +70 -0
  8. app/core/logging.py +119 -0
  9. app/dashboard/__init__.py +0 -0
  10. app/dashboard/constants.py +25 -0
  11. app/dashboard/controllers/__init__.py +0 -0
  12. app/dashboard/controllers/router.py +47 -0
  13. app/dashboard/models/__init__.py +0 -0
  14. app/dashboard/models/model.py +18 -0
  15. app/dashboard/schemas/__init__.py +0 -0
  16. app/dashboard/schemas/schema.py +57 -0
  17. app/dashboard/services/__init__.py +0 -0
  18. app/dashboard/services/service.py +121 -0
  19. app/dependencies/__init__.py +0 -0
  20. app/dependencies/auth.py +60 -0
  21. app/dependencies/kpi_permissions.py +136 -0
  22. app/events/__init__.py +0 -0
  23. app/events/constants.py +21 -0
  24. app/events/controllers/__init__.py +0 -0
  25. app/events/controllers/router.py +43 -0
  26. app/events/models/__init__.py +0 -0
  27. app/events/models/model.py +19 -0
  28. app/events/schemas/__init__.py +0 -0
  29. app/events/schemas/schema.py +57 -0
  30. app/events/services/__init__.py +0 -0
  31. app/events/services/service.py +98 -0
  32. app/insightfy_utils-0.1.0-py3-none-any.whl +0 -0
  33. app/kpi_cache/__init__.py +0 -0
  34. app/kpi_cache/constants.py +113 -0
  35. app/kpi_cache/controllers/__init__.py +1 -0
  36. app/kpi_cache/controllers/router.py +182 -0
  37. app/kpi_cache/models/__init__.py +0 -0
  38. app/kpi_cache/models/model.py +31 -0
  39. app/kpi_cache/schemas/__init__.py +0 -0
  40. app/kpi_cache/schemas/schema.py +79 -0
  41. app/kpi_cache/services/__init__.py +1 -0
  42. app/kpi_cache/services/service.py +232 -0
  43. app/kpi_cache/services/sql_queries.py +303 -0
  44. app/kpi_handlers/__init__.py +3 -0
  45. app/kpi_handlers/adjustment_handlers.py +42 -0
  46. app/kpi_handlers/base_handler.py +158 -0
  47. app/kpi_handlers/finance_handlers.py +82 -0
  48. app/kpi_handlers/grn_handlers.py +41 -0
  49. app/kpi_handlers/inventory_handlers.py +77 -0
  50. app/kpi_handlers/po_handlers.py +78 -0
.env.example ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Application Configuration
2
+ APP_NAME=Analytics Microservice
3
+ APP_VERSION=1.0.0
4
+ DEBUG=false
5
+
6
+ # MongoDB Configuration
7
+ MONGODB_URI=mongodb://localhost:27017
8
+ MONGODB_DB_NAME=cuatrolabs
9
+
10
+ # PostgreSQL Configuration
11
+ DB_PROTOCOL=postgresql+asyncpg
12
+ DB_USER=postgres
13
+ DB_PASSWORD=your-db-password
14
+ DB_HOST=localhost
15
+ DB_PORT=5432
16
+ DB_NAME=cuatrolabs
17
+ DB_SSLMODE=disable
18
+ DATABASE_URL=postgresql+asyncpg://postgres:your-db-password@localhost:5432/cuatrolabs
19
+
20
+ # Redis Configuration
21
+ REDIS_HOST=localhost
22
+ REDIS_PORT=6379
23
+ REDIS_PASSWORD=your-redis-password
24
+ REDIS_DB=0
25
+
26
+ # JWT Configuration
27
+ SECRET_KEY=your-secret-key-here-change-in-production
28
+ ALGORITHM=HS256
29
+ TOKEN_EXPIRATION_HOURS=8
30
+
31
+ # Logging Configuration
32
+ LOG_LEVEL=INFO
33
+ LOG_FORMAT=json
34
+ LOG_DIR=logs
35
+ LOG_MAX_BYTES=52428800
36
+ LOG_BACKUP_COUNT=10
37
+
38
+ # CORS Settings
39
+ CORS_ORIGINS=["http://localhost:3000","http://localhost:8000"]
40
+
41
+ # Root path (for reverse proxy / k8s ingress)
42
+ ROOT_PATH=
.gitignore ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ .env
2
+ __pycache__/
3
+ *.py[cod]
4
+ *.egg-info/
5
+ .venv/
6
+ venv/
7
+ dist/
8
+ build/
9
+ .pytest_cache/
10
+ .hypothesis/
11
+ logs/
12
+ *.log
13
+ .DS_Store
Dockerfile ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM python:3.11-slim-bullseye AS base
2
+
3
+ ENV PYTHONUNBUFFERED=1 \
4
+ PYTHONDONTWRITEBYTECODE=1 \
5
+ PATH="/home/user/.local/bin:$PATH"
6
+
7
+ RUN apt-get update && apt-get install -y \
8
+ openssl \
9
+ ca-certificates \
10
+ && rm -rf /var/lib/apt/lists/*
11
+
12
+ RUN useradd -m -u 1000 user
13
+ USER user
14
+ WORKDIR /app
15
+
16
+ COPY --chown=user ./requirements.txt requirements.txt
17
+ COPY --chown=user ./app/insightfy_utils-0.1.0-py3-none-any.whl insightfy_utils-0.1.0-py3-none-any.whl
18
+ RUN pip install --no-cache-dir --upgrade pip && \
19
+ pip install --no-cache-dir insightfy_utils-0.1.0-py3-none-any.whl && \
20
+ pip install --no-cache-dir --upgrade -r requirements.txt
21
+
22
+ COPY --chown=user . /app
23
+
24
+ EXPOSE 7860
25
+
26
+ CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port ${PORT:-7860} --workers 4 --log-level info"]
app/__init__.py ADDED
File without changes
app/cache.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Redis connection management for Analytics Microservice.
3
+ """
4
+ import redis.asyncio as redis
5
+ from app.core.logging import get_logger
6
+ from app.core.config import settings
7
+
8
+ logger = get_logger(__name__)
9
+
10
+ redis_client: redis.Redis = None
11
+
12
+
13
+ async def connect_to_redis():
14
+ global redis_client
15
+ try:
16
+ pool_params = {
17
+ "host": settings.REDIS_HOST,
18
+ "port": settings.REDIS_PORT,
19
+ "db": settings.REDIS_DB,
20
+ "decode_responses": True,
21
+ "max_connections": 5,
22
+ "socket_keepalive": True,
23
+ "socket_connect_timeout": 5,
24
+ "socket_timeout": 5,
25
+ "retry_on_timeout": False,
26
+ "health_check_interval": 30,
27
+ }
28
+ if settings.REDIS_PASSWORD:
29
+ pool_params["password"] = settings.REDIS_PASSWORD
30
+
31
+ pool = redis.ConnectionPool(**pool_params)
32
+ redis_client = redis.Redis(connection_pool=pool)
33
+ await redis_client.ping()
34
+ logger.info("Redis connected successfully", extra={"event": "redis_connected"})
35
+ except Exception as e:
36
+ logger.warning("Redis connection failed - continuing without cache", extra={"event": "redis_connect_failure", "error": str(e)})
37
+ redis_client = None
38
+
39
+
40
+ async def close_redis_connection():
41
+ global redis_client
42
+ if redis_client:
43
+ await redis_client.aclose()
44
+ redis_client = None
45
+ logger.info("Redis connection closed", extra={"event": "redis_disconnected"})
46
+
47
+
48
+ def get_redis() -> redis.Redis:
49
+ return redis_client
app/core/__init__.py ADDED
File without changes
app/core/config.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Configuration settings for Analytics Microservice.
3
+ """
4
+ import os
5
+ import logging
6
+ from typing import Optional, List
7
+ from pydantic import model_validator
8
+ from pydantic_settings import BaseSettings, SettingsConfigDict
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+
13
+ class Settings(BaseSettings):
14
+ # Application
15
+ APP_NAME: str = "Analytics Microservice"
16
+ APP_VERSION: str = "1.0.0"
17
+ DEBUG: bool = False
18
+
19
+ # MongoDB
20
+ MONGODB_URI: str = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
21
+ MONGODB_DB_NAME: str = os.getenv("MONGODB_DB_NAME", "cuatrolabs")
22
+
23
+ # PostgreSQL
24
+ POSTGRES_HOST: str = os.getenv("DB_HOST", "localhost")
25
+ POSTGRES_PORT: int = int(os.getenv("DB_PORT", "5432"))
26
+ POSTGRES_DB: str = os.getenv("DB_NAME", "cuatrolabs")
27
+ POSTGRES_USER: str = os.getenv("DB_USER", "postgres")
28
+ POSTGRES_PASSWORD: str = os.getenv("DB_PASSWORD", "")
29
+ POSTGRES_MIN_POOL_SIZE: int = int(os.getenv("POSTGRES_MIN_POOL_SIZE", "2"))
30
+ POSTGRES_MAX_POOL_SIZE: int = int(os.getenv("POSTGRES_MAX_POOL_SIZE", "10"))
31
+ POSTGRES_SSL_MODE: str = os.getenv("DB_SSLMODE", "disable")
32
+ POSTGRES_URI: Optional[str] = None
33
+
34
+ @model_validator(mode="after")
35
+ def assemble_db_connection(self) -> "Settings":
36
+ from urllib.parse import quote_plus
37
+ env_url = (os.getenv("DATABASE_URL") or os.getenv("DATABASE_URI") or "").strip()
38
+ if env_url:
39
+ self.POSTGRES_URI = env_url
40
+ return self
41
+ if all([self.POSTGRES_USER, self.POSTGRES_PASSWORD, self.POSTGRES_HOST, self.POSTGRES_DB]):
42
+ protocol = os.getenv("DB_PROTOCOL", "postgresql+asyncpg")
43
+ self.POSTGRES_URI = (
44
+ f"{protocol}://{self.POSTGRES_USER}:{quote_plus(self.POSTGRES_PASSWORD)}"
45
+ f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
46
+ )
47
+ return self
48
+
49
+ # Redis
50
+ REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
51
+ REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
52
+ REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD")
53
+ REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
54
+
55
+ # JWT
56
+ SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
57
+ ALGORITHM: str = os.getenv("ALGORITHM", "HS256")
58
+ TOKEN_EXPIRATION_HOURS: int = int(os.getenv("TOKEN_EXPIRATION_HOURS", "8"))
59
+
60
+ # Logging
61
+ LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
62
+ LOG_FORMAT: str = os.getenv("LOG_FORMAT", "json")
63
+
64
+ # CORS
65
+ CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:8000"]
66
+
67
+ model_config = SettingsConfigDict(env_file=".env", extra="ignore")
68
+
69
+
70
+ settings = Settings()
app/core/logging.py ADDED
@@ -0,0 +1,119 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Production-grade logging configuration for Analytics Microservice.
3
+ """
4
+ import json
5
+ import logging
6
+ import logging.handlers
7
+ import os
8
+ import sys
9
+ import traceback
10
+ from datetime import datetime, timezone
11
+ from pathlib import Path
12
+ from typing import Any, Dict, Optional
13
+
14
+
15
+ SERVICE_NAME = "analytics-ms"
16
+ LOG_DIR = Path(os.getenv("LOG_DIR", "logs"))
17
+ LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", 50 * 1024 * 1024))
18
+ LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10"))
19
+
20
+
21
+ class JSONFormatter(logging.Formatter):
22
+ """Emit log records as single-line JSON objects."""
23
+
24
+ RESERVED = frozenset({
25
+ "args", "created", "exc_info", "exc_text", "filename",
26
+ "funcName", "levelname", "levelno", "lineno", "message",
27
+ "module", "msecs", "msg", "name", "pathname", "process",
28
+ "processName", "relativeCreated", "stack_info", "thread", "threadName",
29
+ })
30
+
31
+ def format(self, record: logging.LogRecord) -> str:
32
+ payload: Dict[str, Any] = {
33
+ "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
34
+ "level": record.levelname,
35
+ "logger": record.name,
36
+ "message": record.getMessage(),
37
+ "service": SERVICE_NAME,
38
+ "pid": record.process,
39
+ }
40
+ if record.levelno >= logging.WARNING:
41
+ payload["caller"] = f"{record.pathname}:{record.lineno}"
42
+ for key, val in record.__dict__.items():
43
+ if key not in self.RESERVED and not key.startswith("_"):
44
+ payload[key] = val
45
+ if record.exc_info and record.exc_info[0] is not None:
46
+ exc_type, exc_value, exc_tb = record.exc_info
47
+ payload["exception"] = {
48
+ "type": exc_type.__name__,
49
+ "message": str(exc_value),
50
+ "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
51
+ }
52
+ try:
53
+ return json.dumps(payload, default=str)
54
+ except Exception:
55
+ payload["message"] = str(record.getMessage())
56
+ return json.dumps(payload, default=str)
57
+
58
+
59
+ class ConsoleFormatter(logging.Formatter):
60
+ GREY = "\x1b[38;5;240m"
61
+ CYAN = "\x1b[36m"
62
+ YELLOW = "\x1b[33m"
63
+ RED = "\x1b[31m"
64
+ BOLD_RED = "\x1b[1;31m"
65
+ RESET = "\x1b[0m"
66
+
67
+ LEVEL_COLORS = {
68
+ logging.DEBUG: "\x1b[38;5;240m",
69
+ logging.INFO: "\x1b[36m",
70
+ logging.WARNING: "\x1b[33m",
71
+ logging.ERROR: "\x1b[31m",
72
+ logging.CRITICAL: "\x1b[1;31m",
73
+ }
74
+
75
+ def format(self, record: logging.LogRecord) -> str:
76
+ color = self.LEVEL_COLORS.get(record.levelno, self.RESET)
77
+ ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%H:%M:%S")
78
+ msg = record.getMessage()
79
+ base = f"{self.GREY}{ts}{self.RESET} {color}{record.levelname:<8}{self.RESET} {record.name} - {msg}"
80
+ if record.exc_info:
81
+ base += "\n" + self.formatException(record.exc_info)
82
+ return base
83
+
84
+
85
+ def setup_logging(level: str = "INFO") -> None:
86
+ numeric_level = getattr(logging, level.upper(), logging.INFO)
87
+ root = logging.getLogger()
88
+ root.setLevel(numeric_level)
89
+ root.handlers.clear()
90
+
91
+ # Console handler
92
+ ch = logging.StreamHandler(sys.stdout)
93
+ ch.setLevel(numeric_level)
94
+ is_json = os.getenv("LOG_FORMAT", "json").lower() == "json"
95
+ ch.setFormatter(JSONFormatter() if is_json else ConsoleFormatter())
96
+ root.addHandler(ch)
97
+
98
+ # File handlers
99
+ try:
100
+ LOG_DIR.mkdir(parents=True, exist_ok=True)
101
+ fh = logging.handlers.RotatingFileHandler(
102
+ LOG_DIR / "app.log", maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT
103
+ )
104
+ fh.setLevel(numeric_level)
105
+ fh.setFormatter(JSONFormatter())
106
+ root.addHandler(fh)
107
+
108
+ eh = logging.handlers.RotatingFileHandler(
109
+ LOG_DIR / "app_errors.log", maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT
110
+ )
111
+ eh.setLevel(logging.ERROR)
112
+ eh.setFormatter(JSONFormatter())
113
+ root.addHandler(eh)
114
+ except Exception:
115
+ pass # Non-critical if file logging fails
116
+
117
+
118
+ def get_logger(name: str) -> logging.Logger:
119
+ return logging.getLogger(name)
app/dashboard/__init__.py ADDED
File without changes
app/dashboard/constants.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Constants for analytics dashboard module."""
2
+
3
+ ANALYTICS_METRICS_COLLECTION = "analytics_metrics"
4
+
5
+
6
+ class MetricPeriod:
7
+ HOURLY = "hourly"
8
+ DAILY = "daily"
9
+ WEEKLY = "weekly"
10
+ MONTHLY = "monthly"
11
+
12
+
13
+ class MetricType:
14
+ TOTAL_ORDERS = "total_orders"
15
+ TOTAL_REVENUE = "total_revenue"
16
+ TOTAL_USERS = "total_users"
17
+ TOTAL_EVENTS = "total_events"
18
+ CONVERSION_RATE = "conversion_rate"
19
+ AVG_ORDER_VALUE = "avg_order_value"
20
+ ACTIVE_SESSIONS = "active_sessions"
21
+ CUSTOM = "custom"
22
+
23
+ @classmethod
24
+ def values(cls):
25
+ return [v for k, v in vars(cls).items() if not k.startswith("_") and isinstance(v, str)]
app/dashboard/controllers/__init__.py ADDED
File without changes
app/dashboard/controllers/router.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics dashboard/metrics API router."""
2
+ from fastapi import APIRouter, Depends, status
3
+
4
+ from app.core.logging import get_logger
5
+ from app.dependencies.auth import get_current_user, TokenUser
6
+ from app.dashboard.schemas.schema import (
7
+ MetricUpsert, MetricResponse, MetricListRequest,
8
+ DashboardSummaryRequest, StatusResponse,
9
+ )
10
+ from app.dashboard.services.service import MetricService
11
+
12
+ logger = get_logger(__name__)
13
+
14
+ router = APIRouter(prefix="/metrics", tags=["dashboard", "metrics"])
15
+
16
+
17
+ @router.post("", response_model=StatusResponse, status_code=status.HTTP_201_CREATED, summary="Upsert a metric snapshot")
18
+ async def upsert_metric(
19
+ payload: MetricUpsert,
20
+ current_user: TokenUser = Depends(get_current_user),
21
+ ):
22
+ metric_id = await MetricService.upsert_metric(payload)
23
+ return StatusResponse(success=True, message="Metric upserted", metric_id=metric_id)
24
+
25
+
26
+ @router.post("/list", summary="List metrics with filters and optional projection")
27
+ async def list_metrics(
28
+ payload: MetricListRequest,
29
+ current_user: TokenUser = Depends(get_current_user),
30
+ ):
31
+ results = await MetricService.list_metrics(
32
+ filters=payload.filters,
33
+ skip=payload.skip,
34
+ limit=payload.limit,
35
+ projection_list=payload.projection_list,
36
+ )
37
+ total = await MetricService.count_metrics(payload.filters)
38
+ return {"success": True, "total": total, "skip": payload.skip, "limit": payload.limit, "data": results}
39
+
40
+
41
+ @router.post("/dashboard/summary", summary="Get aggregated dashboard summary for a merchant")
42
+ async def dashboard_summary(
43
+ payload: DashboardSummaryRequest,
44
+ current_user: TokenUser = Depends(get_current_user),
45
+ ):
46
+ summary = await MetricService.get_dashboard_summary(payload)
47
+ return {"success": True, **summary}
app/dashboard/models/__init__.py ADDED
File without changes
app/dashboard/models/model.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics metric snapshot model."""
2
+ from datetime import datetime
3
+ from typing import Optional, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class MetricModel(BaseModel):
8
+ metric_id: str = Field(..., description="Unique metric identifier (UUID)")
9
+ metric_type: str = Field(..., description="Type of metric")
10
+ merchant_id: str = Field(..., description="Merchant this metric belongs to")
11
+ period: str = Field(..., description="Aggregation period: hourly | daily | weekly | monthly")
12
+ period_start: datetime = Field(..., description="Start of the aggregation period")
13
+ period_end: datetime = Field(..., description="End of the aggregation period")
14
+ value: float = Field(..., description="Metric value")
15
+ dimensions: Optional[Dict[str, Any]] = Field(None, description="Breakdown dimensions (source, category, etc.)")
16
+ metadata: Optional[Dict[str, Any]] = Field(None)
17
+ created_at: datetime = Field(default_factory=datetime.utcnow)
18
+ updated_at: Optional[datetime] = Field(None)
app/dashboard/schemas/__init__.py ADDED
File without changes
app/dashboard/schemas/schema.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Pydantic schemas for analytics dashboard/metrics."""
2
+ from datetime import datetime
3
+ from typing import Optional, List, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class MetricUpsert(BaseModel):
8
+ metric_type: str = Field(...)
9
+ merchant_id: str = Field(...)
10
+ period: str = Field(...)
11
+ period_start: datetime = Field(...)
12
+ period_end: datetime = Field(...)
13
+ value: float = Field(...)
14
+ dimensions: Optional[Dict[str, Any]] = Field(None)
15
+ metadata: Optional[Dict[str, Any]] = Field(None)
16
+
17
+
18
+ class MetricResponse(BaseModel):
19
+ metric_id: str
20
+ metric_type: str
21
+ merchant_id: str
22
+ period: str
23
+ period_start: datetime
24
+ period_end: datetime
25
+ value: float
26
+ dimensions: Optional[Dict[str, Any]] = None
27
+ created_at: datetime
28
+ updated_at: Optional[datetime] = None
29
+
30
+
31
+ class MetricFilters(BaseModel):
32
+ merchant_id: Optional[str] = None
33
+ metric_type: Optional[str] = None
34
+ period: Optional[str] = None
35
+ period_start_from: Optional[datetime] = None
36
+ period_start_to: Optional[datetime] = None
37
+
38
+
39
+ class MetricListRequest(BaseModel):
40
+ filters: Optional[MetricFilters] = Field(default_factory=MetricFilters)
41
+ skip: int = Field(0, ge=0)
42
+ limit: int = Field(100, ge=1, le=1000)
43
+ projection_list: Optional[List[str]] = Field(None, description="List of fields to include in response")
44
+
45
+
46
+ class DashboardSummaryRequest(BaseModel):
47
+ merchant_id: str = Field(...)
48
+ period: str = Field("daily", description="hourly | daily | weekly | monthly")
49
+ period_start: Optional[datetime] = Field(None)
50
+ period_end: Optional[datetime] = Field(None)
51
+ metric_types: Optional[List[str]] = Field(None, description="Filter to specific metric types")
52
+
53
+
54
+ class StatusResponse(BaseModel):
55
+ success: bool
56
+ message: str
57
+ metric_id: Optional[str] = None
app/dashboard/services/__init__.py ADDED
File without changes
app/dashboard/services/service.py ADDED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics dashboard/metrics service layer."""
2
+ import uuid
3
+ from datetime import datetime
4
+ from typing import Optional, List, Dict, Any
5
+
6
+ from fastapi import HTTPException, status
7
+ from motor.motor_asyncio import AsyncIOMotorDatabase
8
+
9
+ from app.core.logging import get_logger
10
+ from app.dashboard.constants import ANALYTICS_METRICS_COLLECTION
11
+ from app.dashboard.models.model import MetricModel
12
+ from app.dashboard.schemas.schema import MetricUpsert, MetricFilters, DashboardSummaryRequest
13
+ from app.nosql import get_database
14
+
15
+ logger = get_logger(__name__)
16
+
17
+
18
+ class MetricService:
19
+
20
+ @staticmethod
21
+ async def upsert_metric(payload: MetricUpsert) -> str:
22
+ db: AsyncIOMotorDatabase = get_database()
23
+ collection = db[ANALYTICS_METRICS_COLLECTION]
24
+
25
+ # Upsert by natural key: merchant + type + period + period_start
26
+ filter_key = {
27
+ "merchant_id": payload.merchant_id,
28
+ "metric_type": payload.metric_type,
29
+ "period": payload.period,
30
+ "period_start": payload.period_start,
31
+ }
32
+ existing = await collection.find_one(filter_key, {"_id": 0, "metric_id": 1})
33
+ metric_id = existing["metric_id"] if existing else str(uuid.uuid4())
34
+
35
+ doc = MetricModel(
36
+ metric_id=metric_id,
37
+ metric_type=payload.metric_type,
38
+ merchant_id=payload.merchant_id,
39
+ period=payload.period,
40
+ period_start=payload.period_start,
41
+ period_end=payload.period_end,
42
+ value=payload.value,
43
+ dimensions=payload.dimensions,
44
+ metadata=payload.metadata,
45
+ updated_at=datetime.utcnow(),
46
+ )
47
+ await collection.update_one(filter_key, {"$set": doc.model_dump()}, upsert=True)
48
+ logger.info("Metric upserted", extra={"event": "metric_upserted", "metric_id": metric_id})
49
+ return metric_id
50
+
51
+ @staticmethod
52
+ def _build_query(filters: MetricFilters) -> dict:
53
+ query: Dict[str, Any] = {}
54
+ if filters.merchant_id:
55
+ query["merchant_id"] = filters.merchant_id
56
+ if filters.metric_type:
57
+ query["metric_type"] = filters.metric_type
58
+ if filters.period:
59
+ query["period"] = filters.period
60
+ date_range: Dict[str, Any] = {}
61
+ if filters.period_start_from:
62
+ date_range["$gte"] = filters.period_start_from
63
+ if filters.period_start_to:
64
+ date_range["$lte"] = filters.period_start_to
65
+ if date_range:
66
+ query["period_start"] = date_range
67
+ return query
68
+
69
+ @staticmethod
70
+ async def list_metrics(
71
+ filters: MetricFilters,
72
+ skip: int = 0,
73
+ limit: int = 100,
74
+ projection_list: Optional[List[str]] = None,
75
+ ):
76
+ db: AsyncIOMotorDatabase = get_database()
77
+ collection = db[ANALYTICS_METRICS_COLLECTION]
78
+ query = MetricService._build_query(filters)
79
+
80
+ projection_dict = None
81
+ if projection_list:
82
+ projection_dict = {field: 1 for field in projection_list}
83
+ projection_dict["_id"] = 0
84
+
85
+ cursor = collection.find(query, projection_dict).sort("period_start", -1).skip(skip).limit(limit)
86
+ docs = await cursor.to_list(length=limit)
87
+ return docs if projection_list else [MetricModel(**d) for d in docs]
88
+
89
+ @staticmethod
90
+ async def count_metrics(filters: MetricFilters) -> int:
91
+ db: AsyncIOMotorDatabase = get_database()
92
+ query = MetricService._build_query(filters)
93
+ return await db[ANALYTICS_METRICS_COLLECTION].count_documents(query)
94
+
95
+ @staticmethod
96
+ async def get_dashboard_summary(req: DashboardSummaryRequest) -> Dict[str, Any]:
97
+ """Aggregate latest metric values per type for a merchant/period."""
98
+ db: AsyncIOMotorDatabase = get_database()
99
+ collection = db[ANALYTICS_METRICS_COLLECTION]
100
+
101
+ match: Dict[str, Any] = {"merchant_id": req.merchant_id, "period": req.period}
102
+ if req.metric_types:
103
+ match["metric_type"] = {"$in": req.metric_types}
104
+ if req.period_start:
105
+ match.setdefault("period_start", {})["$gte"] = req.period_start
106
+ if req.period_end:
107
+ match.setdefault("period_start", {})["$lte"] = req.period_end
108
+
109
+ pipeline = [
110
+ {"$match": match},
111
+ {"$sort": {"period_start": -1}},
112
+ {"$group": {
113
+ "_id": "$metric_type",
114
+ "latest_value": {"$first": "$value"},
115
+ "period_start": {"$first": "$period_start"},
116
+ "period_end": {"$first": "$period_end"},
117
+ }},
118
+ {"$project": {"_id": 0, "metric_type": "$_id", "latest_value": 1, "period_start": 1, "period_end": 1}},
119
+ ]
120
+ results = await collection.aggregate(pipeline).to_list(length=100)
121
+ return {"merchant_id": req.merchant_id, "period": req.period, "metrics": results}
app/dependencies/__init__.py ADDED
File without changes
app/dependencies/auth.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Authentication dependencies for Analytics Microservice.
3
+ Validates JWT tokens issued by the Auth microservice.
4
+ """
5
+ from typing import Optional
6
+ from fastapi import Depends, HTTPException, status
7
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
8
+ from jose import JWTError, jwt
9
+ from pydantic import BaseModel
10
+
11
+ from app.core.config import settings
12
+
13
+ security = HTTPBearer()
14
+
15
+
16
+ class TokenUser(BaseModel):
17
+ user_id: str
18
+ username: str
19
+ role_id: str
20
+ merchant_id: str
21
+ merchant_type: Optional[str] = None
22
+ metadata: Optional[dict] = None
23
+
24
+ def has_role(self, *roles: str) -> bool:
25
+ return self.role_id in roles
26
+
27
+ def is_admin(self) -> bool:
28
+ return "admin" in self.role_id.lower()
29
+
30
+ def is_super_admin(self) -> bool:
31
+ return "super_admin" in self.role_id.lower()
32
+
33
+
34
+ async def get_current_user(
35
+ credentials: HTTPAuthorizationCredentials = Depends(security),
36
+ ) -> TokenUser:
37
+ credentials_exception = HTTPException(
38
+ status_code=status.HTTP_401_UNAUTHORIZED,
39
+ detail="Could not validate credentials",
40
+ headers={"WWW-Authenticate": "Bearer"},
41
+ )
42
+ try:
43
+ payload = jwt.decode(
44
+ credentials.credentials,
45
+ settings.SECRET_KEY,
46
+ algorithms=[settings.ALGORITHM],
47
+ )
48
+ user_id: str = payload.get("user_id") or payload.get("sub")
49
+ if not user_id:
50
+ raise credentials_exception
51
+ return TokenUser(
52
+ user_id=user_id,
53
+ username=payload.get("username", ""),
54
+ role_id=payload.get("role_id", ""),
55
+ merchant_id=payload.get("merchant_id", ""),
56
+ merchant_type=payload.get("merchant_type"),
57
+ metadata=payload.get("metadata"),
58
+ )
59
+ except JWTError:
60
+ raise credentials_exception
app/dependencies/kpi_permissions.py ADDED
@@ -0,0 +1,136 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ KPI widget permission dependencies for Analytics Microservice.
3
+ Mirrors the SCM-ms pattern:
4
+ - merchant_id always from JWT (never from request body)
5
+ - require_dashboard_view: checks scm_access_roles.permissions.dashboard.view
6
+ - require_widget_access: checks scm_access_roles.widget_access[] per widget
7
+ """
8
+ from fastapi import Depends, HTTPException, status
9
+
10
+ from app.core.logging import get_logger
11
+ from app.dependencies.auth import get_current_user, TokenUser
12
+ from app.nosql import get_database
13
+
14
+ logger = get_logger(__name__)
15
+
16
+ SCM_ACCESS_ROLES_COLLECTION = "scm_access_roles"
17
+
18
+
19
+ async def _get_role_doc(role_id: str) -> dict | None:
20
+ db = get_database()
21
+ return await db[SCM_ACCESS_ROLES_COLLECTION].find_one(
22
+ {"role_id": role_id, "is_active": True}, {"_id": 0}
23
+ )
24
+
25
+
26
+ # ---------------------------------------------------------------------------
27
+ # Dependency: dashboard.view permission (bulk stats + rebuild + list)
28
+ # ---------------------------------------------------------------------------
29
+
30
+ async def require_dashboard_view(
31
+ current_user: TokenUser = Depends(get_current_user),
32
+ ) -> TokenUser:
33
+ """
34
+ Requires permissions.dashboard contains 'view'.
35
+ Mirrors SCM require_scm_permission('dashboard', 'view').
36
+ """
37
+ if not current_user.role_id:
38
+ raise HTTPException(
39
+ status_code=status.HTTP_403_FORBIDDEN,
40
+ detail="No role assigned to user",
41
+ )
42
+ if not current_user.merchant_id:
43
+ raise HTTPException(
44
+ status_code=status.HTTP_403_FORBIDDEN,
45
+ detail="merchant_id missing from token",
46
+ )
47
+
48
+ try:
49
+ role_doc = await _get_role_doc(current_user.role_id)
50
+ if not role_doc:
51
+ logger.warning(
52
+ "Access role not found",
53
+ extra={"event": "kpi_role_not_found", "role_id": current_user.role_id,
54
+ "user_id": current_user.user_id},
55
+ )
56
+ raise HTTPException(
57
+ status_code=status.HTTP_403_FORBIDDEN,
58
+ detail="Access role not found",
59
+ )
60
+
61
+ permissions = role_doc.get("permissions", {})
62
+ if "view" not in permissions.get("dashboard", []):
63
+ logger.warning(
64
+ "dashboard.view permission denied",
65
+ extra={"event": "kpi_permission_denied", "user_id": current_user.user_id,
66
+ "role_id": current_user.role_id},
67
+ )
68
+ raise HTTPException(
69
+ status_code=status.HTTP_403_FORBIDDEN,
70
+ detail="Access denied. Required permission: dashboard.view",
71
+ )
72
+
73
+ return current_user
74
+
75
+ except HTTPException:
76
+ raise
77
+ except Exception as exc:
78
+ logger.error(
79
+ "Permission check failed",
80
+ extra={"event": "kpi_permission_check_error", "error": str(exc)},
81
+ exc_info=True,
82
+ )
83
+ raise HTTPException(
84
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
85
+ detail="Permission check failed",
86
+ )
87
+
88
+
89
+ # ---------------------------------------------------------------------------
90
+ # Helper: widget-level access check (used inline in individual KPI endpoint)
91
+ # ---------------------------------------------------------------------------
92
+
93
+ async def check_widget_access(widget_id: str, current_user: TokenUser) -> None:
94
+ """
95
+ Verifies widget_id is listed in scm_access_roles.widget_access[].
96
+ Raises 403 if not. Mirrors SCM require_widget_access().
97
+ """
98
+ try:
99
+ role_doc = await _get_role_doc(current_user.role_id)
100
+ if not role_doc:
101
+ raise HTTPException(
102
+ status_code=status.HTTP_403_FORBIDDEN,
103
+ detail="Access role not found",
104
+ )
105
+
106
+ widget_access = role_doc.get("widget_access", [])
107
+ if widget_id not in widget_access:
108
+ logger.warning(
109
+ "Widget access denied",
110
+ extra={"event": "kpi_widget_access_denied", "widget_id": widget_id,
111
+ "user_id": current_user.user_id, "role_id": current_user.role_id},
112
+ )
113
+ raise HTTPException(
114
+ status_code=status.HTTP_403_FORBIDDEN,
115
+ detail=f"Access denied to widget: {widget_id}",
116
+ )
117
+
118
+ logger.debug(
119
+ "Widget access granted",
120
+ extra={"event": "kpi_widget_access_granted", "widget_id": widget_id,
121
+ "user_id": current_user.user_id},
122
+ )
123
+
124
+ except HTTPException:
125
+ raise
126
+ except Exception as exc:
127
+ logger.error(
128
+ "Widget access check error",
129
+ extra={"event": "kpi_widget_access_check_error", "widget_id": widget_id,
130
+ "error": str(exc)},
131
+ exc_info=True,
132
+ )
133
+ raise HTTPException(
134
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
135
+ detail="Error checking widget access",
136
+ )
app/events/__init__.py ADDED
File without changes
app/events/constants.py ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Constants for analytics events module."""
2
+
3
+ ANALYTICS_EVENTS_COLLECTION = "analytics_events"
4
+
5
+
6
+ class EventType:
7
+ PAGE_VIEW = "page_view"
8
+ CLICK = "click"
9
+ PURCHASE = "purchase"
10
+ CART_ADD = "cart_add"
11
+ CART_REMOVE = "cart_remove"
12
+ SEARCH = "search"
13
+ LOGIN = "login"
14
+ LOGOUT = "logout"
15
+ ORDER_PLACED = "order_placed"
16
+ ORDER_CANCELLED = "order_cancelled"
17
+ CUSTOM = "custom"
18
+
19
+ @classmethod
20
+ def values(cls):
21
+ return [v for k, v in vars(cls).items() if not k.startswith("_") and isinstance(v, str)]
app/events/controllers/__init__.py ADDED
File without changes
app/events/controllers/router.py ADDED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics events API router."""
2
+ from fastapi import APIRouter, Depends, status
3
+
4
+ from app.core.logging import get_logger
5
+ from app.dependencies.auth import get_current_user, TokenUser
6
+ from app.events.schemas.schema import EventCreate, EventResponse, EventListRequest, StatusResponse
7
+ from app.events.services.service import EventService
8
+
9
+ logger = get_logger(__name__)
10
+
11
+ router = APIRouter(prefix="/events", tags=["events"])
12
+
13
+
14
+ @router.post("", response_model=StatusResponse, status_code=status.HTTP_201_CREATED, summary="Ingest an analytics event")
15
+ async def create_event(
16
+ payload: EventCreate,
17
+ current_user: TokenUser = Depends(get_current_user),
18
+ ):
19
+ event_id = await EventService.create_event(payload)
20
+ return StatusResponse(success=True, message="Event recorded", event_id=event_id)
21
+
22
+
23
+ @router.get("/{event_id}", response_model=EventResponse, summary="Get event by ID")
24
+ async def get_event(
25
+ event_id: str,
26
+ current_user: TokenUser = Depends(get_current_user),
27
+ ):
28
+ return await EventService.get_event(event_id)
29
+
30
+
31
+ @router.post("/list", summary="List events with filters and optional projection")
32
+ async def list_events(
33
+ payload: EventListRequest,
34
+ current_user: TokenUser = Depends(get_current_user),
35
+ ):
36
+ results = await EventService.list_events(
37
+ filters=payload.filters,
38
+ skip=payload.skip,
39
+ limit=payload.limit,
40
+ projection_list=payload.projection_list,
41
+ )
42
+ total = await EventService.count_events(payload.filters)
43
+ return {"success": True, "total": total, "skip": payload.skip, "limit": payload.limit, "data": results}
app/events/models/__init__.py ADDED
File without changes
app/events/models/model.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics event document model."""
2
+ from datetime import datetime
3
+ from typing import Optional, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class AnalyticsEventModel(BaseModel):
8
+ event_id: str = Field(..., description="Unique event identifier (UUID)")
9
+ event_type: str = Field(..., description="Type of event")
10
+ merchant_id: str = Field(..., description="Merchant this event belongs to")
11
+ user_id: Optional[str] = Field(None, description="User who triggered the event")
12
+ session_id: Optional[str] = Field(None, description="Session identifier")
13
+ source: Optional[str] = Field(None, description="Source system (pos, ecomm, scm, spa)")
14
+ entity_type: Optional[str] = Field(None, description="Entity type (product, order, etc.)")
15
+ entity_id: Optional[str] = Field(None, description="Entity identifier")
16
+ properties: Optional[Dict[str, Any]] = Field(None, description="Event-specific properties")
17
+ metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata")
18
+ occurred_at: datetime = Field(default_factory=datetime.utcnow, description="When the event occurred")
19
+ created_at: datetime = Field(default_factory=datetime.utcnow, description="When the record was created")
app/events/schemas/__init__.py ADDED
File without changes
app/events/schemas/schema.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Pydantic schemas for analytics events."""
2
+ from datetime import datetime
3
+ from typing import Optional, List, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+ from app.events.constants import EventType
7
+
8
+
9
+ class EventCreate(BaseModel):
10
+ event_type: str = Field(..., description="Type of event")
11
+ merchant_id: str = Field(..., description="Merchant this event belongs to")
12
+ user_id: Optional[str] = Field(None)
13
+ session_id: Optional[str] = Field(None)
14
+ source: Optional[str] = Field(None, description="pos | ecomm | scm | spa")
15
+ entity_type: Optional[str] = Field(None)
16
+ entity_id: Optional[str] = Field(None)
17
+ properties: Optional[Dict[str, Any]] = Field(None)
18
+ metadata: Optional[Dict[str, Any]] = Field(None)
19
+ occurred_at: Optional[datetime] = Field(None)
20
+
21
+
22
+ class EventResponse(BaseModel):
23
+ event_id: str
24
+ event_type: str
25
+ merchant_id: str
26
+ user_id: Optional[str] = None
27
+ session_id: Optional[str] = None
28
+ source: Optional[str] = None
29
+ entity_type: Optional[str] = None
30
+ entity_id: Optional[str] = None
31
+ properties: Optional[Dict[str, Any]] = None
32
+ occurred_at: datetime
33
+ created_at: datetime
34
+
35
+
36
+ class EventFilters(BaseModel):
37
+ merchant_id: Optional[str] = None
38
+ event_type: Optional[str] = None
39
+ source: Optional[str] = None
40
+ entity_type: Optional[str] = None
41
+ entity_id: Optional[str] = None
42
+ user_id: Optional[str] = None
43
+ date_from: Optional[datetime] = None
44
+ date_to: Optional[datetime] = None
45
+
46
+
47
+ class EventListRequest(BaseModel):
48
+ filters: Optional[EventFilters] = Field(default_factory=EventFilters)
49
+ skip: int = Field(0, ge=0)
50
+ limit: int = Field(100, ge=1, le=1000)
51
+ projection_list: Optional[List[str]] = Field(None, description="List of fields to include in response")
52
+
53
+
54
+ class StatusResponse(BaseModel):
55
+ success: bool
56
+ message: str
57
+ event_id: Optional[str] = None
app/events/services/__init__.py ADDED
File without changes
app/events/services/service.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Analytics events service layer."""
2
+ import uuid
3
+ from datetime import datetime
4
+ from typing import Optional, List, Dict, Any
5
+
6
+ from fastapi import HTTPException, status
7
+ from motor.motor_asyncio import AsyncIOMotorDatabase
8
+
9
+ from app.core.logging import get_logger
10
+ from app.events.constants import ANALYTICS_EVENTS_COLLECTION
11
+ from app.events.models.model import AnalyticsEventModel
12
+ from app.events.schemas.schema import EventCreate, EventFilters
13
+ from app.nosql import get_database
14
+
15
+ logger = get_logger(__name__)
16
+
17
+
18
+ class EventService:
19
+
20
+ @staticmethod
21
+ async def create_event(payload: EventCreate) -> str:
22
+ db: AsyncIOMotorDatabase = get_database()
23
+ collection = db[ANALYTICS_EVENTS_COLLECTION]
24
+ event_id = str(uuid.uuid4())
25
+ doc = AnalyticsEventModel(
26
+ event_id=event_id,
27
+ event_type=payload.event_type,
28
+ merchant_id=payload.merchant_id,
29
+ user_id=payload.user_id,
30
+ session_id=payload.session_id,
31
+ source=payload.source,
32
+ entity_type=payload.entity_type,
33
+ entity_id=payload.entity_id,
34
+ properties=payload.properties,
35
+ metadata=payload.metadata,
36
+ occurred_at=payload.occurred_at or datetime.utcnow(),
37
+ )
38
+ await collection.insert_one(doc.model_dump())
39
+ logger.info("Event created", extra={"event": "event_created", "event_id": event_id, "event_type": payload.event_type})
40
+ return event_id
41
+
42
+ @staticmethod
43
+ async def get_event(event_id: str) -> AnalyticsEventModel:
44
+ db: AsyncIOMotorDatabase = get_database()
45
+ doc = await db[ANALYTICS_EVENTS_COLLECTION].find_one({"event_id": event_id}, {"_id": 0})
46
+ if not doc:
47
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
48
+ return AnalyticsEventModel(**doc)
49
+
50
+ @staticmethod
51
+ def _build_query(filters: EventFilters) -> dict:
52
+ query: Dict[str, Any] = {}
53
+ if filters.merchant_id:
54
+ query["merchant_id"] = filters.merchant_id
55
+ if filters.event_type:
56
+ query["event_type"] = filters.event_type
57
+ if filters.source:
58
+ query["source"] = filters.source
59
+ if filters.entity_type:
60
+ query["entity_type"] = filters.entity_type
61
+ if filters.entity_id:
62
+ query["entity_id"] = filters.entity_id
63
+ if filters.user_id:
64
+ query["user_id"] = filters.user_id
65
+ date_range: Dict[str, Any] = {}
66
+ if filters.date_from:
67
+ date_range["$gte"] = filters.date_from
68
+ if filters.date_to:
69
+ date_range["$lte"] = filters.date_to
70
+ if date_range:
71
+ query["occurred_at"] = date_range
72
+ return query
73
+
74
+ @staticmethod
75
+ async def list_events(
76
+ filters: EventFilters,
77
+ skip: int = 0,
78
+ limit: int = 100,
79
+ projection_list: Optional[List[str]] = None,
80
+ ):
81
+ db: AsyncIOMotorDatabase = get_database()
82
+ collection = db[ANALYTICS_EVENTS_COLLECTION]
83
+ query = EventService._build_query(filters)
84
+
85
+ projection_dict = None
86
+ if projection_list:
87
+ projection_dict = {field: 1 for field in projection_list}
88
+ projection_dict["_id"] = 0
89
+
90
+ cursor = collection.find(query, projection_dict).sort("occurred_at", -1).skip(skip).limit(limit)
91
+ docs = await cursor.to_list(length=limit)
92
+ return docs if projection_list else [AnalyticsEventModel(**d) for d in docs]
93
+
94
+ @staticmethod
95
+ async def count_events(filters: EventFilters) -> int:
96
+ db: AsyncIOMotorDatabase = get_database()
97
+ query = EventService._build_query(filters)
98
+ return await db[ANALYTICS_EVENTS_COLLECTION].count_documents(query)
app/insightfy_utils-0.1.0-py3-none-any.whl ADDED
Binary file (32.2 kB). View file
 
app/kpi_cache/__init__.py ADDED
File without changes
app/kpi_cache/constants.py ADDED
@@ -0,0 +1,113 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Constants for KPI cache module."""
2
+
3
+ # MongoDB collection name for durable KPI snapshots
4
+ KPI_CACHE_COLLECTION = "kpi_cache"
5
+
6
+ # Redis key prefix for hot L1 cache
7
+ REDIS_KPI_PREFIX = "kpi"
8
+
9
+ # Redis TTLs (seconds) — L1 hot cache, shorter than MongoDB snapshots
10
+ REDIS_TTL_FINANCIAL = 1800 # 30 min
11
+ REDIS_TTL_INVENTORY = 600 # 10 min
12
+ REDIS_TTL_OPERATIONS = 600 # 10 min
13
+ REDIS_TTL_SALES = 900 # 15 min
14
+
15
+ # Widget category → Redis TTL mapping
16
+ WIDGET_REDIS_TTL: dict = {
17
+ "financial": REDIS_TTL_FINANCIAL,
18
+ "inventory": REDIS_TTL_INVENTORY,
19
+ "operations": REDIS_TTL_OPERATIONS,
20
+ "sales": REDIS_TTL_SALES,
21
+ }
22
+
23
+ # All 12 KPI widget definitions
24
+ KPI_WIDGET_REGISTRY: dict = {
25
+ # --- OPERATIONS ---
26
+ "wid_open_po_count_001": {
27
+ "title": "Open Purchase Orders",
28
+ "category": "operations",
29
+ "unit": "count",
30
+ "description": "Count of POs in submitted/approved/dispatched/partially_received status",
31
+ "drill_down_url": "/purchases/orders",
32
+ },
33
+ "wid_po_aging_001": {
34
+ "title": "PO Aging",
35
+ "category": "operations",
36
+ "unit": "count",
37
+ "description": "Open POs grouped by age: 0-7, 8-14, 15-30, 30+ days",
38
+ "drill_down_url": "/purchases/orders",
39
+ },
40
+ "wid_receipts_this_week_001": {
41
+ "title": "Receipts This Week",
42
+ "category": "operations",
43
+ "unit": "count",
44
+ "description": "GRNs received in the last 7 days",
45
+ "drill_down_url": "/purchases/receipts",
46
+ },
47
+ "wid_stock_ins_today_001": {
48
+ "title": "Stock-Ins Today",
49
+ "category": "operations",
50
+ "unit": "count",
51
+ "description": "Direct stock-in (Self-GRN) transactions created today",
52
+ "drill_down_url": "/self-grn",
53
+ },
54
+ "wid_stock_take_pending_001": {
55
+ "title": "Pending Stock Takes",
56
+ "category": "operations",
57
+ "unit": "count",
58
+ "description": "Stock takes in draft or in_progress status",
59
+ "drill_down_url": "/inventory/stock-take",
60
+ },
61
+ "wid_shipments_transit_001": {
62
+ "title": "Shipments In Transit",
63
+ "category": "operations",
64
+ "unit": "count",
65
+ "description": "Trade shipments currently in transit",
66
+ "drill_down_url": "/trade-sales/client-orders",
67
+ },
68
+ # --- INVENTORY ---
69
+ "wid_low_stock_skus_001": {
70
+ "title": "Low Stock SKUs",
71
+ "category": "inventory",
72
+ "unit": "count",
73
+ "description": "SKUs at or below reorder point; includes stockout count",
74
+ "drill_down_url": "/inventory/stock-overview",
75
+ },
76
+ "wid_net_stock_value_001": {
77
+ "title": "Net Stock Value",
78
+ "category": "inventory",
79
+ "unit": "INR",
80
+ "description": "Total value of current inventory (qty_on_hand * cost_price)",
81
+ "drill_down_url": "/inventory/stock-overview",
82
+ },
83
+ "wid_adjustments_mtd_001": {
84
+ "title": "Adjustments This Month",
85
+ "category": "inventory",
86
+ "unit": "count",
87
+ "description": "Approved/applied stock adjustments month-to-date",
88
+ "drill_down_url": "/inventory/adjustments",
89
+ },
90
+ # --- FINANCIAL ---
91
+ "wid_invoices_mtd_001": {
92
+ "title": "Invoices MTD",
93
+ "category": "financial",
94
+ "unit": "count",
95
+ "description": "Invoices issued month-to-date (non-cancelled, non-draft)",
96
+ "drill_down_url": "/trade-sales/invoices",
97
+ },
98
+ "wid_credit_debit_notes_mtd_001": {
99
+ "title": "Credit / Debit Notes MTD",
100
+ "category": "financial",
101
+ "unit": "INR",
102
+ "description": "Net value of credit and debit notes issued this month",
103
+ "drill_down_url": "/trade-sales/credit-debit-notes",
104
+ },
105
+ # --- SALES ---
106
+ "wid_pos_sales_today_001": {
107
+ "title": "POS Sales Today",
108
+ "category": "sales",
109
+ "unit": "INR",
110
+ "description": "Today's POS sales total and transaction count (Phase 2 stub)",
111
+ "drill_down_url": "/retail/pos",
112
+ },
113
+ }
app/kpi_cache/controllers/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # kpi_cache controllers package
app/kpi_cache/controllers/router.py ADDED
@@ -0,0 +1,182 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ KPI Cache router.
3
+ POST /kpi-cache/stats — bulk KPI fetch
4
+ POST /kpi-cache/stats/individual/{kpi_id} — single KPI (404 on unknown, 403 on no widget access)
5
+ POST /kpi-cache/rebuild — force recompute
6
+ POST /kpi-cache/list — list cache snapshots (projection supported)
7
+
8
+ Auth rules (mirrors SCM-ms widget_router.py):
9
+ - merchant_id is ALWAYS sourced from JWT — never from request body
10
+ - Bulk stats / rebuild / list: require permissions.dashboard.view
11
+ - Individual KPI: additionally checks scm_access_roles.widget_access[] for that widget_id
12
+ """
13
+ from datetime import datetime, timezone
14
+ from fastapi import APIRouter, Depends, HTTPException, status
15
+
16
+ from app.core.logging import get_logger
17
+ from app.dependencies.auth import get_current_user, TokenUser
18
+ from app.dependencies.kpi_permissions import require_dashboard_view, check_widget_access
19
+ from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
20
+ from app.kpi_cache.schemas.schema import (
21
+ KPIStatsRequest,
22
+ KPIStatsResponse,
23
+ KPIIndividualRequest,
24
+ KPIRebuildRequest,
25
+ RebuildResponse,
26
+ KPICacheListRequest,
27
+ )
28
+ from app.kpi_cache.services.service import KPICacheService
29
+
30
+ logger = get_logger(__name__)
31
+
32
+ router = APIRouter(prefix="/kpi-cache", tags=["KPI Cache"])
33
+
34
+
35
+ @router.post("/stats", response_model=KPIStatsResponse)
36
+ async def get_kpi_stats(
37
+ payload: KPIStatsRequest,
38
+ current_user: TokenUser = Depends(require_dashboard_view),
39
+ ):
40
+ """
41
+ Return KPI widget values for the authenticated merchant.
42
+ merchant_id is taken from JWT — not from the request body.
43
+ Requires: permissions.dashboard.view
44
+ Partial failures return {"error": ..., "value": null} per KPI — never HTTP 500.
45
+ """
46
+ branch_id = payload.branch_id or "all"
47
+ try:
48
+ kpis = await KPICacheService.get_kpi_stats(
49
+ merchant_id=current_user.merchant_id,
50
+ period_window=payload.period_window,
51
+ branch_id=branch_id,
52
+ kpi_ids=payload.kpi_ids,
53
+ use_cache=payload.use_cache,
54
+ )
55
+ logger.info(
56
+ "KPI stats retrieved",
57
+ extra={
58
+ "event": "kpi_stats_retrieved",
59
+ "user_id": current_user.user_id,
60
+ "merchant_id": current_user.merchant_id,
61
+ "period_window": payload.period_window,
62
+ "kpi_count": len(kpis),
63
+ },
64
+ )
65
+ return KPIStatsResponse(
66
+ success=True,
67
+ merchant_id=current_user.merchant_id,
68
+ period_window=payload.period_window,
69
+ branch_id=branch_id,
70
+ kpis=kpis,
71
+ generated_at=datetime.now(timezone.utc).isoformat(),
72
+ )
73
+ except Exception as exc:
74
+ logger.error("get_kpi_stats failed",
75
+ extra={"event": "kpi_stats_error", "error": str(exc)}, exc_info=True)
76
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
77
+ detail="Failed to fetch KPI stats")
78
+
79
+
80
+ @router.post("/stats/individual/{kpi_id}")
81
+ async def get_individual_kpi(
82
+ kpi_id: str,
83
+ payload: KPIIndividualRequest,
84
+ current_user: TokenUser = Depends(require_dashboard_view),
85
+ ):
86
+ """
87
+ Refresh a single KPI widget.
88
+ Requires: permissions.dashboard.view + widget_id in scm_access_roles.widget_access[]
89
+ Returns HTTP 404 with available_kpis list when kpi_id is unknown.
90
+ use_cache=false bypasses cache read but writes fresh result back to Redis.
91
+ """
92
+ # 404 on unknown widget_id (exact match, not fuzzy)
93
+ if kpi_id not in KPI_HANDLER_REGISTRY:
94
+ raise HTTPException(
95
+ status_code=status.HTTP_404_NOT_FOUND,
96
+ detail={
97
+ "error": "KPI not found",
98
+ "available_kpis": list(KPI_HANDLER_REGISTRY.keys()),
99
+ },
100
+ )
101
+
102
+ # Widget-level access control
103
+ await check_widget_access(kpi_id, current_user)
104
+
105
+ branch_id = payload.branch_id or "all"
106
+ result = await KPICacheService.get_individual_kpi(
107
+ widget_id=kpi_id,
108
+ merchant_id=current_user.merchant_id,
109
+ period_window=payload.period_window,
110
+ branch_id=branch_id,
111
+ use_cache=payload.use_cache,
112
+ )
113
+ logger.info(
114
+ "Individual KPI retrieved",
115
+ extra={
116
+ "event": "kpi_individual_retrieved",
117
+ "kpi_id": kpi_id,
118
+ "user_id": current_user.user_id,
119
+ "merchant_id": current_user.merchant_id,
120
+ },
121
+ )
122
+ return {"success": True, "merchant_id": current_user.merchant_id, "kpi": result}
123
+
124
+
125
+ @router.post("/rebuild", response_model=RebuildResponse)
126
+ async def rebuild_kpi_cache(
127
+ payload: KPIRebuildRequest,
128
+ current_user: TokenUser = Depends(require_dashboard_view),
129
+ ):
130
+ """
131
+ Force recompute KPI cache for the authenticated merchant.
132
+ Requires: permissions.dashboard.view
133
+ """
134
+ try:
135
+ result = await KPICacheService.rebuild(
136
+ merchant_id=current_user.merchant_id,
137
+ period_window=payload.period_window,
138
+ branch_id=payload.branch_id,
139
+ kpi_ids=payload.kpi_ids,
140
+ )
141
+ return RebuildResponse(
142
+ success=True,
143
+ message="KPI cache rebuilt successfully",
144
+ merchant_id=current_user.merchant_id,
145
+ rebuilt_count=result["rebuilt"],
146
+ failed_count=result["failed"],
147
+ details=result.get("errors") or None,
148
+ )
149
+ except Exception as exc:
150
+ logger.error("rebuild_kpi_cache failed",
151
+ extra={"event": "kpi_rebuild_error", "error": str(exc)}, exc_info=True)
152
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
153
+ detail="Failed to rebuild KPI cache")
154
+
155
+
156
+ @router.post("/list")
157
+ async def list_kpi_cache(
158
+ payload: KPICacheListRequest,
159
+ current_user: TokenUser = Depends(require_dashboard_view),
160
+ ):
161
+ """
162
+ List KPI cache snapshots with optional projection_list.
163
+ merchant_id filter is always enforced from JWT — cannot be overridden.
164
+ Requires: permissions.dashboard.view
165
+ """
166
+ filters = payload.filters.model_dump() if payload.filters else {}
167
+ # Always scope to the authenticated merchant — ignore any merchant_id in filters
168
+ filters["merchant_id"] = current_user.merchant_id
169
+
170
+ try:
171
+ docs = await KPICacheService.list_cache(
172
+ filters=filters,
173
+ skip=payload.skip,
174
+ limit=payload.limit,
175
+ projection_list=payload.projection_list,
176
+ )
177
+ return {"success": True, "data": docs, "count": len(docs)}
178
+ except Exception as exc:
179
+ logger.error("list_kpi_cache failed",
180
+ extra={"event": "kpi_list_error", "error": str(exc)}, exc_info=True)
181
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
182
+ detail="Failed to list KPI cache")
app/kpi_cache/models/__init__.py ADDED
File without changes
app/kpi_cache/models/model.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """MongoDB document model for KPI cache snapshots."""
2
+ from datetime import datetime
3
+ from typing import Optional, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class KPICacheDocument(BaseModel):
8
+ """
9
+ Durable KPI snapshot stored in MongoDB.
10
+ One document per (merchant_id, widget_id, period_window, branch_id).
11
+ """
12
+ cache_id: str = Field(..., description="UUID for this cache document")
13
+ merchant_id: str = Field(..., description="Tenant identifier")
14
+ widget_id: str = Field(..., description="KPI widget identifier")
15
+ period_window: str = Field(..., description="today | last_7_days | mtd | ytd | last_12_months")
16
+ branch_id: str = Field("all", description="Branch/warehouse filter; 'all' means no filter")
17
+
18
+ # KPI payload
19
+ value: float = Field(..., description="Primary KPI value")
20
+ unit: str = Field(..., description="count | INR | % | ratio")
21
+ delta: Optional[float] = Field(None)
22
+ delta_percentage: Optional[float] = Field(None)
23
+ trend: str = Field("neutral", description="up | down | neutral")
24
+ secondary_values: Optional[Dict[str, Any]] = Field(None)
25
+ drill_down_url: Optional[str] = Field(None)
26
+
27
+ # Metadata
28
+ computed_at: datetime = Field(default_factory=datetime.utcnow)
29
+ expires_at: Optional[datetime] = Field(None, description="When this snapshot should be rebuilt")
30
+ cached: bool = Field(False, description="Always False for freshly written docs")
31
+ error: Optional[str] = Field(None, description="Set when computation failed")
app/kpi_cache/schemas/__init__.py ADDED
File without changes
app/kpi_cache/schemas/schema.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Pydantic schemas for KPI cache API."""
2
+ from datetime import datetime
3
+ from typing import Optional, List, Dict, Any
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class KPIStatsRequest(BaseModel):
8
+ """Request body for POST /kpi-cache/stats"""
9
+ period_window: str = Field("mtd", description="today | last_7_days | mtd | ytd | last_12_months")
10
+ branch_id: Optional[str] = Field(None, description="Filter by branch/warehouse")
11
+ kpi_ids: Optional[List[str]] = Field(None, description="Subset of widget IDs; omit for all 12")
12
+ use_cache: bool = Field(True, description="False forces a live rebuild for this request")
13
+
14
+
15
+ class KPIIndividualRequest(BaseModel):
16
+ """Request body for POST /kpi-cache/stats/individual/{kpi_id}"""
17
+ period_window: str = Field("mtd", description="today | last_7_days | mtd | ytd | last_12_months")
18
+ branch_id: Optional[str] = Field(None, description="Filter by branch/warehouse")
19
+ use_cache: bool = Field(True, description="False forces a live recompute and cache write-back")
20
+
21
+
22
+ class KPIRebuildRequest(BaseModel):
23
+ """Request body for POST /kpi-cache/rebuild"""
24
+ period_window: Optional[str] = Field(None, description="Rebuild only this period; omit for all")
25
+ branch_id: Optional[str] = Field(None, description="Rebuild only this branch; omit for all")
26
+ kpi_ids: Optional[List[str]] = Field(None, description="Subset of widget IDs; omit for all 12")
27
+
28
+
29
+ class KPICacheFilters(BaseModel):
30
+ merchant_id: Optional[str] = Field(None)
31
+ widget_id: Optional[str] = Field(None)
32
+ period_window: Optional[str] = Field(None)
33
+ branch_id: Optional[str] = Field(None)
34
+ computed_at_from: Optional[datetime] = Field(None)
35
+ computed_at_to: Optional[datetime] = Field(None)
36
+
37
+
38
+ class KPICacheListRequest(BaseModel):
39
+ """Request body for POST /kpi-cache/list"""
40
+ filters: Optional[KPICacheFilters] = Field(default_factory=KPICacheFilters)
41
+ skip: int = Field(0, ge=0)
42
+ limit: int = Field(100, ge=1, le=1000)
43
+ projection_list: Optional[List[str]] = Field(
44
+ None, description="List of fields to include in response"
45
+ )
46
+
47
+
48
+ class KPIResultSchema(BaseModel):
49
+ widget_id: str
50
+ title: str
51
+ category: str
52
+ value: float
53
+ unit: str
54
+ delta: Optional[float] = None
55
+ delta_percentage: Optional[float] = None
56
+ trend: str = "neutral"
57
+ secondary_values: Optional[Dict[str, Any]] = None
58
+ drill_down_url: Optional[str] = None
59
+ cached: bool = False
60
+ computed_at: Optional[datetime] = None
61
+ error: Optional[str] = None
62
+
63
+
64
+ class KPIStatsResponse(BaseModel):
65
+ success: bool
66
+ merchant_id: str
67
+ period_window: str
68
+ branch_id: str = "all"
69
+ kpis: Dict[str, Any]
70
+ generated_at: str
71
+
72
+
73
+ class RebuildResponse(BaseModel):
74
+ success: bool
75
+ message: str
76
+ merchant_id: str
77
+ rebuilt_count: int
78
+ failed_count: int
79
+ details: Optional[Dict[str, Any]] = None
app/kpi_cache/services/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # kpi_cache services package
app/kpi_cache/services/service.py ADDED
@@ -0,0 +1,232 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ KPI Cache Service — registry-based asyncio.gather() dispatch.
3
+ Reads from Redis L1 (via KPIHandler.compute) -> MongoDB L2 write-back.
4
+ """
5
+ import asyncio
6
+ import uuid
7
+ from datetime import datetime, timezone, timedelta
8
+ from typing import Optional, List, Dict, Any
9
+
10
+ from app.core.logging import get_logger
11
+ from app.nosql import get_database
12
+ from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
13
+ from app.kpi_handlers.base_handler import KPIResult
14
+ from app.kpi_cache.constants import (
15
+ KPI_CACHE_COLLECTION,
16
+ KPI_WIDGET_REGISTRY,
17
+ WIDGET_REDIS_TTL,
18
+ )
19
+ from app.kpi_cache.schemas.schema import KPIResultSchema
20
+
21
+ logger = get_logger(__name__)
22
+
23
+ _DEFAULT_TTL = 600
24
+
25
+
26
+ def _ttl_for(widget_id: str) -> int:
27
+ meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
28
+ return WIDGET_REDIS_TTL.get(meta.get("category", ""), _DEFAULT_TTL)
29
+
30
+
31
+ class KPICacheService:
32
+
33
+ @staticmethod
34
+ async def get_kpi_stats(
35
+ merchant_id: str,
36
+ period_window: str,
37
+ branch_id: str,
38
+ kpi_ids: Optional[List[str]],
39
+ use_cache: bool,
40
+ ) -> Dict[str, Any]:
41
+ """
42
+ Concurrently compute all requested KPIs via asyncio.gather().
43
+ Failing KPIs return {"error": "...", "value": null} — never HTTP 500.
44
+ """
45
+ target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys())
46
+
47
+ async def compute_one(wid: str):
48
+ handler = KPI_HANDLER_REGISTRY.get(wid)
49
+ if not handler:
50
+ return wid, {"error": f"KPI not found: {wid}", "value": None}
51
+ try:
52
+ br = branch_id if branch_id != "all" else None
53
+ result: KPIResult = await handler.compute(
54
+ merchant_id=merchant_id,
55
+ period_window=period_window,
56
+ branch_id=br,
57
+ use_cache=use_cache,
58
+ )
59
+ await KPICacheService._write_to_mongo(
60
+ merchant_id, wid, period_window, branch_id, result
61
+ )
62
+ return wid, KPICacheService._to_schema(wid, result)
63
+ except Exception as exc:
64
+ logger.error(
65
+ "KPI computation failed",
66
+ extra={"event": "kpi_compute_error", "widget_id": wid,
67
+ "merchant_id": merchant_id, "error": str(exc)},
68
+ exc_info=True,
69
+ )
70
+ return wid, {"error": str(exc), "value": None}
71
+
72
+ pairs = await asyncio.gather(*[compute_one(wid) for wid in target_ids])
73
+ return dict(pairs)
74
+
75
+ @staticmethod
76
+ async def get_individual_kpi(
77
+ widget_id: str,
78
+ merchant_id: str,
79
+ period_window: str,
80
+ branch_id: str,
81
+ use_cache: bool,
82
+ ) -> Optional[KPIResultSchema]:
83
+ """
84
+ Compute a single KPI. Returns None when widget_id is unknown (caller raises 404).
85
+ use_cache=False bypasses cache read but writes fresh result back to Redis.
86
+ """
87
+ handler = KPI_HANDLER_REGISTRY.get(widget_id)
88
+ if not handler:
89
+ return None
90
+
91
+ br = branch_id if branch_id != "all" else None
92
+ result: KPIResult = await handler.compute(
93
+ merchant_id=merchant_id,
94
+ period_window=period_window,
95
+ branch_id=br,
96
+ use_cache=use_cache,
97
+ )
98
+ await KPICacheService._write_to_mongo(
99
+ merchant_id, widget_id, period_window, branch_id, result
100
+ )
101
+ return KPICacheService._to_schema(widget_id, result)
102
+
103
+ @staticmethod
104
+ async def rebuild(
105
+ merchant_id: str,
106
+ period_window: Optional[str],
107
+ branch_id: Optional[str],
108
+ kpi_ids: Optional[List[str]],
109
+ ) -> Dict[str, Any]:
110
+ """Force recompute for all/subset of KPIs and update both caches."""
111
+ target_ids = kpi_ids or list(KPI_HANDLER_REGISTRY.keys())
112
+ periods = [period_window] if period_window else ["today", "last_7_days", "mtd", "ytd"]
113
+ branches = [branch_id] if branch_id else ["all"]
114
+
115
+ rebuilt, failed = 0, 0
116
+ errors: Dict[str, str] = {}
117
+
118
+ for pw in periods:
119
+ for br in branches:
120
+ results = await KPICacheService.get_kpi_stats(
121
+ merchant_id=merchant_id,
122
+ period_window=pw,
123
+ branch_id=br,
124
+ kpi_ids=target_ids,
125
+ use_cache=False,
126
+ )
127
+ for wid, payload in results.items():
128
+ if isinstance(payload, dict) and payload.get("error"):
129
+ failed += 1
130
+ errors[f"{wid}:{pw}:{br}"] = payload["error"]
131
+ else:
132
+ rebuilt += 1
133
+
134
+ return {"rebuilt": rebuilt, "failed": failed, "errors": errors}
135
+
136
+ @staticmethod
137
+ async def list_cache(
138
+ filters: Dict[str, Any],
139
+ skip: int,
140
+ limit: int,
141
+ projection_list: Optional[List[str]],
142
+ ) -> List[Any]:
143
+ """List KPI cache documents with optional MongoDB projection."""
144
+ db = get_database()
145
+ collection = db[KPI_CACHE_COLLECTION]
146
+
147
+ query: Dict[str, Any] = {}
148
+ for field in ("merchant_id", "widget_id", "period_window", "branch_id"):
149
+ if filters.get(field):
150
+ query[field] = filters[field]
151
+ if filters.get("computed_at_from") or filters.get("computed_at_to"):
152
+ date_filter: Dict[str, Any] = {}
153
+ if filters.get("computed_at_from"):
154
+ date_filter["$gte"] = filters["computed_at_from"]
155
+ if filters.get("computed_at_to"):
156
+ date_filter["$lte"] = filters["computed_at_to"]
157
+ query["computed_at"] = date_filter
158
+
159
+ projection_dict = None
160
+ if projection_list:
161
+ projection_dict = {f: 1 for f in projection_list}
162
+ projection_dict["_id"] = 0
163
+
164
+ cursor = collection.find(query, projection_dict).skip(skip).limit(limit)
165
+ return await cursor.to_list(length=limit)
166
+
167
+ # ------------------------------------------------------------------
168
+ # Internal helpers
169
+ # ------------------------------------------------------------------
170
+
171
+ @staticmethod
172
+ def _to_schema(widget_id: str, result: KPIResult) -> KPIResultSchema:
173
+ meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
174
+ return KPIResultSchema(
175
+ widget_id=widget_id,
176
+ title=meta.get("title", widget_id),
177
+ category=meta.get("category", ""),
178
+ value=max(result.value, 0),
179
+ unit=result.unit,
180
+ delta=result.delta,
181
+ delta_percentage=result.delta_percentage,
182
+ trend=result.trend,
183
+ secondary_values=result.secondary_values,
184
+ drill_down_url=result.drill_down_url or meta.get("drill_down_url"),
185
+ cached=result.cached,
186
+ computed_at=result.computed_at,
187
+ error=result.error,
188
+ )
189
+
190
+ @staticmethod
191
+ async def _write_to_mongo(
192
+ merchant_id: str,
193
+ widget_id: str,
194
+ period_window: str,
195
+ branch_id: str,
196
+ result: KPIResult,
197
+ ) -> None:
198
+ try:
199
+ db = get_database()
200
+ collection = db[KPI_CACHE_COLLECTION]
201
+ now = datetime.now(timezone.utc)
202
+ ttl_seconds = _ttl_for(widget_id)
203
+ meta = KPI_WIDGET_REGISTRY.get(widget_id, {})
204
+ doc = {
205
+ "cache_id": str(uuid.uuid4()),
206
+ "merchant_id": merchant_id,
207
+ "widget_id": widget_id,
208
+ "period_window": period_window,
209
+ "branch_id": branch_id,
210
+ "value": max(result.value, 0),
211
+ "unit": result.unit,
212
+ "delta": result.delta,
213
+ "delta_percentage": result.delta_percentage,
214
+ "trend": result.trend,
215
+ "secondary_values": result.secondary_values,
216
+ "drill_down_url": result.drill_down_url or meta.get("drill_down_url"),
217
+ "computed_at": now,
218
+ "expires_at": now + timedelta(seconds=ttl_seconds),
219
+ "cached": False,
220
+ "error": result.error,
221
+ }
222
+ await collection.update_one(
223
+ {"merchant_id": merchant_id, "widget_id": widget_id,
224
+ "period_window": period_window, "branch_id": branch_id},
225
+ {"$set": doc},
226
+ upsert=True,
227
+ )
228
+ except Exception as exc:
229
+ logger.error(
230
+ "MongoDB write failed",
231
+ extra={"event": "mongo_write_error", "error": str(exc)},
232
+ )
app/kpi_cache/services/sql_queries.py ADDED
@@ -0,0 +1,303 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Raw asyncpg SQL queries for each KPI widget.
3
+ All queries are scoped to merchant_id as the first predicate.
4
+ Returns a dict matching KPICacheDocument fields.
5
+ """
6
+ from datetime import datetime, timedelta, timezone
7
+ from typing import Optional, Dict, Any
8
+
9
+ from app.core.logging import get_logger
10
+
11
+ logger = get_logger(__name__)
12
+
13
+
14
+ def _utc_today_start() -> datetime:
15
+ now = datetime.now(timezone.utc)
16
+ return now.replace(hour=0, minute=0, second=0, microsecond=0)
17
+
18
+
19
+ def _compute_delta(current: float, prior: float):
20
+ delta = current - prior
21
+ delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None
22
+ trend = "up" if delta > 0 else ("down" if delta < 0 else "neutral")
23
+ return delta, delta_pct, trend
24
+
25
+
26
+ # ---------------------------------------------------------------------------
27
+ # wid_open_po_count_001
28
+ # ---------------------------------------------------------------------------
29
+ async def query_open_po_count(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
30
+ branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
31
+ params = [merchant_id] + ([branch_id] if branch_id else [])
32
+ row = await conn.fetchrow(f"""
33
+ SELECT
34
+ COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
35
+ COUNT(*) FILTER (WHERE po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
36
+ FROM trans.scm_po
37
+ WHERE merchant_id = $1
38
+ AND status IN ('submitted','approved','dispatched','partially_received')
39
+ {branch_clause}
40
+ """, *params)
41
+ current = float(row["current_count"] or 0)
42
+ prior = float(row["prior_count"] or 0)
43
+ delta, delta_pct, trend = _compute_delta(current, prior)
44
+ return {"value": max(current, 0), "unit": "count", "delta": delta,
45
+ "delta_percentage": delta_pct, "trend": trend}
46
+
47
+
48
+ # ---------------------------------------------------------------------------
49
+ # wid_po_aging_001
50
+ # ---------------------------------------------------------------------------
51
+ async def query_po_aging(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
52
+ branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
53
+ params = [merchant_id] + ([branch_id] if branch_id else [])
54
+ row = await conn.fetchrow(f"""
55
+ SELECT
56
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7,
57
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14,
58
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30,
59
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus,
60
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7,
61
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14,
62
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30,
63
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus
64
+ FROM trans.scm_po
65
+ WHERE merchant_id = $1
66
+ AND status IN ('submitted','approved','dispatched','partially_received')
67
+ {branch_clause}
68
+ """, *params)
69
+ b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")}
70
+ total = sum(b.values())
71
+ secondary = {
72
+ "bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)},
73
+ "bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)},
74
+ "bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)},
75
+ "bucket_30_plus":{"count": b["b30_plus"],"value": float(row["v30_plus"] or 0)},
76
+ }
77
+ return {"value": float(total), "unit": "count", "secondary_values": secondary}
78
+
79
+
80
+ # ---------------------------------------------------------------------------
81
+ # wid_receipts_this_week_001
82
+ # ---------------------------------------------------------------------------
83
+ async def query_receipts_this_week(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
84
+ branch_clause = "AND wh_location = $2" if branch_id else ""
85
+ params = [merchant_id] + ([branch_id] if branch_id else [])
86
+ row = await conn.fetchrow(f"""
87
+ SELECT
88
+ COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
89
+ COALESCE(SUM(total_qty) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty,
90
+ COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days'
91
+ AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
92
+ FROM trans.scm_grn
93
+ WHERE merchant_id = $1
94
+ {branch_clause}
95
+ """, *params)
96
+ current = float(row["current_count"] or 0)
97
+ prior = float(row["prior_count"] or 0)
98
+ delta, delta_pct, trend = _compute_delta(current, prior)
99
+ return {"value": max(current, 0), "unit": "count", "delta": delta,
100
+ "delta_percentage": delta_pct, "trend": trend,
101
+ "secondary_values": {"total_received_qty": float(row["total_qty"] or 0)}}
102
+
103
+
104
+ # ---------------------------------------------------------------------------
105
+ # wid_stock_ins_today_001 (MongoDB Self-GRN — stub returning 0 until cross-service)
106
+ # ---------------------------------------------------------------------------
107
+ async def query_stock_ins_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
108
+ # Self-GRN lives in MongoDB managed by SCM-ms; analytics-ms returns a stub
109
+ # until a shared collection or internal API is available.
110
+ logger.warning("stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub",
111
+ extra={"event": "kpi_stub", "widget_id": "wid_stock_ins_today_001",
112
+ "merchant_id": merchant_id})
113
+ return {"value": 0.0, "unit": "count", "delta": 0.0, "delta_percentage": None,
114
+ "trend": "neutral", "secondary_values": {"stub": True}}
115
+
116
+
117
+ # ---------------------------------------------------------------------------
118
+ # wid_low_stock_skus_001
119
+ # ---------------------------------------------------------------------------
120
+ async def query_low_stock_skus(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
121
+ branch_clause = "AND warehouse_id = $2" if branch_id else ""
122
+ params = [merchant_id] + ([branch_id] if branch_id else [])
123
+ row = await conn.fetchrow(f"""
124
+ SELECT
125
+ COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count,
126
+ COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count
127
+ FROM trans.scm_stock
128
+ WHERE merchant_id = $1
129
+ {branch_clause}
130
+ """, *params)
131
+ stockout = int(row["stockout_count"] or 0)
132
+ low = int(row["low_stock_count"] or 0)
133
+ logger.warning("low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback",
134
+ extra={"event": "kpi_fallback", "merchant_id": merchant_id})
135
+ return {"value": float(low), "unit": "count",
136
+ "secondary_values": {"stockout_count": stockout, "low_stock_count": low}}
137
+
138
+
139
+ # ---------------------------------------------------------------------------
140
+ # wid_net_stock_value_001
141
+ # ---------------------------------------------------------------------------
142
+ async def query_net_stock_value(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
143
+ branch_clause = "AND warehouse_id = $2" if branch_id else ""
144
+ params = [merchant_id] + ([branch_id] if branch_id else [])
145
+ row = await conn.fetchrow(f"""
146
+ SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value
147
+ FROM trans.scm_stock
148
+ WHERE merchant_id = $1
149
+ AND cost_price IS NOT NULL
150
+ {branch_clause}
151
+ """, *params)
152
+ value = float(row["net_value"] or 0)
153
+ if value < 0:
154
+ logger.warning("net_stock_value: negative aggregate clamped to 0",
155
+ extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id})
156
+ value = 0.0
157
+ return {"value": value, "unit": "INR", "delta": None, "delta_percentage": None, "trend": "neutral"}
158
+
159
+
160
+ # ---------------------------------------------------------------------------
161
+ # wid_adjustments_mtd_001
162
+ # ---------------------------------------------------------------------------
163
+ async def query_adjustments_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
164
+ branch_clause = "AND m.warehouse_id = $2" if branch_id else ""
165
+ params = [merchant_id] + ([branch_id] if branch_id else [])
166
+ row = await conn.fetchrow(f"""
167
+ SELECT
168
+ COUNT(DISTINCT m.adjustment_master_id) AS adj_count,
169
+ COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value,
170
+ COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value
171
+ FROM trans.scm_stock_adjustment_master m
172
+ JOIN trans.scm_stock_adjustment_details d
173
+ ON d.adjustment_master_id = m.adjustment_master_id
174
+ WHERE m.merchant_id = $1
175
+ AND m.status IN ('approved','applied')
176
+ AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
177
+ {branch_clause}
178
+ """, *params)
179
+ pos = float(row["pos_value"] or 0)
180
+ neg = float(row["neg_value"] or 0)
181
+ return {"value": float(row["adj_count"] or 0), "unit": "count",
182
+ "secondary_values": {"positive_value": pos, "negative_value": neg,
183
+ "net_impact": pos - neg}}
184
+
185
+
186
+ # ---------------------------------------------------------------------------
187
+ # wid_stock_take_pending_001
188
+ # ---------------------------------------------------------------------------
189
+ async def query_stock_take_pending(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
190
+ branch_clause = "AND warehouse_id = $2" if branch_id else ""
191
+ params = [merchant_id] + ([branch_id] if branch_id else [])
192
+ row = await conn.fetchrow(f"""
193
+ SELECT
194
+ COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count,
195
+ COUNT(*) FILTER (WHERE status IN ('draft','in_progress')
196
+ AND stock_take_date < CURRENT_TIMESTAMP) AS overdue_count
197
+ FROM trans.scm_stock_take_master
198
+ WHERE merchant_id = $1
199
+ {branch_clause}
200
+ """, *params)
201
+ pending = int(row["pending_count"] or 0)
202
+ overdue = int(row["overdue_count"] or 0)
203
+ return {"value": float(pending), "unit": "count",
204
+ "secondary_values": {"pending_count": pending, "overdue_count": overdue}}
205
+
206
+
207
+ # ---------------------------------------------------------------------------
208
+ # wid_shipments_transit_001
209
+ # ---------------------------------------------------------------------------
210
+ async def query_shipments_transit(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
211
+ row = await conn.fetchrow("""
212
+ SELECT
213
+ COUNT(*) AS in_transit,
214
+ COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed
215
+ FROM trans.scm_trade_shipment ts
216
+ JOIN trans.scm_po po ON ts.order_id = po.po_id
217
+ WHERE ts.supplier_id = $1
218
+ AND ts.status = 'shipped'
219
+ """, merchant_id)
220
+ in_transit = int(row["in_transit"] or 0)
221
+ delayed = int(row["delayed"] or 0)
222
+ delayed_pct = round((delayed / in_transit) * 100, 2) if in_transit > 0 else 0.0
223
+ return {"value": float(in_transit), "unit": "count",
224
+ "secondary_values": {"in_transit_count": in_transit, "delayed_count": delayed,
225
+ "delayed_percentage": delayed_pct}}
226
+
227
+
228
+ # ---------------------------------------------------------------------------
229
+ # wid_invoices_mtd_001
230
+ # ---------------------------------------------------------------------------
231
+ async def query_invoices_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
232
+ row = await conn.fetchrow("""
233
+ SELECT
234
+ COUNT(*) AS inv_count,
235
+ COALESCE(SUM(grand_total_amt), 0) AS total_amt,
236
+ COUNT(*) FILTER (
237
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
238
+ AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
239
+ ) AS prior_count,
240
+ COALESCE(SUM(grand_total_amt) FILTER (
241
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
242
+ AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
243
+ ), 0) AS prior_amt
244
+ FROM trans.scm_invoice
245
+ WHERE (buyer_id = $1 OR supplier_id = $1)
246
+ AND status NOT IN ('cancelled','draft')
247
+ AND created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
248
+ """, merchant_id)
249
+ current = float(row["inv_count"] or 0)
250
+ total = float(row["total_amt"] or 0)
251
+ if total < 0:
252
+ logger.warning("invoices_mtd: negative total clamped",
253
+ extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id})
254
+ total = 0.0
255
+ prior = float(row["prior_count"] or 0)
256
+ delta, delta_pct, trend = _compute_delta(current, prior)
257
+ return {"value": max(current, 0), "unit": "count", "delta": delta,
258
+ "delta_percentage": delta_pct, "trend": trend,
259
+ "secondary_values": {"total_invoiced_amount": total}}
260
+
261
+
262
+ # ---------------------------------------------------------------------------
263
+ # wid_credit_debit_notes_mtd_001 (Phase 1 stub)
264
+ # ---------------------------------------------------------------------------
265
+ async def query_credit_debit_notes_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
266
+ logger.warning("credit_debit_notes_mtd: no dedicated table yet; returning stub",
267
+ extra={"event": "kpi_stub", "widget_id": "wid_credit_debit_notes_mtd_001",
268
+ "merchant_id": merchant_id})
269
+ return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None,
270
+ "trend": "neutral",
271
+ "secondary_values": {"credit_note_total": 0.0, "debit_note_total": 0.0,
272
+ "net_impact": 0.0, "stub": True}}
273
+
274
+
275
+ # ---------------------------------------------------------------------------
276
+ # wid_pos_sales_today_001 (Phase 2 stub)
277
+ # ---------------------------------------------------------------------------
278
+ async def query_pos_sales_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
279
+ logger.warning("pos_sales_today: Phase 2 stub",
280
+ extra={"event": "kpi_stub", "widget_id": "wid_pos_sales_today_001",
281
+ "merchant_id": merchant_id})
282
+ return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None,
283
+ "trend": "neutral",
284
+ "secondary_values": {"transaction_count": 0, "average_ticket": 0.0, "stub": True}}
285
+
286
+
287
+ # ---------------------------------------------------------------------------
288
+ # Dispatch table
289
+ # ---------------------------------------------------------------------------
290
+ QUERY_DISPATCH = {
291
+ "wid_open_po_count_001": query_open_po_count,
292
+ "wid_po_aging_001": query_po_aging,
293
+ "wid_receipts_this_week_001": query_receipts_this_week,
294
+ "wid_stock_ins_today_001": query_stock_ins_today,
295
+ "wid_low_stock_skus_001": query_low_stock_skus,
296
+ "wid_net_stock_value_001": query_net_stock_value,
297
+ "wid_adjustments_mtd_001": query_adjustments_mtd,
298
+ "wid_stock_take_pending_001": query_stock_take_pending,
299
+ "wid_shipments_transit_001": query_shipments_transit,
300
+ "wid_invoices_mtd_001": query_invoices_mtd,
301
+ "wid_credit_debit_notes_mtd_001": query_credit_debit_notes_mtd,
302
+ "wid_pos_sales_today_001": query_pos_sales_today,
303
+ }
app/kpi_handlers/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from app.kpi_handlers.registry import KPI_HANDLER_REGISTRY
2
+
3
+ __all__ = ["KPI_HANDLER_REGISTRY"]
app/kpi_handlers/adjustment_handlers.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Adjustment KPI handler: AdjustmentsMTDHandler."""
2
+ from typing import Optional
3
+
4
+ from app.kpi_handlers.base_handler import KPIHandler, KPIResult
5
+ from app.postgres import get_postgres_pool
6
+
7
+
8
+ class AdjustmentsMTDHandler(KPIHandler):
9
+ widget_id = "wid_adjustments_mtd_001"
10
+ drill_down_url = "/inventory/adjustments"
11
+ cache_ttl = 600
12
+
13
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
14
+ branch_clause = "AND m.warehouse_id = $2" if branch_id else ""
15
+ params = [merchant_id] + ([branch_id] if branch_id else [])
16
+ pool = get_postgres_pool()
17
+ async with pool.acquire() as conn:
18
+ row = await conn.fetchrow(f"""
19
+ SELECT
20
+ COUNT(DISTINCT m.adjustment_master_id) AS adj_count,
21
+ COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value,
22
+ COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value
23
+ FROM trans.scm_stock_adjustment_master m
24
+ JOIN trans.scm_stock_adjustment_details d
25
+ ON d.adjustment_master_id = m.adjustment_master_id
26
+ WHERE m.merchant_id = $1
27
+ AND m.status IN ('approved','applied')
28
+ AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
29
+ {branch_clause}
30
+ """, *params)
31
+ pos = float(row["pos_value"] or 0)
32
+ neg = float(row["neg_value"] or 0)
33
+ return KPIResult(
34
+ value=float(row["adj_count"] or 0),
35
+ unit="count",
36
+ secondary_values={
37
+ "positive_value": pos,
38
+ "negative_value": neg,
39
+ "net_impact": pos - neg,
40
+ },
41
+ drill_down_url=self.drill_down_url,
42
+ )
app/kpi_handlers/base_handler.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Base abstractions for KPI handlers.
3
+ Each KPI is a self-contained KPIHandler subclass that implements compute().
4
+ """
5
+ import json
6
+ from abc import ABC, abstractmethod
7
+ from datetime import datetime, timezone
8
+ from typing import Optional, Tuple, Dict, Any
9
+
10
+ from pydantic import BaseModel
11
+
12
+ from app.cache import get_redis
13
+ from app.core.logging import get_logger
14
+ from app.postgres import get_postgres_pool
15
+
16
+ logger = get_logger(__name__)
17
+
18
+
19
+ class KPIResult(BaseModel):
20
+ value: float
21
+ unit: str = "count"
22
+ delta: Optional[float] = None
23
+ delta_percentage: Optional[float] = None
24
+ trend: str = "neutral"
25
+ secondary_values: Optional[Dict[str, Any]] = None
26
+ drill_down_url: Optional[str] = None
27
+ cached: bool = False
28
+ computed_at: Optional[datetime] = None
29
+ error: Optional[str] = None
30
+
31
+
32
+ def get_period_filter(period_window: str, column: str = "created_at") -> str:
33
+ """Return a SQL fragment for the given period window."""
34
+ mapping = {
35
+ "today": f"{column}::date = CURRENT_DATE",
36
+ "last_7_days": f"{column} >= CURRENT_TIMESTAMP - INTERVAL '7 days'",
37
+ "mtd": f"{column} >= DATE_TRUNC('month', CURRENT_TIMESTAMP)",
38
+ "ytd": f"{column} >= DATE_TRUNC('year', CURRENT_TIMESTAMP)",
39
+ "last_12_months": f"{column} >= CURRENT_TIMESTAMP - INTERVAL '12 months'",
40
+ }
41
+ return mapping.get(period_window, mapping["mtd"])
42
+
43
+
44
+ def compute_delta(current: float, prior: float) -> Tuple[float, Optional[float], str]:
45
+ """
46
+ Returns (delta, delta_pct, trend).
47
+ delta_pct is None when prior == 0.
48
+ """
49
+ delta = current - prior
50
+ delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None
51
+ trend = "up" if delta > 0 else ("down" if delta < 0 else "neutral")
52
+ return delta, delta_pct, trend
53
+
54
+
55
+ class KPIHandler(ABC):
56
+ """Abstract base for all KPI handlers."""
57
+
58
+ widget_id: str
59
+ drill_down_url: Optional[str] = None
60
+ cache_ttl: int = 600 # seconds
61
+
62
+ async def compute(
63
+ self,
64
+ merchant_id: str,
65
+ period_window: str,
66
+ branch_id: Optional[str],
67
+ use_cache: bool = True,
68
+ ) -> KPIResult:
69
+ """
70
+ Cache check → DB query → cache write.
71
+ Redis unavailability falls back silently to DB.
72
+ """
73
+ import time
74
+ start = time.monotonic()
75
+ cache_key = self._cache_key(merchant_id, period_window, branch_id)
76
+
77
+ if use_cache:
78
+ cached_result = await self._read_cache(cache_key)
79
+ if cached_result is not None:
80
+ duration_ms = round((time.monotonic() - start) * 1000, 2)
81
+ logger.info(
82
+ "KPI served from cache",
83
+ extra={
84
+ "event": "kpi_computed",
85
+ "widget_id": self.widget_id,
86
+ "merchant_id": merchant_id,
87
+ "cached": True,
88
+ "duration_ms": duration_ms,
89
+ },
90
+ )
91
+ cached_result.cached = True
92
+ return cached_result
93
+
94
+ result = await self._execute(merchant_id, period_window, branch_id)
95
+ result.cached = False
96
+ result.computed_at = datetime.now(timezone.utc)
97
+ if result.drill_down_url is None:
98
+ result.drill_down_url = self.drill_down_url
99
+
100
+ await self._write_cache(cache_key, result)
101
+
102
+ duration_ms = round((time.monotonic() - start) * 1000, 2)
103
+ logger.info(
104
+ "KPI computed",
105
+ extra={
106
+ "event": "kpi_computed",
107
+ "widget_id": self.widget_id,
108
+ "merchant_id": merchant_id,
109
+ "cached": False,
110
+ "duration_ms": duration_ms,
111
+ },
112
+ )
113
+ return result
114
+
115
+ @abstractmethod
116
+ async def _execute(
117
+ self,
118
+ merchant_id: str,
119
+ period_window: str,
120
+ branch_id: Optional[str],
121
+ ) -> KPIResult:
122
+ """Run the actual DB query and return a KPIResult."""
123
+ ...
124
+
125
+ def _cache_key(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> str:
126
+ br = branch_id or "all"
127
+ return f"widget:{self.widget_id}:{merchant_id}:{period_window}:{br}"
128
+
129
+ async def _read_cache(self, key: str) -> Optional[KPIResult]:
130
+ redis = get_redis()
131
+ if not redis:
132
+ return None
133
+ try:
134
+ raw = await redis.get(key)
135
+ if raw:
136
+ return KPIResult(**json.loads(raw))
137
+ except Exception as exc:
138
+ logger.warning(
139
+ "widget_cache_unavailable",
140
+ extra={"event": "widget_cache_unavailable", "error": str(exc)},
141
+ )
142
+ return None
143
+
144
+ async def _write_cache(self, key: str, result: KPIResult) -> None:
145
+ redis = get_redis()
146
+ if not redis:
147
+ return
148
+ try:
149
+ await redis.setex(key, self.cache_ttl, result.model_dump_json())
150
+ except Exception as exc:
151
+ logger.warning(
152
+ "widget_cache_unavailable",
153
+ extra={"event": "widget_cache_unavailable", "error": str(exc)},
154
+ )
155
+
156
+ async def _get_conn(self):
157
+ """Acquire a connection from the asyncpg pool."""
158
+ return get_postgres_pool().acquire()
app/kpi_handlers/finance_handlers.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Finance KPI handlers: InvoicesMTDHandler, CreditDebitNotesMTDHandler."""
2
+ from typing import Optional
3
+
4
+ from app.core.logging import get_logger
5
+ from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta
6
+ from app.postgres import get_postgres_pool
7
+
8
+ logger = get_logger(__name__)
9
+
10
+
11
+ class InvoicesMTDHandler(KPIHandler):
12
+ widget_id = "wid_invoices_mtd_001"
13
+ drill_down_url = "/trade-sales/invoices"
14
+ cache_ttl = 1800 # CACHE_TTL_FINANCIAL
15
+
16
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
17
+ pool = get_postgres_pool()
18
+ async with pool.acquire() as conn:
19
+ row = await conn.fetchrow("""
20
+ SELECT
21
+ COUNT(*) FILTER (
22
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
23
+ ) AS inv_count,
24
+ COALESCE(SUM(grand_total_amt) FILTER (
25
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
26
+ ), 0) AS total_amt,
27
+ COUNT(*) FILTER (
28
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
29
+ AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
30
+ ) AS prior_count,
31
+ COALESCE(SUM(grand_total_amt) FILTER (
32
+ WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
33
+ AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
34
+ ), 0) AS prior_amt
35
+ FROM trans.scm_invoice
36
+ WHERE (buyer_id = $1 OR supplier_id = $1)
37
+ AND status NOT IN ('cancelled','draft')
38
+ """, merchant_id)
39
+ current = float(row["inv_count"] or 0)
40
+ total = float(row["total_amt"] or 0)
41
+ if total < 0:
42
+ logger.warning(
43
+ "invoices_mtd: negative total clamped to 0",
44
+ extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id,
45
+ "widget_id": self.widget_id},
46
+ )
47
+ total = 0.0
48
+ prior = float(row["prior_count"] or 0)
49
+ delta, delta_pct, trend = compute_delta(current, prior)
50
+ return KPIResult(
51
+ value=max(current, 0),
52
+ unit="count",
53
+ delta=delta,
54
+ delta_percentage=delta_pct,
55
+ trend=trend,
56
+ secondary_values={"total_invoiced_amount": total},
57
+ drill_down_url=self.drill_down_url,
58
+ )
59
+
60
+
61
+ class CreditDebitNotesMTDHandler(KPIHandler):
62
+ """Phase 1 stub — no dedicated table yet."""
63
+ widget_id = "wid_credit_debit_notes_mtd_001"
64
+ drill_down_url = "/trade-sales/credit-debit-notes"
65
+ cache_ttl = 1800 # CACHE_TTL_FINANCIAL
66
+
67
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
68
+ logger.warning(
69
+ "credit_debit_notes_mtd: no dedicated table yet; returning stub",
70
+ extra={"event": "widget_credit_debit_stub", "widget_id": self.widget_id,
71
+ "merchant_id": merchant_id},
72
+ )
73
+ return KPIResult(
74
+ value=0.0,
75
+ unit="INR",
76
+ delta=0.0,
77
+ delta_percentage=None,
78
+ trend="neutral",
79
+ secondary_values={"credit_note_total": 0.0, "debit_note_total": 0.0,
80
+ "net_impact": 0.0, "stub": True},
81
+ drill_down_url=self.drill_down_url,
82
+ )
app/kpi_handlers/grn_handlers.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """GRN KPI handler: ReceiptsThisWeekHandler."""
2
+ from typing import Optional
3
+
4
+ from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta
5
+ from app.postgres import get_postgres_pool
6
+
7
+
8
+ class ReceiptsThisWeekHandler(KPIHandler):
9
+ widget_id = "wid_receipts_this_week_001"
10
+ drill_down_url = "/purchases/receipts"
11
+ cache_ttl = 600
12
+
13
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
14
+ branch_clause = "AND wh_location = $2" if branch_id else ""
15
+ params = [merchant_id] + ([branch_id] if branch_id else [])
16
+ pool = get_postgres_pool()
17
+ async with pool.acquire() as conn:
18
+ row = await conn.fetchrow(f"""
19
+ SELECT
20
+ COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
21
+ COALESCE(SUM(total_qty) FILTER (
22
+ WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty,
23
+ COUNT(*) FILTER (
24
+ WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days'
25
+ AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
26
+ FROM trans.scm_grn
27
+ WHERE merchant_id = $1
28
+ {branch_clause}
29
+ """, *params)
30
+ current = float(row["current_count"] or 0)
31
+ prior = float(row["prior_count"] or 0)
32
+ delta, delta_pct, trend = compute_delta(current, prior)
33
+ return KPIResult(
34
+ value=max(current, 0),
35
+ unit="count",
36
+ delta=delta,
37
+ delta_percentage=delta_pct,
38
+ trend=trend,
39
+ secondary_values={"total_received_qty": float(row["total_qty"] or 0)},
40
+ drill_down_url=self.drill_down_url,
41
+ )
app/kpi_handlers/inventory_handlers.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Inventory KPI handlers: LowStockSKUsHandler, NetStockValueHandler."""
2
+ from typing import Optional
3
+
4
+ from app.core.logging import get_logger
5
+ from app.kpi_handlers.base_handler import KPIHandler, KPIResult
6
+ from app.postgres import get_postgres_pool
7
+
8
+ logger = get_logger(__name__)
9
+
10
+
11
+ class LowStockSKUsHandler(KPIHandler):
12
+ widget_id = "wid_low_stock_skus_001"
13
+ drill_down_url = "/inventory/stock-overview"
14
+ cache_ttl = 600
15
+
16
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
17
+ branch_clause = "AND warehouse_id = $2" if branch_id else ""
18
+ params = [merchant_id] + ([branch_id] if branch_id else [])
19
+ pool = get_postgres_pool()
20
+ async with pool.acquire() as conn:
21
+ row = await conn.fetchrow(f"""
22
+ SELECT
23
+ COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count,
24
+ COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count
25
+ FROM trans.scm_stock
26
+ WHERE merchant_id = $1
27
+ {branch_clause}
28
+ """, *params)
29
+ stockout = int(row["stockout_count"] or 0)
30
+ low = int(row["low_stock_count"] or 0)
31
+ # reorder_point not on scm_stock — using qty_available=0 fallback
32
+ logger.warning(
33
+ "low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback",
34
+ extra={"event": "kpi_fallback", "widget_id": self.widget_id, "merchant_id": merchant_id},
35
+ )
36
+ return KPIResult(
37
+ value=float(low),
38
+ unit="count",
39
+ secondary_values={"stockout_count": stockout, "low_stock_count": low},
40
+ drill_down_url=self.drill_down_url,
41
+ )
42
+
43
+
44
+ class NetStockValueHandler(KPIHandler):
45
+ widget_id = "wid_net_stock_value_001"
46
+ drill_down_url = "/inventory/stock-overview"
47
+ cache_ttl = 1800 # CACHE_TTL_FINANCIAL
48
+
49
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
50
+ branch_clause = "AND warehouse_id = $2" if branch_id else ""
51
+ params = [merchant_id] + ([branch_id] if branch_id else [])
52
+ pool = get_postgres_pool()
53
+ async with pool.acquire() as conn:
54
+ row = await conn.fetchrow(f"""
55
+ SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value
56
+ FROM trans.scm_stock
57
+ WHERE merchant_id = $1
58
+ AND cost_price IS NOT NULL
59
+ {branch_clause}
60
+ """, *params)
61
+ value = float(row["net_value"] or 0)
62
+ if value < 0:
63
+ logger.warning(
64
+ "net_stock_value: negative aggregate clamped to 0",
65
+ extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id,
66
+ "widget_id": self.widget_id},
67
+ )
68
+ value = 0.0
69
+ # 7-day delta via ledger is best-effort; return null if insufficient data
70
+ return KPIResult(
71
+ value=value,
72
+ unit="INR",
73
+ delta=None,
74
+ delta_percentage=None,
75
+ trend="neutral",
76
+ drill_down_url=self.drill_down_url,
77
+ )
app/kpi_handlers/po_handlers.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """PO-related KPI handlers: OpenPOCountHandler, POAgingHandler."""
2
+ from typing import Optional
3
+
4
+ from app.kpi_handlers.base_handler import KPIHandler, KPIResult, compute_delta
5
+ from app.postgres import get_postgres_pool
6
+
7
+
8
+ class OpenPOCountHandler(KPIHandler):
9
+ widget_id = "wid_open_po_count_001"
10
+ drill_down_url = "/purchases/orders"
11
+ cache_ttl = 600
12
+
13
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
14
+ branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
15
+ params = [merchant_id] + ([branch_id] if branch_id else [])
16
+ pool = get_postgres_pool()
17
+ async with pool.acquire() as conn:
18
+ row = await conn.fetchrow(f"""
19
+ SELECT
20
+ COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
21
+ COUNT(*) FILTER (WHERE po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
22
+ FROM trans.scm_po
23
+ WHERE merchant_id = $1
24
+ AND status IN ('submitted','approved','dispatched','partially_received')
25
+ {branch_clause}
26
+ """, *params)
27
+ current = float(row["current_count"] or 0)
28
+ prior = float(row["prior_count"] or 0)
29
+ delta, delta_pct, trend = compute_delta(current, prior)
30
+ return KPIResult(
31
+ value=max(current, 0),
32
+ unit="count",
33
+ delta=delta,
34
+ delta_percentage=delta_pct,
35
+ trend=trend,
36
+ drill_down_url=self.drill_down_url,
37
+ )
38
+
39
+
40
+ class POAgingHandler(KPIHandler):
41
+ widget_id = "wid_po_aging_001"
42
+ drill_down_url = "/purchases/orders"
43
+ cache_ttl = 600
44
+
45
+ async def _execute(self, merchant_id: str, period_window: str, branch_id: Optional[str]) -> KPIResult:
46
+ branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
47
+ params = [merchant_id] + ([branch_id] if branch_id else [])
48
+ pool = get_postgres_pool()
49
+ async with pool.acquire() as conn:
50
+ row = await conn.fetchrow(f"""
51
+ SELECT
52
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7,
53
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14,
54
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30,
55
+ COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus,
56
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7,
57
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14,
58
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30,
59
+ COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus
60
+ FROM trans.scm_po
61
+ WHERE merchant_id = $1
62
+ AND status IN ('submitted','approved','dispatched','partially_received')
63
+ {branch_clause}
64
+ """, *params)
65
+ b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")}
66
+ total = sum(b.values())
67
+ secondary = {
68
+ "bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)},
69
+ "bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)},
70
+ "bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)},
71
+ "bucket_30_plus":{"count": b["b30_plus"],"value": float(row["v30_plus"] or 0)},
72
+ }
73
+ return KPIResult(
74
+ value=float(total),
75
+ unit="count",
76
+ secondary_values=secondary,
77
+ drill_down_url=self.drill_down_url,
78
+ )