MukeshKapoor25's picture
Initial commit
b143975
"""
Raw asyncpg SQL queries for each KPI widget.
All queries are scoped to merchant_id as the first predicate.
Returns a dict matching KPICacheDocument fields.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from app.core.logging import get_logger
logger = get_logger(__name__)
def _utc_today_start() -> datetime:
now = datetime.now(timezone.utc)
return now.replace(hour=0, minute=0, second=0, microsecond=0)
def _compute_delta(current: float, prior: float):
delta = current - prior
delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None
trend = "up" if delta > 0 else ("down" if delta < 0 else "neutral")
return delta, delta_pct, trend
# ---------------------------------------------------------------------------
# wid_open_po_count_001
# ---------------------------------------------------------------------------
async def query_open_po_count(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
COUNT(*) FILTER (WHERE po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
FROM trans.scm_po
WHERE merchant_id = $1
AND status IN ('submitted','approved','dispatched','partially_received')
{branch_clause}
""", *params)
current = float(row["current_count"] or 0)
prior = float(row["prior_count"] or 0)
delta, delta_pct, trend = _compute_delta(current, prior)
return {"value": max(current, 0), "unit": "count", "delta": delta,
"delta_percentage": delta_pct, "trend": trend}
# ---------------------------------------------------------------------------
# wid_po_aging_001
# ---------------------------------------------------------------------------
async def query_po_aging(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7,
COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14,
COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30,
COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus,
COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7,
COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14,
COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30,
COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus
FROM trans.scm_po
WHERE merchant_id = $1
AND status IN ('submitted','approved','dispatched','partially_received')
{branch_clause}
""", *params)
b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")}
total = sum(b.values())
secondary = {
"bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)},
"bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)},
"bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)},
"bucket_30_plus":{"count": b["b30_plus"],"value": float(row["v30_plus"] or 0)},
}
return {"value": float(total), "unit": "count", "secondary_values": secondary}
# ---------------------------------------------------------------------------
# wid_receipts_this_week_001
# ---------------------------------------------------------------------------
async def query_receipts_this_week(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND wh_location = $2" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
COALESCE(SUM(total_qty) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty,
COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days'
AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
FROM trans.scm_grn
WHERE merchant_id = $1
{branch_clause}
""", *params)
current = float(row["current_count"] or 0)
prior = float(row["prior_count"] or 0)
delta, delta_pct, trend = _compute_delta(current, prior)
return {"value": max(current, 0), "unit": "count", "delta": delta,
"delta_percentage": delta_pct, "trend": trend,
"secondary_values": {"total_received_qty": float(row["total_qty"] or 0)}}
# ---------------------------------------------------------------------------
# wid_stock_ins_today_001 (MongoDB Self-GRN — stub returning 0 until cross-service)
# ---------------------------------------------------------------------------
async def query_stock_ins_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
# Self-GRN lives in MongoDB managed by SCM-ms; analytics-ms returns a stub
# until a shared collection or internal API is available.
logger.warning("stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub",
extra={"event": "kpi_stub", "widget_id": "wid_stock_ins_today_001",
"merchant_id": merchant_id})
return {"value": 0.0, "unit": "count", "delta": 0.0, "delta_percentage": None,
"trend": "neutral", "secondary_values": {"stub": True}}
# ---------------------------------------------------------------------------
# wid_low_stock_skus_001
# ---------------------------------------------------------------------------
async def query_low_stock_skus(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND warehouse_id = $2" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count,
COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count
FROM trans.scm_stock
WHERE merchant_id = $1
{branch_clause}
""", *params)
stockout = int(row["stockout_count"] or 0)
low = int(row["low_stock_count"] or 0)
logger.warning("low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback",
extra={"event": "kpi_fallback", "merchant_id": merchant_id})
return {"value": float(low), "unit": "count",
"secondary_values": {"stockout_count": stockout, "low_stock_count": low}}
# ---------------------------------------------------------------------------
# wid_net_stock_value_001
# ---------------------------------------------------------------------------
async def query_net_stock_value(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND warehouse_id = $2" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value
FROM trans.scm_stock
WHERE merchant_id = $1
AND cost_price IS NOT NULL
{branch_clause}
""", *params)
value = float(row["net_value"] or 0)
if value < 0:
logger.warning("net_stock_value: negative aggregate clamped to 0",
extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id})
value = 0.0
return {"value": value, "unit": "INR", "delta": None, "delta_percentage": None, "trend": "neutral"}
# ---------------------------------------------------------------------------
# wid_adjustments_mtd_001
# ---------------------------------------------------------------------------
async def query_adjustments_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND m.warehouse_id = $2" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(DISTINCT m.adjustment_master_id) AS adj_count,
COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value,
COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value
FROM trans.scm_stock_adjustment_master m
JOIN trans.scm_stock_adjustment_details d
ON d.adjustment_master_id = m.adjustment_master_id
WHERE m.merchant_id = $1
AND m.status IN ('approved','applied')
AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
{branch_clause}
""", *params)
pos = float(row["pos_value"] or 0)
neg = float(row["neg_value"] or 0)
return {"value": float(row["adj_count"] or 0), "unit": "count",
"secondary_values": {"positive_value": pos, "negative_value": neg,
"net_impact": pos - neg}}
# ---------------------------------------------------------------------------
# wid_stock_take_pending_001
# ---------------------------------------------------------------------------
async def query_stock_take_pending(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
branch_clause = "AND warehouse_id = $2" if branch_id else ""
params = [merchant_id] + ([branch_id] if branch_id else [])
row = await conn.fetchrow(f"""
SELECT
COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count,
COUNT(*) FILTER (WHERE status IN ('draft','in_progress')
AND stock_take_date < CURRENT_TIMESTAMP) AS overdue_count
FROM trans.scm_stock_take_master
WHERE merchant_id = $1
{branch_clause}
""", *params)
pending = int(row["pending_count"] or 0)
overdue = int(row["overdue_count"] or 0)
return {"value": float(pending), "unit": "count",
"secondary_values": {"pending_count": pending, "overdue_count": overdue}}
# ---------------------------------------------------------------------------
# wid_shipments_transit_001
# ---------------------------------------------------------------------------
async def query_shipments_transit(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
row = await conn.fetchrow("""
SELECT
COUNT(*) AS in_transit,
COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed
FROM trans.scm_trade_shipment ts
JOIN trans.scm_po po ON ts.order_id = po.po_id
WHERE ts.supplier_id = $1
AND ts.status = 'shipped'
""", merchant_id)
in_transit = int(row["in_transit"] or 0)
delayed = int(row["delayed"] or 0)
delayed_pct = round((delayed / in_transit) * 100, 2) if in_transit > 0 else 0.0
return {"value": float(in_transit), "unit": "count",
"secondary_values": {"in_transit_count": in_transit, "delayed_count": delayed,
"delayed_percentage": delayed_pct}}
# ---------------------------------------------------------------------------
# wid_invoices_mtd_001
# ---------------------------------------------------------------------------
async def query_invoices_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
row = await conn.fetchrow("""
SELECT
COUNT(*) AS inv_count,
COALESCE(SUM(grand_total_amt), 0) AS total_amt,
COUNT(*) FILTER (
WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
) AS prior_count,
COALESCE(SUM(grand_total_amt) FILTER (
WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
AND created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
), 0) AS prior_amt
FROM trans.scm_invoice
WHERE (buyer_id = $1 OR supplier_id = $1)
AND status NOT IN ('cancelled','draft')
AND created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
""", merchant_id)
current = float(row["inv_count"] or 0)
total = float(row["total_amt"] or 0)
if total < 0:
logger.warning("invoices_mtd: negative total clamped",
extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id})
total = 0.0
prior = float(row["prior_count"] or 0)
delta, delta_pct, trend = _compute_delta(current, prior)
return {"value": max(current, 0), "unit": "count", "delta": delta,
"delta_percentage": delta_pct, "trend": trend,
"secondary_values": {"total_invoiced_amount": total}}
# ---------------------------------------------------------------------------
# wid_credit_debit_notes_mtd_001 (Phase 1 stub)
# ---------------------------------------------------------------------------
async def query_credit_debit_notes_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
logger.warning("credit_debit_notes_mtd: no dedicated table yet; returning stub",
extra={"event": "kpi_stub", "widget_id": "wid_credit_debit_notes_mtd_001",
"merchant_id": merchant_id})
return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None,
"trend": "neutral",
"secondary_values": {"credit_note_total": 0.0, "debit_note_total": 0.0,
"net_impact": 0.0, "stub": True}}
# ---------------------------------------------------------------------------
# wid_pos_sales_today_001 (Phase 2 stub)
# ---------------------------------------------------------------------------
async def query_pos_sales_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
logger.warning("pos_sales_today: Phase 2 stub",
extra={"event": "kpi_stub", "widget_id": "wid_pos_sales_today_001",
"merchant_id": merchant_id})
return {"value": 0.0, "unit": "INR", "delta": 0.0, "delta_percentage": None,
"trend": "neutral",
"secondary_values": {"transaction_count": 0, "average_ticket": 0.0, "stub": True}}
# ---------------------------------------------------------------------------
# Dispatch table
# ---------------------------------------------------------------------------
QUERY_DISPATCH = {
"wid_open_po_count_001": query_open_po_count,
"wid_po_aging_001": query_po_aging,
"wid_receipts_this_week_001": query_receipts_this_week,
"wid_stock_ins_today_001": query_stock_ins_today,
"wid_low_stock_skus_001": query_low_stock_skus,
"wid_net_stock_value_001": query_net_stock_value,
"wid_adjustments_mtd_001": query_adjustments_mtd,
"wid_stock_take_pending_001": query_stock_take_pending,
"wid_shipments_transit_001": query_shipments_transit,
"wid_invoices_mtd_001": query_invoices_mtd,
"wid_credit_debit_notes_mtd_001": query_credit_debit_notes_mtd,
"wid_pos_sales_today_001": query_pos_sales_today,
}