Spaces:
Configuration error
Configuration error
| """ | |
| Raw asyncpg SQL queries for each KPI widget. | |
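| Every query binds merchant_id as $1; most filter on the merchant_id column as the first predicate, while the shipment and invoice queries match it against supplier_id / buyer_id. | |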
| Returns a dict matching KPICacheDocument fields. | |
| """ | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional, Dict, Any | |
| from app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
def _utc_today_start() -> datetime:
    """Return midnight (00:00:00.000000) of the current UTC day, timezone-aware."""
    return datetime.now(timezone.utc).replace(
        hour=0,
        minute=0,
        second=0,
        microsecond=0,
    )
def _compute_delta(current: float, prior: float):
    """Compare a current-period value against a prior-period value.

    Returns a ``(delta, delta_pct, trend)`` tuple where:

    * ``delta`` is ``current - prior``;
    * ``delta_pct`` is ``delta`` as a percentage of ``prior`` rounded to two
      decimals, or ``None`` when ``prior`` is zero (no baseline to divide by);
    * ``trend`` is ``"up"``, ``"down"`` or ``"neutral"`` by the sign of ``delta``.
    """
    delta = current - prior

    if prior != 0:
        delta_pct = round((delta / prior) * 100, 2)
    else:
        delta_pct = None

    if delta > 0:
        trend = "up"
    elif delta < 0:
        trend = "down"
    else:
        trend = "neutral"

    return delta, delta_pct, trend
| # --------------------------------------------------------------------------- | |
| # wid_open_po_count_001 | |
| # --------------------------------------------------------------------------- | |
async def query_open_po_count(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count open purchase orders dated in the last 7 days vs. the 7 days before.

    "Open" means ``status`` is one of submitted / approved / dispatched /
    partially_received. When ``branch_id`` is truthy the rows are further
    restricted to POs whose ``buyer_id`` or ``warehouse_id`` equals it.

    Args:
        conn: Connection object exposing an awaitable ``fetchrow(sql, *args)``.
        merchant_id: Bound as ``$1`` and matched against ``merchant_id``.
        branch_id: Optional branch filter, bound as ``$2`` when provided.

    Returns:
        Dict with ``value`` (current-window count), ``unit``, ``delta``,
        ``delta_percentage`` and ``trend``.
    """
    branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)" if branch_id else ""
    params = [merchant_id] + ([branch_id] if branch_id else [])
    # The prior window is bounded to days 8-14 so both counts cover periods of
    # equal length; an unbounded "older than 7 days" bucket would grow with
    # history and make the delta / percentage meaningless.
    # NOTE(review): ``value`` is the count of open POs dated in the last 7 days,
    # not the total number of open POs — confirm that is what the widget expects.
    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
            COUNT(*) FILTER (WHERE po_date >= CURRENT_TIMESTAMP - INTERVAL '14 days'
                               AND po_date < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
        FROM trans.scm_po
        WHERE merchant_id = $1
          AND status IN ('submitted','approved','dispatched','partially_received')
          {branch_clause}
    """, *params)
    current = float(row["current_count"] or 0)
    prior = float(row["prior_count"] or 0)
    delta, delta_pct, trend = _compute_delta(current, prior)
    return {
        "value": max(current, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
    }
| # --------------------------------------------------------------------------- | |
| # wid_po_aging_001 | |
| # --------------------------------------------------------------------------- | |
async def query_po_aging(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Bucket open purchase orders by age in days and sum ``net_amt`` per bucket.

    Age is ``CURRENT_DATE - po_date::date``; the buckets are 0-7, 8-14, 15-30
    and more than 30 days. When ``branch_id`` is truthy, only POs whose
    ``buyer_id`` or ``warehouse_id`` equals it are considered.

    Returns:
        Dict with ``value`` (sum of all bucket counts, as float), ``unit`` and
        ``secondary_values`` mapping ``bucket_<range>`` to its ``count`` and
        ``value``.
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND (buyer_id = $2 OR warehouse_id = $2)"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7) AS b0_7,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14) AS b8_14,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30) AS b15_30,
            COUNT(*) FILTER (WHERE CURRENT_DATE - po_date::date > 30) AS b30_plus,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 0 AND 7), 0) AS v0_7,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 8 AND 14), 0) AS v8_14,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date BETWEEN 15 AND 30), 0) AS v15_30,
            COALESCE(SUM(net_amt) FILTER (WHERE CURRENT_DATE - po_date::date > 30), 0) AS v30_plus
        FROM trans.scm_po
        WHERE merchant_id = $1
          AND status IN ('submitted','approved','dispatched','partially_received')
          {branch_clause}
    """, *query_args)

    # Column aliases are "b<suffix>" (count) and "v<suffix>" (amount); the
    # output keys reuse the same suffixes in the same order.
    secondary = {}
    total = 0
    for suffix in ("0_7", "8_14", "15_30", "30_plus"):
        bucket_count = int(row[f"b{suffix}"] or 0)
        total += bucket_count
        secondary[f"bucket_{suffix}"] = {
            "count": bucket_count,
            "value": float(row[f"v{suffix}"] or 0),
        }

    return {"value": float(total), "unit": "count", "secondary_values": secondary}
| # --------------------------------------------------------------------------- | |
| # wid_receipts_this_week_001 | |
| # --------------------------------------------------------------------------- | |
async def query_receipts_this_week(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count goods receipts from the last 7 days and compare with the 7 days before.

    Reads ``trans.scm_grn`` filtered by ``merchant_id`` and, when ``branch_id``
    is truthy, by ``wh_location``. Also sums ``total_qty`` over the current
    7-day window.

    Returns:
        Dict with ``value`` (current-window count), ``unit``, ``delta``,
        ``delta_percentage``, ``trend`` and ``secondary_values`` holding
        ``total_received_qty``.
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND wh_location = $2"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS current_count,
            COALESCE(SUM(total_qty) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '7 days'), 0) AS total_qty,
            COUNT(*) FILTER (WHERE recv_dt >= CURRENT_TIMESTAMP - INTERVAL '14 days'
                               AND recv_dt < CURRENT_TIMESTAMP - INTERVAL '7 days') AS prior_count
        FROM trans.scm_grn
        WHERE merchant_id = $1
          {branch_clause}
    """, *query_args)

    this_week = float(row["current_count"] or 0)
    last_week = float(row["prior_count"] or 0)
    received_qty = float(row["total_qty"] or 0)
    delta, delta_pct, trend = _compute_delta(this_week, last_week)

    return {
        "value": max(this_week, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
        "secondary_values": {"total_received_qty": received_qty},
    }
| # --------------------------------------------------------------------------- | |
| # wid_stock_ins_today_001 (MongoDB Self-GRN — stub returning 0 until cross-service) | |
| # --------------------------------------------------------------------------- | |
async def query_stock_ins_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued placeholder for the stock-ins-today widget.

    No query is issued: per the original note, Self-GRN data lives in MongoDB
    managed by SCM-ms and is not reachable from analytics-ms until a shared
    collection or internal API exists. A warning is logged on every call and
    the payload is flagged with ``secondary_values["stub"] = True``.
    ``conn`` and ``branch_id`` are accepted but unused.
    """
    log_context = {
        "event": "kpi_stub",
        "widget_id": "wid_stock_ins_today_001",
        "merchant_id": merchant_id,
    }
    logger.warning(
        "stock_ins_today: MongoDB Self-GRN not accessible from analytics-ms; returning stub",
        extra=log_context,
    )
    return {
        "value": 0.0,
        "unit": "count",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {"stub": True},
    }
| # --------------------------------------------------------------------------- | |
| # wid_low_stock_skus_001 | |
| # --------------------------------------------------------------------------- | |
async def query_low_stock_skus(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count distinct SKUs that are out of stock / at or below zero availability.

    Reads ``trans.scm_stock`` filtered by ``merchant_id`` and, when
    ``branch_id`` is truthy, by ``warehouse_id``. ``stockout_count`` counts SKUs
    with ``qty_available = 0``; ``low_stock_count`` counts SKUs with
    ``qty_available <= 0``. A warning is logged on every call because this is a
    fallback for a missing reorder point on ``scm_stock``.

    Returns:
        Dict with ``value`` (low-stock count as float), ``unit`` and
        ``secondary_values`` holding both integer counts.
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND warehouse_id = $2"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT
            COUNT(DISTINCT sku) FILTER (WHERE qty_available = 0) AS stockout_count,
            COUNT(DISTINCT sku) FILTER (WHERE qty_available <= 0) AS low_stock_count
        FROM trans.scm_stock
        WHERE merchant_id = $1
          {branch_clause}
    """, *query_args)

    stockout_count = int(row["stockout_count"] or 0)
    low_stock_count = int(row["low_stock_count"] or 0)

    logger.warning(
        "low_stock_skus: reorder_point not on scm_stock; using qty_available=0 fallback",
        extra={"event": "kpi_fallback", "merchant_id": merchant_id},
    )

    return {
        "value": float(low_stock_count),
        "unit": "count",
        "secondary_values": {
            "stockout_count": stockout_count,
            "low_stock_count": low_stock_count,
        },
    }
| # --------------------------------------------------------------------------- | |
| # wid_net_stock_value_001 | |
| # --------------------------------------------------------------------------- | |
async def query_net_stock_value(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Sum ``qty_on_hand * cost_price`` over a merchant's stock rows.

    Rows with a NULL ``cost_price`` are excluded. When ``branch_id`` is truthy
    the sum is restricted to that ``warehouse_id``. A negative aggregate is
    logged and clamped to ``0.0``.

    Returns:
        Dict with ``value`` (non-negative float), ``unit`` ``"INR"``, and
        ``delta`` / ``delta_percentage`` set to ``None`` with a ``"neutral"``
        trend (no prior period is computed).
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND warehouse_id = $2"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT COALESCE(SUM(qty_on_hand * cost_price), 0) AS net_value
        FROM trans.scm_stock
        WHERE merchant_id = $1
          AND cost_price IS NOT NULL
          {branch_clause}
    """, *query_args)

    net_value = float(row["net_value"] or 0)
    if net_value < 0:
        logger.warning(
            "net_stock_value: negative aggregate clamped to 0",
            extra={"event": "kpi_clamp", "raw": net_value, "merchant_id": merchant_id},
        )
        net_value = 0.0

    return {
        "value": net_value,
        "unit": "INR",
        "delta": None,
        "delta_percentage": None,
        "trend": "neutral",
    }
| # --------------------------------------------------------------------------- | |
| # wid_adjustments_mtd_001 | |
| # --------------------------------------------------------------------------- | |
async def query_adjustments_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Summarise approved/applied stock adjustments since the start of the month.

    Joins the adjustment master table to its detail rows, counts distinct
    master ids, and sums ``adjustment_value`` separately for direction ``'IN'``
    and ``'OUT'``. When ``branch_id`` is truthy the master rows are restricted
    to that ``warehouse_id``.

    Returns:
        Dict with ``value`` (adjustment count as float), ``unit`` and
        ``secondary_values`` holding ``positive_value``, ``negative_value`` and
        ``net_impact`` (positive minus negative).
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND m.warehouse_id = $2"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT
            COUNT(DISTINCT m.adjustment_master_id) AS adj_count,
            COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'IN'), 0) AS pos_value,
            COALESCE(SUM(d.adjustment_value) FILTER (WHERE d.direction = 'OUT'), 0) AS neg_value
        FROM trans.scm_stock_adjustment_master m
        JOIN trans.scm_stock_adjustment_details d
          ON d.adjustment_master_id = m.adjustment_master_id
        WHERE m.merchant_id = $1
          AND m.status IN ('approved','applied')
          AND m.adjustment_date >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
          {branch_clause}
    """, *query_args)

    inbound_value = float(row["pos_value"] or 0)
    outbound_value = float(row["neg_value"] or 0)
    adjustment_count = float(row["adj_count"] or 0)

    return {
        "value": adjustment_count,
        "unit": "count",
        "secondary_values": {
            "positive_value": inbound_value,
            "negative_value": outbound_value,
            "net_impact": inbound_value - outbound_value,
        },
    }
| # --------------------------------------------------------------------------- | |
| # wid_stock_take_pending_001 | |
| # --------------------------------------------------------------------------- | |
async def query_stock_take_pending(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count stock takes still in ``draft`` / ``in_progress`` and how many are overdue.

    A pending stock take is counted as overdue when its ``stock_take_date`` is
    earlier than ``CURRENT_TIMESTAMP``. When ``branch_id`` is truthy the rows
    are restricted to that ``warehouse_id``.

    Returns:
        Dict with ``value`` (pending count as float), ``unit`` and
        ``secondary_values`` holding integer ``pending_count`` and
        ``overdue_count``.
    """
    query_args = [merchant_id]
    branch_clause = ""
    if branch_id:
        branch_clause = "AND warehouse_id = $2"
        query_args.append(branch_id)

    row = await conn.fetchrow(f"""
        SELECT
            COUNT(*) FILTER (WHERE status IN ('draft','in_progress')) AS pending_count,
            COUNT(*) FILTER (WHERE status IN ('draft','in_progress')
                               AND stock_take_date < CURRENT_TIMESTAMP) AS overdue_count
        FROM trans.scm_stock_take_master
        WHERE merchant_id = $1
          {branch_clause}
    """, *query_args)

    counts = {
        "pending_count": int(row["pending_count"] or 0),
        "overdue_count": int(row["overdue_count"] or 0),
    }
    return {
        "value": float(counts["pending_count"]),
        "unit": "count",
        "secondary_values": counts,
    }
| # --------------------------------------------------------------------------- | |
| # wid_shipments_transit_001 | |
| # --------------------------------------------------------------------------- | |
async def query_shipments_transit(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count shipments with status ``'shipped'`` and how many are past due.

    Joins ``trans.scm_trade_shipment`` to ``trans.scm_po`` on
    ``ts.order_id = po.po_id``. A shipment counts as delayed when the PO's
    ``exp_delivery_dt`` is before ``CURRENT_DATE``. ``merchant_id`` is matched
    against ``ts.supplier_id``; ``branch_id`` is accepted but not used.

    Returns:
        Dict with ``value`` (in-transit count as float), ``unit`` and
        ``secondary_values`` holding ``in_transit_count``, ``delayed_count``
        and ``delayed_percentage`` (0.0 when nothing is in transit).
    """
    row = await conn.fetchrow("""
        SELECT
            COUNT(*) AS in_transit,
            COUNT(*) FILTER (WHERE po.exp_delivery_dt < CURRENT_DATE) AS delayed
        FROM trans.scm_trade_shipment ts
        JOIN trans.scm_po po ON ts.order_id = po.po_id
        WHERE ts.supplier_id = $1
          AND ts.status = 'shipped'
    """, merchant_id)

    shipped_count = int(row["in_transit"] or 0)
    late_count = int(row["delayed"] or 0)

    # Guard the division: with no shipments in transit the share is reported as 0.
    if shipped_count > 0:
        late_share = round((late_count / shipped_count) * 100, 2)
    else:
        late_share = 0.0

    return {
        "value": float(shipped_count),
        "unit": "count",
        "secondary_values": {
            "in_transit_count": shipped_count,
            "delayed_count": late_count,
            "delayed_percentage": late_share,
        },
    }
| # --------------------------------------------------------------------------- | |
| # wid_invoices_mtd_001 | |
| # --------------------------------------------------------------------------- | |
async def query_invoices_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Count month-to-date invoices and compare with the previous calendar month.

    Invoices are matched when ``merchant_id`` equals either ``buyer_id`` or
    ``supplier_id``; ``cancelled`` and ``draft`` invoices are excluded.
    ``branch_id`` is accepted but not used.

    Args:
        conn: Connection object exposing an awaitable ``fetchrow(sql, *args)``.
        merchant_id: Bound as ``$1``.
        branch_id: Unused.

    Returns:
        Dict with ``value`` (current-month invoice count), ``unit``, ``delta``,
        ``delta_percentage``, ``trend`` and ``secondary_values`` holding
        ``total_invoiced_amount`` (current month, clamped to be non-negative).
    """
    # The WHERE clause must reach back to the start of the previous month:
    # FILTER only sees rows that survive WHERE, so restricting WHERE to the
    # current month would make the prior-month count always 0. Each aggregate
    # then selects its own month through FILTER.
    row = await conn.fetchrow("""
        SELECT
            COUNT(*) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ) AS inv_count,
            COALESCE(SUM(grand_total_amt) FILTER (
                WHERE created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ), 0) AS total_amt,
            COUNT(*) FILTER (
                WHERE created_at < DATE_TRUNC('month', CURRENT_TIMESTAMP)
            ) AS prior_count
        FROM trans.scm_invoice
        WHERE (buyer_id = $1 OR supplier_id = $1)
          AND status NOT IN ('cancelled','draft')
          AND created_at >= DATE_TRUNC('month', CURRENT_TIMESTAMP - INTERVAL '1 month')
    """, merchant_id)
    current = float(row["inv_count"] or 0)
    total = float(row["total_amt"] or 0)
    if total < 0:
        logger.warning("invoices_mtd: negative total clamped",
                       extra={"event": "kpi_clamp", "raw": total, "merchant_id": merchant_id})
        total = 0.0
    prior = float(row["prior_count"] or 0)
    delta, delta_pct, trend = _compute_delta(current, prior)
    return {
        "value": max(current, 0),
        "unit": "count",
        "delta": delta,
        "delta_percentage": delta_pct,
        "trend": trend,
        "secondary_values": {"total_invoiced_amount": total},
    }
| # --------------------------------------------------------------------------- | |
| # wid_credit_debit_notes_mtd_001 (Phase 1 stub) | |
| # --------------------------------------------------------------------------- | |
async def query_credit_debit_notes_mtd(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued placeholder for the credit/debit notes MTD widget.

    No query is issued (the log message states there is no dedicated table
    yet). A warning is logged on every call and the payload is flagged with
    ``secondary_values["stub"] = True``. ``conn`` and ``branch_id`` are
    accepted but unused.
    """
    log_context = {
        "event": "kpi_stub",
        "widget_id": "wid_credit_debit_notes_mtd_001",
        "merchant_id": merchant_id,
    }
    logger.warning(
        "credit_debit_notes_mtd: no dedicated table yet; returning stub",
        extra=log_context,
    )
    return {
        "value": 0.0,
        "unit": "INR",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {
            "credit_note_total": 0.0,
            "debit_note_total": 0.0,
            "net_impact": 0.0,
            "stub": True,
        },
    }
| # --------------------------------------------------------------------------- | |
| # wid_pos_sales_today_001 (Phase 2 stub) | |
| # --------------------------------------------------------------------------- | |
async def query_pos_sales_today(conn, merchant_id: str, branch_id: Optional[str]) -> Dict[str, Any]:
    """Return a zero-valued placeholder for the POS-sales-today widget.

    No query is issued; a warning is logged on every call and the payload is
    flagged with ``secondary_values["stub"] = True``. ``conn`` and
    ``branch_id`` are accepted but unused.
    """
    log_context = {
        "event": "kpi_stub",
        "widget_id": "wid_pos_sales_today_001",
        "merchant_id": merchant_id,
    }
    logger.warning("pos_sales_today: Phase 2 stub", extra=log_context)
    return {
        "value": 0.0,
        "unit": "INR",
        "delta": 0.0,
        "delta_percentage": None,
        "trend": "neutral",
        "secondary_values": {
            "transaction_count": 0,
            "average_ticket": 0.0,
            "stub": True,
        },
    }
| # --------------------------------------------------------------------------- | |
| # Dispatch table | |
| # --------------------------------------------------------------------------- | |
# Maps each widget id to the coroutine function that computes its KPI payload.
# Every entry is called as ``fn(conn, merchant_id, branch_id)``.
QUERY_DISPATCH = {
    # Purchase orders and receipts
    "wid_open_po_count_001": query_open_po_count,
    "wid_po_aging_001": query_po_aging,
    "wid_receipts_this_week_001": query_receipts_this_week,
    "wid_stock_ins_today_001": query_stock_ins_today,
    # Stock
    "wid_low_stock_skus_001": query_low_stock_skus,
    "wid_net_stock_value_001": query_net_stock_value,
    "wid_adjustments_mtd_001": query_adjustments_mtd,
    "wid_stock_take_pending_001": query_stock_take_pending,
    # Shipments, invoicing and sales
    "wid_shipments_transit_001": query_shipments_transit,
    "wid_invoices_mtd_001": query_invoices_mtd,
    "wid_credit_debit_notes_mtd_001": query_credit_debit_notes_mtd,
    "wid_pos_sales_today_001": query_pos_sales_today,
}