rairo committed on
Commit
aa088fd
·
verified ·
1 Parent(s): 9325a36

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +799 -113
main.py CHANGED
@@ -388,13 +388,48 @@ def emit_kpi_debug(profile_id: str, stage: str, payload: Dict[str, Any]) -> None
388
  logger.warning(f"Failed to emit KPI debug logs: {e}")
389
 
390
  class IrisReportEngine:
391
- def __init__(self, profile_id: str, transactions_data: List[dict], llm_instance):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
392
  self.profile_id = profile_id
393
  self.llm = llm_instance
 
394
  self.raw = pd.DataFrame(transactions_data)
 
 
 
395
  self.df = self._load_and_prepare_data(self.raw)
396
  self.currency = self._get_primary_currency()
397
 
 
 
398
  def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
399
  if df is None or df.empty:
400
  emit_kpi_debug(self.profile_id, "load", {"status": "empty_input"})
@@ -403,9 +438,10 @@ class IrisReportEngine:
403
  mapping = ColumnResolver.map(df)
404
  emit_kpi_debug(self.profile_id, "column_map", mapping)
405
 
406
- # Coerce numerics
407
- if mapping["amount"] and mapping["amount"] in df:
408
- df["_Amount"] = pd.to_numeric(df[mapping["amount"]], errors="coerce")
 
409
  else:
410
  df["_Amount"] = pd.Series(dtype=float)
411
 
@@ -419,7 +455,7 @@ class IrisReportEngine:
419
  else:
420
  df["_UnitCost"] = 0.0
421
 
422
- # Compose datetime
423
  if mapping["date"] and mapping["date"] in df:
424
  if mapping["time"] and mapping["time"] in df:
425
  dt_series = pd.to_datetime(
@@ -429,7 +465,6 @@ class IrisReportEngine:
429
  else:
430
  dt_series = pd.to_datetime(df[mapping["date"]], errors="coerce")
431
  else:
432
- # try any datetime-like column
433
  dt_series = pd.to_datetime(df.get("datetime"), errors="coerce")
434
 
435
  try:
@@ -446,61 +481,73 @@ class IrisReportEngine:
446
  # Canonical dims
447
  df["_Invoice"] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None
448
  df["_Product"] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None
449
- df["_Teller"] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None
450
  df["_TxnType"] = (df[mapping["txn_type"]].astype(str).str.lower()
451
- if mapping["txn_type"] and mapping["txn_type"] in df else "")
452
-
453
- # Sales filter
454
- if mapping["txn_type"] and mapping["txn_type"] in df:
455
- sales_mask = df["_TxnType"].isin(["sale", "sales", "invoice"])
456
- working = df[sales_mask].copy()
457
- else:
458
- # Fallback: positive amount == sale-like
459
- working = df[df["_Amount"] > 0].copy()
 
 
 
 
460
 
461
  # Derive measures
462
- working["_Revenue"] = working["_Amount"].fillna(0.0)
463
- working["_COGS"] = (working["_UnitCost"] * working["_Units"]).fillna(0.0)
464
  working["_GrossProfit"] = (working["_Revenue"] - working["_COGS"]).fillna(0.0)
465
- working["_Hour"] = working["_datetime"].dt.hour
466
- working["_DayOfWeek"] = working["_datetime"].dt.day_name()
467
-
468
- # Drop zero rows if both revenue and cost are NaN/0 to avoid noise
 
 
 
 
 
 
 
 
 
469
  working = working[(working["_Revenue"].abs() > 0) | (working["_COGS"].abs() > 0)]
470
 
471
  emit_kpi_debug(self.profile_id, "prepared_counts", {
472
  "raw_rows": int(len(self.raw)),
473
  "rows_with_datetime": int(len(df)),
474
  "sale_like_rows": int(len(working)),
 
475
  })
476
-
 
477
  return working
478
 
479
  def _get_primary_currency(self) -> str:
480
  candidates = ["USD", "ZAR", "ZWL", "EUR", "GBP"]
481
  try:
482
- # Currency field
483
  mapping = ColumnResolver.map(self.raw)
484
  if mapping["currency"] and mapping["currency"] in self.raw:
485
  mode_series = self.raw[mapping["currency"]].dropna().astype(str)
486
  if not mode_series.empty:
487
- # pick the most frequent
488
  val = mode_series.mode()
489
  if not val.empty:
490
  return str(val.iloc[0])
491
- # Heuristic by amount formatting (very weak; fallback only)
492
- # We won't infer here to avoid false positives — default to USD.
493
  except Exception:
494
  pass
495
  return "USD"
496
 
497
- def _get_comparison_timeframes(self) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, str]]:
 
 
498
  if self.df.empty:
499
  return self.df, self.df, {}
500
 
501
  now = now_harare()
502
  start_cur, end_cur = week_bounds_from(now)
503
- # previous week
504
  start_prev = start_cur - pd.Timedelta(days=7)
505
  end_prev = start_cur - pd.Timedelta(seconds=1)
506
 
@@ -528,10 +575,9 @@ class IrisReportEngine:
528
  def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]:
529
  cur_rev = float(cur_df["_Revenue"].sum()) if not cur_df.empty else 0.0
530
  prev_rev = float(prev_df["_Revenue"].sum()) if not prev_df.empty else 0.0
531
- cur_gp = float(cur_df["_GrossProfit"].sum()) if not cur_df.empty else 0.0
532
  prev_gp = float(prev_df["_GrossProfit"].sum()) if not prev_df.empty else 0.0
533
 
534
- # transactions counted by invoice if present, else by rows
535
  if "_Invoice" in cur_df.columns and cur_df["_Invoice"].notna().any():
536
  tx_now = int(cur_df["_Invoice"].nunique())
537
  else:
@@ -556,76 +602,702 @@ class IrisReportEngine:
556
  emit_kpi_debug(self.profile_id, "headline", head)
557
  return head
558
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559
  def get_business_intelligence_briefing(self) -> Dict[str, Any]:
560
- # Numbers only — no LLM here.
561
  if self.df.empty:
562
  emit_kpi_debug(self.profile_id, "briefing", {"status": "no_data"})
563
  return {"Status": "No sales data available to generate a briefing."}
564
 
565
  current_df, previous_df, tfmeta = self._get_comparison_timeframes()
566
-
567
  if current_df.empty:
568
  emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
569
  return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}
570
 
571
  headline = self._headline(current_df, previous_df)
572
 
573
- # Basket analysis
574
- if "_Invoice" in current_df.columns and current_df["_Invoice"].notna().any():
575
- baskets = current_df.groupby("_Invoice", dropna=True).agg(
576
- BasketProfit=("_GrossProfit", "sum"),
577
- ItemsPerBasket=("_Units", "sum")
578
- )
579
- avg_prof = float(baskets["BasketProfit"].mean()) if len(baskets) else None
580
- avg_items = float(baskets["ItemsPerBasket"].mean()) if len(baskets) else None
 
 
 
 
 
 
 
 
 
 
581
  else:
582
- baskets = None
583
- avg_prof = None
584
- avg_items = None
585
-
586
- # Product Intel
587
- prod_profit = (current_df.groupby("_Product")["_GrossProfit"].sum()
588
- if "_Product" in current_df.columns else pd.Series(dtype=float))
589
- prod_units = (current_df.groupby("_Product")["_Units"].sum()
590
- if "_Product" in current_df.columns else pd.Series(dtype=float))
591
-
592
- product_intel: Dict[str, Any] = {}
593
- if not prod_profit.empty:
594
- try:
595
- product_intel["Best in Class (Most Profitable)"] = str(prod_profit.idxmax())
596
- except Exception:
597
- pass
598
- if not prod_units.empty:
599
- try:
600
- product_intel["Workhorse (Most Units Sold)"] = str(prod_units.idxmax())
601
- except Exception:
602
- pass
603
- try:
604
- pos_profit = prod_profit[prod_profit > 0]
605
- if not pos_profit.empty:
606
- product_intel["Underperformer (Least Profitable > 0)"] = str(pos_profit.idxmin())
607
- except Exception:
608
- pass
609
-
610
- # Staff & Ops
611
- teller_profit = (current_df.groupby("_Teller")["_GrossProfit"].sum()
612
- if "_Teller" in current_df.columns else pd.Series(dtype=float))
613
- staff_intel: Dict[str, Any] = {}
614
- if not teller_profit.empty:
615
- try:
616
- staff_intel["Top Performing Teller (by Profit)"] = str(teller_profit.idxmax())
617
- except Exception:
618
- pass
619
-
620
- profit_by_hour = (current_df.groupby("_Hour")["_GrossProfit"].sum()
621
- if "_Hour" in current_df.columns else pd.Series(dtype=float))
622
- most_prof_hour = None
623
- if not profit_by_hour.empty:
624
- try:
625
- most_prof_hour = f"{int(profit_by_hour.idxmax())}:00"
626
- except Exception:
627
- most_prof_hour = None
628
 
 
 
 
 
 
 
 
629
  snapshot = {
630
  "Summary Period": tfmeta.get("period_label", "This Week vs. Last Week"),
631
  "Performance Snapshot (vs. Prior Period)": {
@@ -633,46 +1305,60 @@ class IrisReportEngine:
633
  "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
634
  "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
635
  },
636
- "Basket Analysis": {
637
- "Average Profit per Basket": (f"{self.currency} {avg_prof:,.2f}" if isinstance(avg_prof, (int, float)) else "N/A"),
638
- "Average Items per Basket": (f"{avg_items:.1f}" if isinstance(avg_items, (int, float)) else "N/A"),
639
- },
640
- "Product Intelligence": product_intel,
641
- "Staff & Operations": {
642
- **staff_intel,
643
- "Most Profitable Hour": most_prof_hour or "N/A",
644
  },
 
 
645
  "meta": {
646
  "timeframes": tfmeta,
 
 
 
 
 
 
 
 
 
 
 
 
647
  "row_counts": {
648
  "input": int(len(self.raw)),
649
  "prepared": int(len(self.df)),
650
  "current_period": int(len(current_df)),
651
  "previous_period": int(len(previous_df)),
652
- }
 
 
 
 
653
  }
654
  }
655
 
656
  emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
657
  return json_safe(snapshot)
658
 
659
- def synthesize_fallback_response(self, briefing: dict, user_question: str) -> str:
660
- # LLM for narrative ONLY — data already computed deterministically.
661
- fallback_prompt = (
662
- "You are Iris, an expert business data analyst. Answer the user’s question using the business data below.\n"
663
- "If their question is specific (e.g., “sales yesterday”, “top product”), answer directly.\n"
664
- "If the request can’t be answered precisely, provide a helpful business briefing.\n"
665
- "Use clear markdown with short headings and bullets. Keep it concise.\n"
666
- f'User Question: "{user_question}"\n'
667
- f"Business Data: {json.dumps(json_safe(briefing), indent=2, ensure_ascii=False)}"
668
- )
669
- try:
670
- resp = llm.invoke(fallback_prompt)
671
- return resp.content if hasattr(resp, "content") else str(resp)
672
- except Exception as e:
673
- logger.warning(f"LLM fallback narration failed: {e}")
674
- # Last resort: return raw dict as markdown
675
- return "### Business Snapshot\n\n```\n" + json.dumps(json_safe(briefing), indent=2) + "\n```"
676
 
677
  # -----------------------------------------------------------------------------
678
  # /chat — PandasAI first, then deterministic fallback
 
388
  logger.warning(f"Failed to emit KPI debug logs: {e}")
389
 
390
  class IrisReportEngine:
391
+ """
392
+ Backwards-compatible KPI engine:
393
+ - Keeps existing snapshot sections untouched
394
+ - Adds: Basket Analysis, Product Affinity, Temporal Patterns, Customer Value, Product KPIs (expanded),
395
+ Inventory (optional), Branch Analytics (per-branch + cross-branch), Cash reconciliation (optional)
396
+ - Never uses LLM for numbers. LLM only for narration elsewhere.
397
+ """
398
+
399
+ DEFAULT_PARAMS = {
400
+ "top_k": 5,
401
+ "min_revenue_for_margin_pct": 50.0,
402
+ "min_tx_for_margin_pct": 3,
403
+ "rfm_window_days": 365,
404
+ "retention_factor": 1.0,
405
+ "min_support_baskets": 5, # minimum basket count for a pair to be reported
406
+ "min_lift": 1.2,
407
+ "blocked_products": ["Purchase"], # exclude accounting placeholders from product leaderboards/affinity
408
+ "cash_variance_threshold_abs": 10.0,
409
+ "cash_variance_threshold_pct": 0.008, # 0.8%
410
+ }
411
+
412
+ def __init__(
413
+ self,
414
+ profile_id: str,
415
+ transactions_data: List[dict],
416
+ llm_instance,
417
+ stock_feed: Optional[List[Dict[str, Any]]] = None, # optional: [{product, stock_on_hand, reorder_point, lead_time_days, min_order_qty}]
418
+ cash_float_feed: Optional[List[Dict[str, Any]]] = None, # optional: [{branch, date, opening_float, closing_float, drops, petty_cash, declared_cash}]
419
+ params: Optional[Dict[str, Any]] = None,
420
+ ):
421
  self.profile_id = profile_id
422
  self.llm = llm_instance
423
+ self.params = {**self.DEFAULT_PARAMS, **(params or {})}
424
  self.raw = pd.DataFrame(transactions_data)
425
+ self.stock_feed = pd.DataFrame(stock_feed) if stock_feed else pd.DataFrame()
426
+ self.cash_float_feed = pd.DataFrame(cash_float_feed) if cash_float_feed else pd.DataFrame()
427
+
428
  self.df = self._load_and_prepare_data(self.raw)
429
  self.currency = self._get_primary_currency()
430
 
431
+ # ------------------------- load/prepare -------------------------
432
+
433
  def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
434
  if df is None or df.empty:
435
  emit_kpi_debug(self.profile_id, "load", {"status": "empty_input"})
 
438
  mapping = ColumnResolver.map(df)
439
  emit_kpi_debug(self.profile_id, "column_map", mapping)
440
 
441
+ # Numerics
442
+ amt_col = mapping["amount"] or "Settled_Amount" if "Settled_Amount" in df.columns else None
443
+ if amt_col and amt_col in df:
444
+ df["_Amount"] = pd.to_numeric(df[amt_col], errors="coerce")
445
  else:
446
  df["_Amount"] = pd.Series(dtype=float)
447
 
 
455
  else:
456
  df["_UnitCost"] = 0.0
457
 
458
+ # Datetime
459
  if mapping["date"] and mapping["date"] in df:
460
  if mapping["time"] and mapping["time"] in df:
461
  dt_series = pd.to_datetime(
 
465
  else:
466
  dt_series = pd.to_datetime(df[mapping["date"]], errors="coerce")
467
  else:
 
468
  dt_series = pd.to_datetime(df.get("datetime"), errors="coerce")
469
 
470
  try:
 
481
  # Canonical dims
482
  df["_Invoice"] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None
483
  df["_Product"] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None
484
+ df["_Teller"] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None
485
  df["_TxnType"] = (df[mapping["txn_type"]].astype(str).str.lower()
486
+ if mapping["txn_type"] and mapping["txn_type"] in df else df.get("Transaction_Type", "").astype(str).str.lower())
487
+ df["_Branch"] = df.get("Branch")
488
+ df["_Customer"] = df.get("Customer_Reference")
489
+
490
+ # Sales filter: keep explicit sales OR positive amounts
491
+ sales_mask = (
492
+ df["_TxnType"].isin(["sale", "sales", "invoice"]) |
493
+ df.get("Transaction_Type_ID", pd.Series(dtype=float)).isin([21])
494
+ )
495
+ working = df[sales_mask].copy()
496
+ if working["_Amount"].isna().all():
497
+ working = working.copy()
498
+ # Remove clearly non-sale placeholder SKUs from product analytics later using params["blocked_products"]
499
 
500
  # Derive measures
501
+ working["_Revenue"] = working["_Amount"].fillna(0.0)
502
+ working["_COGS"] = (working["_UnitCost"] * working["_Units"]).fillna(0.0)
503
  working["_GrossProfit"] = (working["_Revenue"] - working["_COGS"]).fillna(0.0)
504
+ working["_Hour"] = working["_datetime"].dt.hour
505
+ working["_DOW"] = working["_datetime"].dt.day_name()
506
+ working["_DOW_idx"] = working["_datetime"].dt.dayofweek # 0=Mon .. 6=Sun
507
+
508
+ # Deduplicate exact duplicate sale lines
509
+ before = len(working)
510
+ dedupe_keys = ["Transaction_ID", "_Invoice", "_Product", "_Units", "_Amount", "_datetime"]
511
+ existing_keys = [k for k in dedupe_keys if k in working.columns]
512
+ if existing_keys:
513
+ working = working.drop_duplicates(subset=existing_keys)
514
+ duplicates_dropped = before - len(working)
515
+
516
+ # Drop zero-rows if both revenue and cost are zero to avoid noise
517
  working = working[(working["_Revenue"].abs() > 0) | (working["_COGS"].abs() > 0)]
518
 
519
  emit_kpi_debug(self.profile_id, "prepared_counts", {
520
  "raw_rows": int(len(self.raw)),
521
  "rows_with_datetime": int(len(df)),
522
  "sale_like_rows": int(len(working)),
523
+ "duplicates_dropped": int(duplicates_dropped),
524
  })
525
+ self._prepared_dupes_dropped = int(duplicates_dropped)
526
+ self._non_sale_excluded = int(len(df) - len(working))
527
  return working
528
 
529
  def _get_primary_currency(self) -> str:
530
  candidates = ["USD", "ZAR", "ZWL", "EUR", "GBP"]
531
  try:
 
532
  mapping = ColumnResolver.map(self.raw)
533
  if mapping["currency"] and mapping["currency"] in self.raw:
534
  mode_series = self.raw[mapping["currency"]].dropna().astype(str)
535
  if not mode_series.empty:
 
536
  val = mode_series.mode()
537
  if not val.empty:
538
  return str(val.iloc[0])
 
 
539
  except Exception:
540
  pass
541
  return "USD"
542
 
543
+ # ------------------------- timeframes & headline -------------------------
544
+
545
+ def _get_comparison_timeframes(self) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
546
  if self.df.empty:
547
  return self.df, self.df, {}
548
 
549
  now = now_harare()
550
  start_cur, end_cur = week_bounds_from(now)
 
551
  start_prev = start_cur - pd.Timedelta(days=7)
552
  end_prev = start_cur - pd.Timedelta(seconds=1)
553
 
 
575
  def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]:
576
  cur_rev = float(cur_df["_Revenue"].sum()) if not cur_df.empty else 0.0
577
  prev_rev = float(prev_df["_Revenue"].sum()) if not prev_df.empty else 0.0
578
+ cur_gp = float(cur_df["_GrossProfit"].sum()) if not cur_df.empty else 0.0
579
  prev_gp = float(prev_df["_GrossProfit"].sum()) if not prev_df.empty else 0.0
580
 
 
581
  if "_Invoice" in cur_df.columns and cur_df["_Invoice"].notna().any():
582
  tx_now = int(cur_df["_Invoice"].nunique())
583
  else:
 
602
  emit_kpi_debug(self.profile_id, "headline", head)
603
  return head
604
 
605
+ # ------------------------- core builders -------------------------
606
+
607
+ def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame:
608
+ if cur_df.empty:
609
+ return pd.DataFrame(columns=[
610
+ "_Product","revenue","units","cogs","gross_profit","margin_pct","avg_selling_price","avg_unit_cost","tx_count"
611
+ ])
612
+
613
+ df = cur_df.copy()
614
+ # Exclude blocked products for leaderboards/affinity, but keep them in totals if needed
615
+ if self.params["blocked_products"]:
616
+ df = df[~df["_Product"].astype(str).str.strip().isin(self.params["blocked_products"])]
617
+
618
+ # Tx count via invoice nunique if available
619
+ if "_Invoice" in df.columns and df["_Invoice"].notna().any():
620
+ g = df.groupby("_Product", dropna=False).agg(
621
+ revenue=("_Revenue","sum"),
622
+ units=("_Units","sum"),
623
+ cogs=("_COGS","sum"),
624
+ gp=("_GrossProfit","sum"),
625
+ tx=(" _Invoice","nunique") # typo trap; fix next line
626
+ )
627
+ # fix groupby with invoice nunique
628
+ if "_Invoice" in df.columns and df["_Invoice"].notna().any():
629
+ g = df.groupby("_Product", dropna=False).agg(
630
+ revenue=("_Revenue","sum"),
631
+ units=("_Units","sum"),
632
+ cogs=("_COGS","sum"),
633
+ gp=("_GrossProfit","sum"),
634
+ tx=("_Invoice","nunique")
635
+ )
636
+ else:
637
+ g = df.groupby("_Product", dropna=False).agg(
638
+ revenue=("_Revenue","sum"),
639
+ units=("_Units","sum"),
640
+ cogs=("_COGS","sum"),
641
+ gp=("_GrossProfit","sum"),
642
+ tx=("_Product","size")
643
+ )
644
+
645
+ g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index()
646
+
647
+ # Derived ratios
648
+ g["margin_pct"] = np.where(g["revenue"] > 0, g["gross_profit"] / g["revenue"], np.nan)
649
+ g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan)
650
+ g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan)
651
+
652
+ # velocity (units/day) needs window length
653
+ # Set later when we know the time window length; store raw fields for now
654
+ return g
655
+
656
+ def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame:
657
+ if cur_df.empty:
658
+ return pd.DataFrame(columns=["_Invoice","basket_revenue","basket_gp","basket_items","_datetime_max"])
659
+ # per invoice sums
660
+ b = cur_df.groupby("_Invoice", dropna=False).agg(
661
+ basket_revenue=("_Revenue","sum"),
662
+ basket_gp=("_GrossProfit","sum"),
663
+ basket_items=("_Units","sum"),
664
+ _datetime_max=("_datetime","max"),
665
+ ).reset_index()
666
+ return b
667
+
668
+ def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]:
669
+ if basket_df.empty:
670
+ return {
671
+ "avg_items_per_basket": "N/A",
672
+ "avg_gross_profit_per_basket": "N/A",
673
+ "median_basket_value": "N/A",
674
+ "basket_size_distribution": {},
675
+ "low_sample": True
676
+ }
677
+ avg_items = float(basket_df["basket_items"].mean())
678
+ avg_gp = float(basket_df["basket_gp"].mean())
679
+ median_value = float(basket_df["basket_revenue"].median())
680
+ # size histogram
681
+ sizes = basket_df["basket_items"].fillna(0)
682
+ bins = {
683
+ "1": int(((sizes == 1).sum())),
684
+ "2-3": int(((sizes >= 2) & (sizes <= 3)).sum()),
685
+ "4-5": int(((sizes >= 4) & (sizes <= 5)).sum()),
686
+ "6_plus": int((sizes >= 6).sum()),
687
+ }
688
+ return {
689
+ "avg_items_per_basket": round(avg_items, 2),
690
+ "avg_gross_profit_per_basket": round(avg_gp, 2),
691
+ "median_basket_value": round(median_value, 2),
692
+ "basket_size_distribution": bins
693
+ }
694
+
695
+ def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
696
+ # Build unique product sets per invoice, count pairs
697
+ if cur_df.empty or basket_df.empty or "_Product" not in cur_df.columns:
698
+ return {"params": self._affinity_params(), "top_pairs": []}
699
+
700
+ # Per-basket unique product set (exclude null/blocked)
701
+ tmp = cur_df[["_Invoice","_Product"]].dropna()
702
+ if tmp.empty:
703
+ return {"params": self._affinity_params(), "top_pairs": []}
704
+
705
+ blocked = set(self.params.get("blocked_products", []) or [])
706
+ tmp = tmp[~tmp["_Product"].astype(str).str.strip().isin(blocked)]
707
+ if tmp.empty:
708
+ return {"params": self._affinity_params(), "top_pairs": []}
709
+
710
+ products_per_invoice = tmp.groupby("_Invoice")["_Product"].agg(lambda s: sorted(set(map(str, s)))).reset_index()
711
+ total_baskets = int(len(products_per_invoice))
712
+ if total_baskets == 0:
713
+ return {"params": self._affinity_params(), "top_pairs": []}
714
+
715
+ # Limit explosion: optionally cap to top-N frequent products first
716
+ # Count single supports
717
+ from collections import Counter
718
+ single_counter = Counter()
719
+ for prods in products_per_invoice["_Product"]:
720
+ single_counter.update(prods)
721
+
722
+ # Pair counting
723
+ pair_counter = Counter()
724
+ for prods in products_per_invoice["_Product"]:
725
+ if len(prods) < 2:
726
+ continue
727
+ # 2-combinations
728
+ for i in range(len(prods)):
729
+ for j in range(i+1, len(prods)):
730
+ a, b = prods[i], prods[j]
731
+ pair = (a, b) if a <= b else (b, a)
732
+ pair_counter[pair] += 1
733
+
734
+ min_support_baskets = int(self.params["min_support_baskets"])
735
+ min_lift = float(self.params["min_lift"])
736
+ top_k = int(self.params["top_k"])
737
+
738
+ rows = []
739
+ # Average pair revenue across baskets containing both (optional; approximate via filtering once)
740
+ inv_with_products = cur_df.groupby("_Invoice")["_Product"].apply(lambda s: set(map(str, s.dropna())))
741
+
742
+ # Precompute basket revenue by invoice for avg pair revenue
743
+ rev_by_inv = cur_df.groupby("_Invoice")["_Revenue"].sum()
744
+
745
+ for (a, b), ab_count in pair_counter.items():
746
+ if ab_count < min_support_baskets:
747
+ continue
748
+ support_a = single_counter.get(a, 0) / total_baskets
749
+ support_b = single_counter.get(b, 0) / total_baskets
750
+ support_ab = ab_count / total_baskets
751
+ if support_a == 0 or support_b == 0:
752
+ continue
753
+ confidence = support_ab / support_a
754
+ lift = support_ab / (support_a * support_b) if (support_a * support_b) > 0 else np.nan
755
+ if not np.isfinite(lift) or lift < min_lift:
756
+ continue
757
+
758
+ # avg pair revenue over baskets that include both
759
+ inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s))
760
+ pair_invoices = inv_mask[inv_mask].index
761
+ avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan
762
+
763
+ rows.append({
764
+ "a": a, "b": b,
765
+ "support_ab": round(float(support_ab), 6),
766
+ "confidence_a_to_b": round(float(confidence), 6),
767
+ "lift": round(float(lift), 6),
768
+ "pair_basket_count": int(ab_count),
769
+ "avg_pair_revenue": round(avg_pair_revenue, 2) if np.isfinite(avg_pair_revenue) else None,
770
+ })
771
+
772
+ rows.sort(key=lambda r: (r["lift"], r["pair_basket_count"], r["support_ab"]), reverse=True)
773
+ emit_kpi_debug(self.profile_id, "affinity_pairs_counts", {
774
+ "total_baskets": total_baskets, "pairs_after_filters": len(rows)
775
+ })
776
+ return {"params": self._affinity_params(), "top_pairs": rows[:top_k]}
777
+
778
+ def _affinity_params(self) -> Dict[str, Any]:
779
+ return {
780
+ "min_support_baskets": int(self.params["min_support_baskets"]),
781
+ "min_lift": float(self.params["min_lift"]),
782
+ "top_k": int(self.params["top_k"]),
783
+ }
784
+
785
+ def _temporal_patterns(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
786
+ if cur_df.empty:
787
+ return {
788
+ "best_hour_by_profit": None,
789
+ "best_day_by_profit": None,
790
+ "hourly_series": [],
791
+ "dow_series": [],
792
+ "profit_heatmap_7x24": []
793
+ }
794
+ # Hourly
795
+ gh = cur_df.groupby("_Hour", dropna=False).agg(
796
+ revenue=("_Revenue","sum"),
797
+ gross_profit=("_GrossProfit","sum")
798
+ ).reset_index()
799
+ best_hour_idx = int(gh.loc[gh["gross_profit"].idxmax(), "_Hour"]) if not gh.empty else None
800
+ best_hour_gp = float(gh["gross_profit"].max()) if not gh.empty else None
801
+
802
+ # DOW
803
+ gd = cur_df.groupby("_DOW", dropna=False).agg(
804
+ revenue=("_Revenue","sum"),
805
+ gross_profit=("_GrossProfit","sum")
806
+ ).reset_index()
807
+ # enforce Mon..Sun order using _DOW_idx
808
+ order_map = cur_df.groupby("_DOW")["_DOW_idx"].max().to_dict()
809
+ gd["__ord"] = gd["_DOW"].map(order_map)
810
+ gd = gd.sort_values("__ord", kind="stable")
811
+ best_day_row = gd.loc[gd["gross_profit"].idxmax()] if not gd.empty else None
812
+ best_day = {"day": str(best_day_row["_DOW"]), "gross_profit": float(best_day_row["gross_profit"])} if best_day_row is not None else None
813
+
814
+ # Heatmap (7x24 by _DOW_idx then _Hour)
815
+ m = cur_df.groupby(["_DOW_idx","_Hour"], dropna=False)["_GrossProfit"].sum().unstack(fill_value=0)
816
+ # ensure full 7x24
817
+ m = m.reindex(index=range(0,7), columns=range(0,24), fill_value=0)
818
+ heatmap = [[float(x) for x in row] for row in m.values.tolist()]
819
+
820
+ hourly_series = gh.rename(columns={"_Hour":"hour"}).to_dict(orient="records")
821
+ dow_series = gd[["_DOW","revenue","gross_profit"]].rename(columns={"_DOW":"day"}).to_dict(orient="records")
822
+
823
+ return {
824
+ "best_hour_by_profit": {"hour": best_hour_idx, "gross_profit": round(best_hour_gp, 2)} if best_hour_idx is not None else None,
825
+ "best_day_by_profit": best_day,
826
+ "hourly_series": [{"hour": int(r["hour"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in hourly_series],
827
+ "dow_series": [{"day": str(r["day"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in dow_series],
828
+ "profit_heatmap_7x24": heatmap
829
+ }
830
+
831
+ def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
832
+ if cur_df.empty or "_Customer" not in cur_df.columns:
833
+ return {
834
+ "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
835
+ "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
836
+ "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
837
+ }
838
+ df = cur_df.copy()
839
+ # Build per-customer aggregates
840
+ last_date = df.groupby("_Customer")["_datetime"].max()
841
+ orders = (df.dropna(subset=["_Invoice"])
842
+ .groupby("_Customer")["_Invoice"].nunique())
843
+ revenue = df.groupby("_Customer")["_Revenue"].sum()
844
+ gp = df.groupby("_Customer")["_GrossProfit"].sum()
845
+
846
+ # Avg basket value per customer (from their invoices)
847
+ if not basket_df.empty and "_Invoice" in df.columns:
848
+ inv_to_rev = basket_df.set_index("_Invoice")["basket_revenue"]
849
+ cust_invoices = df.dropna(subset=["_Invoice"]).groupby("_Customer")["_Invoice"].agg(lambda x: sorted(set(x)))
850
+ avg_basket_val = {}
851
+ for cust, invs in cust_invoices.items():
852
+ vals = inv_to_rev.reindex(invs).dropna()
853
+ avg_basket_val[cust] = float(vals.mean()) if len(vals) else np.nan
854
+ avg_basket = pd.Series(avg_basket_val)
855
+ else:
856
+ avg_basket = pd.Series(dtype=float)
857
+
858
+ base = now_harare().normalize()
859
+ recency_days = (base - last_date).dt.total_seconds() / (60*60*24)
860
+ rfm = pd.DataFrame({
861
+ "customer": last_date.index.astype(str),
862
+ "last_date": last_date.values,
863
+ "orders": orders.reindex(last_date.index).fillna(0).astype(int).values,
864
+ "revenue": revenue.reindex(last_date.index).fillna(0.0).values,
865
+ "gp": gp.reindex(last_date.index).fillna(0.0).values,
866
+ "recency_days": recency_days.values,
867
+ "avg_basket_value": avg_basket.reindex(last_date.index).values
868
+ }).fillna({"avg_basket_value": np.nan})
869
+
870
+ # Leaderboards
871
+ vip = rfm.sort_values(["gp","orders","revenue"], ascending=[False, False, False]).head(20)
872
+ # At-risk: top quartile gp but recency > 30 days (tunable)
873
+ if len(rfm):
874
+ gp_q3 = rfm["gp"].quantile(0.75)
875
+ at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp","recency_days"], ascending=[False, False]).head(20)
876
+ else:
877
+ at_risk = rfm.head(0)
878
+ # New customers: first seen within current window (approx via last_date inside window and orders==1)
879
+ # (More precise would need a historical first_seen; we infer using current window)
880
+ new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20)
881
+
882
+ out = {
883
+ "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
884
+ "leaderboards": {
885
+ "top_customers_by_gp": rfm_to_list(vip),
886
+ "at_risk": rfm_to_list(at_risk),
887
+ "new_customers": rfm_to_list(new_customers)
888
+ },
889
+ "rfm_summary": {
890
+ "unique_customers": int(rfm["customer"].nunique()),
891
+ "median_recency_days": float(rfm["recency_days"].median()) if len(rfm) else None,
892
+ "median_orders": float(rfm["orders"].median()) if len(rfm) else None,
893
+ "median_gp": float(rfm["gp"].median()) if len(rfm) else None
894
+ }
895
+ }
896
+ emit_kpi_debug(self.profile_id, "rfm_done", {"customers": int(rfm["customer"].nunique())})
897
+ return json_safe(out)
898
+
899
+ # ------------------------- inventory & cash -------------------------
900
+
901
+ def _inventory_block(self, cur_df: pd.DataFrame, product_agg: pd.DataFrame, current_bounds: Tuple[pd.Timestamp, pd.Timestamp]) -> Dict[str, Any]:
902
+ if self.stock_feed.empty:
903
+ return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
904
+
905
+ start_cur, end_cur = current_bounds
906
+ days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
907
+
908
+ # velocity from product_agg
909
+ pa = product_agg.copy()
910
+ if pa.empty:
911
+ return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
912
+
913
+ pa["units_per_day"] = pa["units"] / days
914
+
915
+ # merge stock feed on product
916
+ sf = self.stock_feed.copy()
917
+ # Normalize join keys
918
+ sf["product_key"] = sf.get("product", sf.get("Product", "")).astype(str).str.strip()
919
+ pa["product_key"] = pa["_Product"].astype(str).str.strip()
920
+ merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock"))
921
+
922
+ # If a product exists in stock but didn’t sell in window, units_per_day may be NaN→0
923
+ merged["units_per_day"] = merged["units_per_day"].fillna(0.0)
924
+ merged["stock_on_hand"] = pd.to_numeric(merged.get("stock_on_hand", np.nan), errors="coerce")
925
+ merged["reorder_point"] = pd.to_numeric(merged.get("reorder_point", np.nan), errors="coerce")
926
+ merged["lead_time_days"] = pd.to_numeric(merged.get("lead_time_days", np.nan), errors="coerce")
927
+
928
+ merged["days_of_cover"] = np.where(merged["units_per_day"] > 0, merged["stock_on_hand"] / merged["units_per_day"], np.inf)
929
+
930
+ def status_row(r):
931
+ if pd.isna(r.get("stock_on_hand")):
932
+ return "unknown"
933
+ if r["stock_on_hand"] <= 0:
934
+ return "stockout"
935
+ if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]:
936
+ return "low"
937
+ if np.isfinite(r["days_of_cover"]) and pd.notna(r.get("lead_time_days")) and r["days_of_cover"] < r["lead_time_days"]:
938
+ return "stockout_risk"
939
+ if r["units_per_day"] == 0 and (r["stock_on_hand"] or 0) > 0:
940
+ return "dead_stock"
941
+ return "ok"
942
+
943
+ merged["status"] = merged.apply(status_row, axis=1)
944
+
945
+ products_out = []
946
+ low_stock, stockout_risk, dead_stock = [], [], []
947
+ for _, r in merged.iterrows():
948
+ rec = {
949
+ "product": str(r.get("_Product") or r.get("product_key")),
950
+ "stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None,
951
+ "reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None,
952
+ "lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None,
953
+ "days_of_cover": float(r["days_of_cover"]) if np.isfinite(r["days_of_cover"]) else None,
954
+ "daily_sales_velocity": float(r["units_per_day"]),
955
+ "status": str(r["status"])
956
+ }
957
+ products_out.append(rec)
958
+ if rec["status"] == "low":
959
+ low_stock.append(rec["product"])
960
+ elif rec["status"] == "stockout_risk":
961
+ stockout_risk.append(rec["product"])
962
+ elif rec["status"] == "dead_stock":
963
+ dead_stock.append(rec["product"])
964
+
965
+ return {
966
+ "stock_snapshot_asof": now_harare().isoformat(),
967
+ "products": products_out,
968
+ "alerts": {
969
+ "low_stock": sorted(set(low_stock)),
970
+ "stockout_risk": sorted(set(stockout_risk)),
971
+ "dead_stock": sorted(set(dead_stock))
972
+ }
973
+ }
974
+
975
+ def _cash_recon_block(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
976
+ if self.cash_float_feed.empty:
977
+ return {"status": "no_cash_data"}
978
+
979
+ # We expect cash_float_feed rows with branch, date (YYYY-MM-DD), opening_float, closing_float, drops, petty_cash, declared_cash
980
+ cf = self.cash_float_feed.copy()
981
+ out_days = []
982
+ high_var_days = 0
983
+
984
+ # Compute cash sales per branch/date from cur_df
985
+ if cur_df.empty:
986
+ cash_sales = pd.DataFrame(columns=["branch","date","cash_sales"])
987
+ else:
988
+ df = cur_df.copy()
989
+ df["date"] = df["_datetime"].dt.strftime("%Y-%m-%d")
990
+ df["is_cash"] = (df.get("Money_Type","").astype(str).str.lower() == "cash")
991
+ cash_sales = df[df["is_cash"]].groupby(["_Branch","date"])["_Revenue"].sum().reset_index()
992
+ cash_sales = cash_sales.rename(columns={"_Branch":"branch","_Revenue":"cash_sales"})
993
+
994
+ cf["date"] = cf["date"].astype(str).str[:10]
995
+ merged = cf.merge(cash_sales, on=["branch","date"], how="left")
996
+ merged["cash_sales"] = merged["cash_sales"].fillna(0.0)
997
+
998
+ # Expected Cash = Opening + CashSales – Drops – PettyCash – Closing
999
+ for _, r in merged.iterrows():
1000
+ opening = float(r.get("opening_float") or 0.0)
1001
+ closing = float(r.get("closing_float") or 0.0)
1002
+ drops = float(r.get("drops") or 0.0)
1003
+ petty = float(r.get("petty_cash") or 0.0)
1004
+ declared = float(r.get("declared_cash") or 0.0)
1005
+ cash_sales_val = float(r.get("cash_sales") or 0.0)
1006
+
1007
+ expected = opening + cash_sales_val - drops - petty - closing
1008
+ variance = declared - expected
1009
+ variance_pct = (variance / cash_sales_val) if cash_sales_val > 0 else 0.0
1010
+
1011
+ flag = (abs(variance) >= float(self.params["cash_variance_threshold_abs"])) or \
1012
+ (abs(variance_pct) >= float(self.params["cash_variance_threshold_pct"]))
1013
+
1014
+ if flag:
1015
+ high_var_days += 1
1016
+
1017
+ out_days.append({
1018
+ "branch": str(r["branch"]),
1019
+ "date": str(r["date"]),
1020
+ "cash_sales": round(cash_sales_val, 2),
1021
+ "declared_cash": round(declared, 2),
1022
+ "opening_float": round(opening, 2),
1023
+ "closing_float": round(closing, 2),
1024
+ "drops": round(drops, 2),
1025
+ "petty_cash": round(petty, 2),
1026
+ "expected_cash": round(expected, 2),
1027
+ "variance": round(variance, 2),
1028
+ "variance_pct": round(variance_pct, 4),
1029
+ "flag": bool(flag),
1030
+ })
1031
+
1032
+ return {"days": out_days, "flags": {"high_variance_days": int(high_var_days)}}
1033
+
1034
+ # ------------------------- branch analytics -------------------------
1035
+
1036
    def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]:
        """Build per-branch KPI blocks plus cross-branch comparisons.

        For each branch present in ``cur_df`` this assembles headline KPIs,
        basket/temporal/product/affinity/customer/cash sub-blocks (delegating
        to the corresponding sibling methods), then derives cross-branch
        rankings, margin spread, revenue concentration (share + HHI), and
        best-effort week-over-week deltas against ``previous_df``.
        """
        if cur_df.empty or "_Branch" not in cur_df.columns:
            return {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}

        per_branch = {}
        # NOTE(review): branch labels are stringified here, but the filter below
        # compares them against the raw _Branch values — confirm _Branch is
        # already a string column upstream, otherwise branches silently drop out.
        branches = sorted(map(str, cur_df["_Branch"].dropna().unique().tolist()))
        start_cur, end_cur = current_bounds
        days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)

        branch_summary_rows = []

        for br in branches:
            d = cur_df[cur_df["_Branch"] == br]
            if d.empty:
                continue

            # headline-like
            revenue = float(d["_Revenue"].sum())
            cogs = float(d["_COGS"].sum())
            gp = float(d["_GrossProfit"].sum())
            margin_pct = (gp / revenue) if revenue > 0 else None
            # Transaction count prefers distinct invoices; falls back to row count.
            tx = int(d["_Invoice"].nunique()) if "_Invoice" in d.columns and d["_Invoice"].notna().any() else int(len(d))
            items = float(d["_Units"].sum())

            # baskets
            basket_df = self._build_basket_table(d)
            basket_kpis = self._basket_kpis(basket_df)

            # temporal
            temporal = self._temporal_patterns(d)

            # product leaderboards
            pagg = self._build_product_aggregates(d)
            if not pagg.empty:
                pagg["units_per_day"] = pagg["units"] / days
                product_lb = self._product_leaderboards(pagg)
            else:
                product_lb = self._empty_product_leaderboards()

            # affinity
            affinity = self._affinity_pairs(d, basket_df)

            # customers
            customers = self._customer_value(d, basket_df)

            # cash recon slice
            cash_recon = self._cash_recon_block(d)

            per_branch[br] = {
                "kpis": {
                    "revenue": round(revenue, 2),
                    "cogs": round(cogs, 2),
                    "gross_profit": round(gp, 2),
                    "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None,
                    "transactions": tx,
                    "items_sold": round(items, 2),
                    # NOTE(review): "avg_basket_value" is populated from the
                    # *median* basket value — confirm the naming is intentional.
                    "avg_basket_value": basket_kpis.get("median_basket_value"),
                    "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"),
                    "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"),
                },
                "temporal": temporal,
                "products": product_lb,
                "affinity": affinity,
                "customer_value": customers,
                "cash_recon": cash_recon,
                "data_quality": {
                    "duplicates_dropped": self._prepared_dupes_dropped,
                    "non_sale_rows_excluded": self._non_sale_excluded,
                    "currency_mixed": False  # set if you add multi-currency detection
                }
            }

            # None margin is coerced to 0.0 for the cross-branch summary table.
            branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0})

        # cross-branch comparisons
        cross = {}
        if branch_summary_rows:
            bs = pd.DataFrame(branch_summary_rows)
            cross["rankings"] = {
                "by_revenue": bs.sort_values("revenue", ascending=False)[["branch","revenue"]].to_dict(orient="records"),
                "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch","gp_margin_pct"]].to_dict(orient="records"),
            }
            cross["spread"] = {
                "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None,
                "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None,
                "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None,
            }
            # revenue share & HHI
            tot_rev = float(bs["revenue"].sum())
            shares = []
            hhi = 0.0
            for _, r in bs.iterrows():
                sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0
                shares.append({"branch": r["branch"], "share": float(round(sh, 6))})
                hhi += sh*sh
            cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))}
            # week-over-week deltas per branch (best-effort: compute previous per-branch)
            if not previous_df.empty:
                prev_g = previous_df.groupby("_Branch").agg(
                    revenue=("_Revenue","sum"),
                    gp=("_GrossProfit","sum")
                ).reset_index().rename(columns={"_Branch":"branch"})
                cur_g = pd.DataFrame(branch_summary_rows)
                # NOTE(review): this rename is a no-op ("branch" -> "branch").
                cur_g = cur_g.rename(columns={"branch":"branch"})
                m = cur_g.merge(prev_g, on="branch", suffixes=("_cur","_prev"), how="left").fillna(0.0)
                wow_rows = []
                for _, r in m.iterrows():
                    # A branch with sales now but none before reports +100%;
                    # no sales in either period reports 0%.
                    wow_rows.append({
                        "branch": r["branch"],
                        "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"])*100) if r["revenue_prev"]>0 else (100.0 if r["revenue_cur"]>0 else 0.0),
                        "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"])*100) if r["gp_prev"]>0 else (100.0 if r["gp_cur"]>0 else 0.0),
                        "avg_basket_wow": None  # compute if you persist prev basket median
                    })
                cross["trend_wow"] = wow_rows

        return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross}
1152
+
1153
+ def _branch_params(self) -> Dict[str, Any]:
1154
+ return {
1155
+ "top_k": int(self.params["top_k"]),
1156
+ "min_support_baskets": int(self.params["min_support_baskets"]),
1157
+ "min_lift": float(self.params["min_lift"]),
1158
+ "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
1159
+ "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
1160
+ }
1161
+
1162
+ # ------------------------- product leaderboards & concentration -------------------------
1163
+
1164
+ def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]:
1165
+ top_k = int(self.params["top_k"])
1166
+ # margin % floor
1167
+ g_marginpct = g.copy()
1168
+ g_marginpct = g_marginpct[
1169
+ (g_marginpct["revenue"] >= float(self.params["min_revenue_for_margin_pct"])) &
1170
+ (g_marginpct["tx_count"] >= int(self.params["min_tx_for_margin_pct"]))
1171
+ ]
1172
+
1173
+ def top(df, col, asc=False):
1174
+ if df.empty:
1175
+ return []
1176
+ d = df.sort_values(col, ascending=asc).head(top_k)
1177
+ return [
1178
+ {
1179
+ "product": str(r["_Product"]),
1180
+ "revenue": round(float(r["revenue"]), 2),
1181
+ "units": float(r["units"]),
1182
+ "gross_profit": round(float(r["gross_profit"]), 2),
1183
+ "margin_pct": float(round(r["margin_pct"], 4)) if pd.notna(r["margin_pct"]) else None,
1184
+ "tx_count": int(r["tx_count"]),
1185
+ "avg_selling_price": float(round(r["avg_selling_price"], 4)) if pd.notna(r["avg_selling_price"]) else None,
1186
+ "avg_unit_cost": float(round(r["avg_unit_cost"], 4)) if pd.notna(r["avg_unit_cost"]) else None,
1187
+ "units_per_day": float(round(r.get("units_per_day", np.nan), 4)) if pd.notna(r.get("units_per_day", np.nan)) else None,
1188
+ } for _, r in d.iterrows()
1189
+ ]
1190
+
1191
+ return {
1192
+ "top_by_revenue": top(g, "revenue", asc=False),
1193
+ "top_by_units": top(g, "units", asc=False),
1194
+ "top_by_margin_value": top(g, "gross_profit", asc=False),
1195
+ "top_by_margin_pct": top(g_marginpct, "margin_pct", asc=False),
1196
+ "bottom_by_revenue": top(g, "revenue", asc=True),
1197
+ "loss_makers": top(g[g["gross_profit"] < 0], "gross_profit", asc=True),
1198
+ "by_velocity": top(g.assign(units_per_day=g.get("units_per_day", np.nan)), "units_per_day", asc=False),
1199
+ "by_gp_per_unit": top(g.assign(gp_per_unit=np.where(g["units"]>0, g["gross_profit"]/g["units"], np.nan)), "gp_per_unit", asc=False),
1200
+ }
1201
+
1202
+ def _empty_product_leaderboards(self) -> Dict[str, Any]:
1203
+ return {
1204
+ "top_by_revenue": [],
1205
+ "top_by_units": [],
1206
+ "top_by_margin_value": [],
1207
+ "top_by_margin_pct": [],
1208
+ "bottom_by_revenue": [],
1209
+ "loss_makers": [],
1210
+ "by_velocity": [],
1211
+ "by_gp_per_unit": [],
1212
+ }
1213
+
1214
+ def _concentration_block(self, g: pd.DataFrame) -> Dict[str, Any]:
1215
+ if g.empty:
1216
+ return {
1217
+ "revenue_share_top5": 0.0,
1218
+ "units_share_top5": 0.0,
1219
+ "revenue_pareto_top20pct_share": 0.0,
1220
+ "gini_revenue": 0.0
1221
+ }
1222
+ # shares
1223
+ total_rev = float(g["revenue"].sum())
1224
+ total_units = float(g["units"].sum())
1225
+ rev_sorted = g.sort_values("revenue", ascending=False)["revenue"].values
1226
+ units_sorted = g.sort_values("units", ascending=False)["units"].values
1227
+
1228
+ share_top5_rev = (rev_sorted[:5].sum() / total_rev) if total_rev > 0 else 0.0
1229
+ share_top5_units = (units_sorted[:5].sum() / total_units) if total_units > 0 else 0.0
1230
+
1231
+ # Pareto top 20% products by count
1232
+ n = len(rev_sorted)
1233
+ if n == 0:
1234
+ pareto = 0.0
1235
+ else:
1236
+ k = max(1, int(np.ceil(0.2 * n)))
1237
+ pareto = rev_sorted[:k].sum() / total_rev if total_rev > 0 else 0.0
1238
+
1239
+ # Gini on revenue
1240
+ if total_rev <= 0 or n == 0:
1241
+ gini = 0.0
1242
+ else:
1243
+ # Gini for array x >=0: G = 1 - 2 * sum((n+1-i)*x_i) / (n * sum(x))
1244
+ x = np.sort(rev_sorted) # ascending
1245
+ cum = np.cumsum(x)
1246
+ gini = 1.0 - 2.0 * np.sum(cum) / (n * np.sum(x)) + 1.0 / n
1247
+
1248
+ return {
1249
+ "revenue_share_top5": float(round(share_top5_rev, 6)),
1250
+ "units_share_top5": float(round(share_top5_units, 6)),
1251
+ "revenue_pareto_top20pct_share": float(round(pareto, 6)),
1252
+ "gini_revenue": float(round(gini, 6))
1253
+ }
1254
+
1255
+ # ------------------------- public API -------------------------
1256
+
1257
    def get_business_intelligence_briefing(self) -> Dict[str, Any]:
        """Assemble the full BI briefing: headline snapshot plus all KPI blocks.

        Compares the current period against the prior period, then appends
        basket, affinity, temporal, customer-value (RFM), product, inventory
        and branch analytics blocks. Returns a status-only payload when there
        is no prepared data or no current-period data.
        """
        if self.df.empty:
            emit_kpi_debug(self.profile_id, "briefing", {"status": "no_data"})
            return {"Status": "No sales data available to generate a briefing."}

        current_df, previous_df, tfmeta = self._get_comparison_timeframes()
        if current_df.empty:
            emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
            return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}

        headline = self._headline(current_df, previous_df)

        # Basket & affinity
        basket_df = self._build_basket_table(current_df)
        basket_kpis = self._basket_kpis(basket_df)
        affinity = self._affinity_pairs(current_df, basket_df)

        # Temporal
        temporal = self._temporal_patterns(current_df)

        # Product aggregates + leaderboards + concentration
        start_cur = pd.Timestamp(tfmeta["current_start"])
        end_cur = pd.Timestamp(tfmeta["current_end"])
        days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)

        g_products = self._build_product_aggregates(current_df)
        if not g_products.empty:
            g_products["units_per_day"] = g_products["units"] / days
            product_lb = self._product_leaderboards(g_products)
            concentration = self._concentration_block(g_products)
        else:
            product_lb = self._empty_product_leaderboards()
            concentration = self._concentration_block(pd.DataFrame(columns=["revenue","units"]))

        # Customer value (RFM)
        customer_val = self._customer_value(current_df, basket_df)

        # Inventory (optional)
        inventory = self._inventory_block(current_df, g_products, (start_cur, end_cur))

        # Branch analytics
        branch_block = self._per_branch_blocks(current_df, previous_df, (start_cur, end_cur))

        # Old snapshot maintained + new blocks appended
        snapshot = {
            "Summary Period": tfmeta.get("period_label", "This Week vs. Last Week"),
            "Performance Snapshot (vs. Prior Period)": {
                # NOTE(review): the source extract omits one line here (likely a
                # "Revenue" entry preceding Gross Profit) — confirm against the
                # full file before relying on this block's exact contents.
                "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
                "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
            },
            "Basket Analysis": basket_kpis,
            "Product Affinity": affinity,
            "Temporal Patterns": temporal,
            "Customer Value": customer_val,
            "Product KPIs": {
                "leaderboards": product_lb,
                "concentration": concentration
            },
            "Inventory": inventory,
            "Branch Analytics": branch_block,
            "meta": {
                "timeframes": tfmeta,
                # Echo of the tuning parameters used for this run.
                "kpi_params": {
                    "top_k": int(self.params["top_k"]),
                    "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]),
                    "min_tx_for_margin_pct": int(self.params["min_tx_for_margin_pct"]),
                    "rfm_window_days": int(self.params["rfm_window_days"]),
                    "retention_factor": float(self.params["retention_factor"]),
                    "min_support_baskets": int(self.params["min_support_baskets"]),
                    "min_lift": float(self.params["min_lift"]),
                    "blocked_products": list(self.params["blocked_products"]),
                    "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
                    "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
                },
                "row_counts": {
                    "input": int(len(self.raw)),
                    "prepared": int(len(self.df)),
                    "current_period": int(len(current_df)),
                    "previous_period": int(len(previous_df)),
                },
                "notes": [
                    "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).",
                    f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}",
                ],
            }
        }

        emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
        return json_safe(snapshot)
1347
 
1348
+ # ------------------------- helpers (outside class) -------------------------
1349
+
1350
def rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Serialize an RFM leaderboard frame into JSON-friendly records.

    Expects columns: customer, gp, revenue, orders, recency_days,
    avg_basket_value. NaN recency/basket values serialize as None.
    """
    def _rounded(value, ndigits):
        return float(round(value, ndigits)) if pd.notna(value) else None

    return [
        {
            "customer": str(row.customer),
            "gp": float(round(row.gp, 2)),
            "revenue": float(round(row.revenue, 2)),
            "orders": int(row.orders),
            "recency_days": _rounded(row.recency_days, 2),
            "avg_basket_value": _rounded(row.avg_basket_value, 2)
        }
        for row in df.itertuples(index=False)
    ]
 
 
 
1362
 
1363
  # -----------------------------------------------------------------------------
1364
  # /chat — PandasAI first, then deterministic fallback