"""
Raw asyncpg SQL queries for each KPI widget.

Every query is scoped to the calling merchant via its first predicate. Most
tables are filtered on ``merchant_id``; the shipment query filters on
``supplier_id`` and the invoice query on ``buyer_id``/``supplier_id``.

Each ``query_*`` coroutine takes ``(conn, merchant_id, branch_id)`` and returns
a dict matching KPICacheDocument fields.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List, Tuple

from app.core.logging import get_logger

logger = get_logger(__name__)


def _utc_today_start() -> datetime:
    """Return today's midnight (00:00:00.000000) as a timezone-aware UTC datetime."""
    now = datetime.now(timezone.utc)
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


def _compute_delta(current: float, prior: float) -> Tuple[float, Optional[float], str]:
    """Compare a current value with a prior value.

    Returns:
        ``(delta, delta_pct, trend)`` where ``delta`` is ``current - prior``,
        ``delta_pct`` is the percentage change rounded to 2 decimals (``None``
        when ``prior`` is 0, because the ratio is undefined), and ``trend`` is
        ``"up"``, ``"down"`` or ``"neutral"`` according to the sign of ``delta``.
    """
    delta = current - prior
    delta_pct = round((delta / prior) * 100, 2) if prior != 0 else None
    if delta > 0:
        trend = "up"
    elif delta < 0:
        trend = "down"
    else:
        trend = "neutral"
    return delta, delta_pct, trend


def _branch_filter(merchant_id: str, branch_id: Optional[str], clause: str) -> Tuple[str, List[str]]:
    """Build the optional branch predicate and the positional query parameters.

    ``clause`` is a fixed SQL fragment referencing ``$2``. It is only returned
    (and ``branch_id`` only bound as ``$2``) when ``branch_id`` is truthy;
    otherwise the fragment is empty and only ``merchant_id`` is bound as ``$1``.
    """
    if branch_id:
        return clause, [merchant_id, branch_id]
    return "", [merchant_id]


# ---------------------------------------------------------------------------
# wid_open_po_count_001
# ---------------------------------------------------------------------------
async def query_open_po_count(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count open purchase orders dated within the last 7 days.

    The delta compares that count against open POs dated earlier than 7 days
    ago.
    """
    # NOTE(review): the prior window is unbounded (everything older than 7
    # days), unlike query_receipts_this_week which uses a 14..7 day window.
    # Confirm whether that asymmetry is intended.
    branch_clause, params = _branch_filter(
        merchant_id, branch_id, "AND (buyer_id = $2 OR warehouse_id = $2)"
    )
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
            COUNT(*) FILTER (WHERE po_date <  CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
        FROM trans.scm_po
        WHERE merchant_id = $1
          AND status IN ('submitted','approved','dispatched','partially_received')
          {branch_clause}
    """, *params)
    current = float(row["current_count"] or 0)
    prior = float(row["prior_count"] or 0)
    delta, delta_pct, trend = _compute_delta(current, prior)
    return {
        "value": max(current, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
    }


# ---------------------------------------------------------------------------
# wid_po_aging_001
# ---------------------------------------------------------------------------
async def query_po_aging(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Bucket open purchase orders by age in days (0-7, 8-14, 15-30, >30).

    ``value`` is the total number of POs across all buckets; each bucket's
    count and summed ``net_amt`` are reported under ``secondary_values``.
    """
    branch_clause, params = _branch_filter(
        merchant_id, branch_id, "AND (buyer_id = $2 OR warehouse_id = $2)"
    )
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7)   AS b0_7,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14)  AS b8_14,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30)              AS b30_plus,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0)   AS v0_7,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0)  AS v8_14,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0)              AS v30_plus
        FROM trans.scm_po
        WHERE merchant_id = $1
          AND status IN ('submitted','approved','dispatched','partially_received')
          {branch_clause}
    """, *params)
    b = {k: int(row[k] or 0) for k in ("b0_7", "b8_14", "b15_30", "b30_plus")}
    total = sum(b.values())
    secondary = {
        "bucket_0_7": {"count": b["b0_7"], "value": float(row["v0_7"] or 0)},
        "bucket_8_14": {"count": b["b8_14"], "value": float(row["v8_14"] or 0)},
        "bucket_15_30": {"count": b["b15_30"], "value": float(row["v15_30"] or 0)},
        "bucket_30_plus": {"count": b["b30_plus"], "value": float(row["v30_plus"] or 0)},
    }
    return {"value": float(total), "unit": "count", "secondary_values": secondary}


# ---------------------------------------------------------------------------
# wid_receipts_this_week_001
# ---------------------------------------------------------------------------
async def query_receipts_this_week(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count goods receipts (GRNs) in the last 7 days versus the 7 days before.

    Also reports the total received quantity for the current 7-day window.
    """
    branch_clause, params = _branch_filter(merchant_id, branch_id, "AND wh_location = $2")
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
            COALESCE(SUM(total_qty) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty,
            COUNT(*) FILTER (
                WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days'
                  AND recv_dt <  CURRENT_TIMESTAMP - INTERVAL '7 days'
            ) AS prior_count
        FROM trans.scm_grn
        WHERE merchant_id = $1
          {branch_clause}
    """, *params)
    current = float(row["current_count"] or 0)
    prior = float(row["prior_count"] or 0)
    delta, delta_pct, trend = _compute_delta(current, prior)
    return {
        "value": max(current, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
        "secondary_values": {"total_received_qty": float(row["total_qty"] or 0)},
    }


# ---------------------------------------------------------------------------
# wid_stock_ins_today_001  (MongoDB Self-GRN - stub returning 0 until cross-service)
# ---------------------------------------------------------------------------
async def query_stock_ins_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued stub payload; no database query is executed."""
    # Self-GRN lives in MongoDB managed by SCM-ms; analytics-ms returns a stub
    # until a shared collection or internal API is available.
    logger.warning(
        "stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub",
        extra={"event": "kpi_stub", "widget_id": "wid_stock_ins_today_001", "merchant_id": merchant_id},
    )
    return {
        "value": 0.0,
        "unit": "count",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {"stub": True},
    }


# ---------------------------------------------------------------------------
# wid_low_stock_skus_001
# ---------------------------------------------------------------------------
async def query_low_stock_skus(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count distinct SKUs with no available quantity.

    ``stockout_count`` uses ``qty_available = 0`` and ``low_stock_count`` uses
    ``qty_available <= 0``; a fallback warning is logged on every call.
    """
    branch_clause, params = _branch_filter(merchant_id, branch_id, "AND warehouse_id = $2")
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0)  AS stockout_count,
            COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count
        FROM trans.scm_stock
        WHERE merchant_id = $1
          {branch_clause}
    """, *params)
    stockout = int(row["stockout_count"] or 0)
    low = int(row["low_stock_count"] or 0)
    logger.warning(
        "low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback",
        extra={"event": "kpi_fallback", "merchant_id": merchant_id},
    )
    return {
        "value": float(low),
        "unit": "count",
        "secondary_values": {"stockout_count": stockout, "low_stock_count": low},
    }


# ---------------------------------------------------------------------------
# wid_net_stock_value_001
# ---------------------------------------------------------------------------
async def query_net_stock_value(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Sum ``qty_on_hand * cost_price`` over stock rows that have a cost price.

    A negative aggregate is logged and clamped to 0 before being returned.
    """
    branch_clause, params = _branch_filter(merchant_id, branch_id, "AND warehouse_id = $2")
    row = await conn.fetchrow(f"""
        SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value
        FROM trans.scm_stock
        WHERE merchant_id = $1
          AND cost_price IS NOT NULL
          {branch_clause}
    """, *params)
    value = float(row["net_value"] or 0)
    if value < 0:
        logger.warning(
            "net_stock_value: negative aggregate clamped to 0",
            extra={"event": "kpi_clamp", "raw": value, "merchant_id": merchant_id},
        )
        value = 0.0
    return {"value": value, "unit": "INR", "delta": None, "delta_percentage": None, "trend": "neutral"}


# ---------------------------------------------------------------------------
# wid_adjustments_mtd_001
# ---------------------------------------------------------------------------
async def query_adjustments_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count approved/applied stock adjustments dated in the current month.

    ``secondary_values`` carries the summed 'IN' and 'OUT' adjustment values
    and their difference (``net_impact = positive - negative``).
    """
    branch_clause, params = _branch_filter(merchant_id, branch_id, "AND m.warehouse_id = $2")
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(DISTINCT m.adjustment_master_id) AS adj_count,
            COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0)  AS pos_value,
            COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value
        FROM trans.scm_stock_adjustment_master m
        JOIN trans.scm_stock_adjustment_details d
          ON d.adjustment_master_id = m.adjustment_master_id
        WHERE m.merchant_id = $1
          AND m.status IN ('approved','applied')
          AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
          {branch_clause}
    """, *params)
    pos = float(row["pos_value"] or 0)
    neg = float(row["neg_value"] or 0)
    return {
        "value": float(row["adj_count"] or 0),
        "unit": "count",
        "secondary_values": {"positive_value": pos, "negative_value": neg, "net_impact": pos - neg},
    }


# ---------------------------------------------------------------------------
# wid_stock_take_pending_001
# ---------------------------------------------------------------------------
async def query_stock_take_pending(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count stock takes in 'draft' or 'in_progress' status.

    ``overdue_count`` is the subset whose ``stock_take_date`` is already in
    the past.
    """
    branch_clause, params = _branch_filter(merchant_id, branch_id, "AND warehouse_id = $2")
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count,
            COUNT(*) FILTER (
                WHERE status IN ('draft','in_progress')
                  AND stock_take_date < CURRENT_TIMESTAMP
            ) AS overdue_count
        FROM trans.scm_stock_take_master
        WHERE merchant_id = $1
          {branch_clause}
    """, *params)
    pending = int(row["pending_count"] or 0)
    overdue = int(row["overdue_count"] or 0)
    return {
        "value": float(pending),
        "unit": "count",
        "secondary_values": {"pending_count": pending, "overdue_count": overdue},
    }


# ---------------------------------------------------------------------------
# wid_shipments_transit_001
# ---------------------------------------------------------------------------
async def query_shipments_transit(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count shipments in 'shipped' status and how many are past the PO's expected delivery date.

    ``branch_id`` is accepted for signature parity with the other queries but
    is not used by this query.
    """
    # NOTE(review): rows are scoped by ts.supplier_id = merchant_id, not by a
    # merchant_id column - confirm this matches the shipment table's schema.
    row = await conn.fetchrow("""
        SELECT
            COUNT(*) AS in_transit,
            COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed
        FROM trans.scm_trade_shipment ts
        JOIN trans.scm_po po ON ts.order_id = po.po_id
        WHERE ts.supplier_id = $1
          AND ts.status = 'shipped'
    """, merchant_id)
    in_transit = int(row["in_transit"] or 0)
    delayed = int(row["delayed"] or 0)
    # Guard the ratio: with nothing in transit the percentage is reported as 0.
    delayed_pct = round((delayed / in_transit) * 100, 2) if in_transit > 0 else 0.0
    return {
        "value": float(in_transit),
        "unit": "count",
        "secondary_values": {
            "in_transit_count": in_transit,
            "delayed_count": delayed,
            "delayed_percentage": delayed_pct,
        },
    }


# ---------------------------------------------------------------------------
# wid_invoices_mtd_001
# ---------------------------------------------------------------------------
async def query_invoices_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count month-to-date invoices and compare against the previous calendar month.

    Invoices where the merchant is buyer or supplier are included; 'cancelled'
    and 'draft' invoices are excluded. ``branch_id`` is accepted for signature
    parity but is not used by this query.
    """
    # The WHERE window must start at the previous month: aggregate FILTER
    # clauses only see rows that survive WHERE, so restricting WHERE to the
    # current month would force prior_count/prior_amt to 0 on every call.
    # Current-month figures are therefore selected with their own FILTERs.
    row = await conn.fetchrow("""
        SELECT
            COUNT(*) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ) AS inv_count,
            COALESCE(SUM(grand_total_amt) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ), 0) AS total_amt,
            COUNT(*) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
                  AND created_at <  DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ) AS prior_count,
            COALESCE(SUM(grand_total_amt) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
                  AND created_at <  DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ), 0) AS prior_amt
        FROM trans.scm_invoice
        WHERE (buyer_id = $1 OR supplier_id = $1)
          AND status NOT IN ('cancelled','draft')
          AND created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
    """, merchant_id)
    current = float(row["inv_count"] or 0)
    total = float(row["total_amt"] or 0)
    if total < 0:
        logger.warning(
            "invoices_mtd: negative total clamped",
            extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id},
        )
        total = 0.0
    prior = float(row["prior_count"] or 0)
    delta, delta_pct, trend = _compute_delta(current, prior)
    return {
        "value": max(current, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
        "secondary_values": {"total_invoiced_amount": total},
    }


# ---------------------------------------------------------------------------
# wid_credit_debit_notes_mtd_001  (Phase 1 stub)
# ---------------------------------------------------------------------------
async def query_credit_debit_notes_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued stub payload; no database query is executed."""
    logger.warning(
        "credit_debit_notes_mtd: no dedicated table yet; returning stub",
        extra={"event": "kpi_stub", "widget_id": "wid_credit_debit_notes_mtd_001", "merchant_id": merchant_id},
    )
    return {
        "value": 0.0,
        "unit": "INR",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {
            "credit_note_total": 0.0,
            "debit_note_total": 0.0,
            "net_impact": 0.0,
            "stub": True,
        },
    }


# ---------------------------------------------------------------------------
# wid_pos_sales_today_001  (Phase 2 stub)
# ---------------------------------------------------------------------------
async def query_pos_sales_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued stub payload; no database query is executed."""
    logger.warning(
        "pos_sales_today: Phase 2 stub",
        extra={"event": "kpi_stub", "widget_id": "wid_pos_sales_today_001", "merchant_id": merchant_id},
    )
    return {
        "value": 0.0,
        "unit": "INR",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {"transaction_count": 0, "average_ticket": 0.0, "stub": True},
    }


# ---------------------------------------------------------------------------
# Dispatch table: widget id -> query coroutine
# ---------------------------------------------------------------------------
QUERY_DISPATCH = {
    "wid_open_po_count_001": query_open_po_count,
    "wid_po_aging_001": query_po_aging,
    "wid_receipts_this_week_001": query_receipts_this_week,
    "wid_stock_ins_today_001": query_stock_ins_today,
    "wid_low_stock_skus_001": query_low_stock_skus,
    "wid_net_stock_value_001": query_net_stock_value,
    "wid_adjustments_mtd_001": query_adjustments_mtd,
    "wid_stock_take_pending_001": query_stock_take_pending,
    "wid_shipments_transit_001": query_shipments_transit,
    "wid_invoices_mtd_001": query_invoices_mtd,
    "wid_credit_debit_notes_mtd_001": query_credit_debit_notes_mtd,
    "wid_pos_sales_today_001": query_pos_sales_today,
}