rairo commited on
Commit
71935f9
·
verified ·
1 Parent(s): 3262424

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +341 -321
main.py CHANGED
@@ -396,6 +396,24 @@ class IrisReportEngine:
396
  - Never uses LLM for numbers. LLM only for narration elsewhere.
397
  """
398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399
  DEFAULT_PARAMS = {
400
  "top_k": 5,
401
  "min_revenue_for_margin_pct": 50.0,
@@ -414,8 +432,8 @@ class IrisReportEngine:
414
  profile_id: str,
415
  transactions_data: List[dict],
416
  llm_instance,
417
- stock_feed: Optional[List[Dict[str, Any]]] = None, # optional: [{product, stock_on_hand, reorder_point, lead_time_days, min_order_qty}]
418
- cash_float_feed: Optional[List[Dict[str, Any]]] = None, # optional: [{branch, date, opening_float, closing_float, drops, petty_cash, declared_cash}]
419
  params: Optional[Dict[str, Any]] = None,
420
  ):
421
  self.profile_id = profile_id
@@ -428,6 +446,26 @@ class IrisReportEngine:
428
  self.df = self._load_and_prepare_data(self.raw)
429
  self.currency = self._get_primary_currency()
430
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
431
  # ------------------------- load/prepare -------------------------
432
 
433
  def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -439,21 +477,21 @@ class IrisReportEngine:
439
  emit_kpi_debug(self.profile_id, "column_map", mapping)
440
 
441
  # Numerics
442
- amt_col = mapping["amount"] or "Settled_Amount" if "Settled_Amount" in df.columns else None
443
  if amt_col and amt_col in df:
444
- df["_Amount"] = pd.to_numeric(df[amt_col], errors="coerce")
445
  else:
446
- df["_Amount"] = pd.Series(dtype=float)
447
 
448
  if mapping["units"] and mapping["units"] in df:
449
- df["_Units"] = pd.to_numeric(df[mapping["units"]], errors="coerce").fillna(0)
450
  else:
451
- df["_Units"] = 0
452
 
453
  if mapping["unit_cost"] and mapping["unit_cost"] in df:
454
- df["_UnitCost"] = pd.to_numeric(df[mapping["unit_cost"]], errors="coerce").fillna(0.0)
455
  else:
456
- df["_UnitCost"] = 0.0
457
 
458
  # Datetime
459
  if mapping["date"] and mapping["date"] in df:
@@ -475,46 +513,46 @@ class IrisReportEngine:
475
  except Exception:
476
  pass
477
 
478
- df["_datetime"] = dt_series
479
- df = df.dropna(subset=["_datetime"]).copy()
480
 
481
  # Canonical dims
482
- df["_Invoice"] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None
483
- df["_Product"] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None
484
- df["_Teller"] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None
485
- df["_TxnType"] = (df[mapping["txn_type"]].astype(str).str.lower()
486
- if mapping["txn_type"] and mapping["txn_type"] in df else df.get("Transaction_Type", "").astype(str).str.lower())
487
- df["_Branch"] = df.get("Branch")
488
- df["_Customer"] = df.get("Customer_Reference")
489
-
490
- # Sales filter: keep explicit sales OR positive amounts
 
 
491
  sales_mask = (
492
- df["_TxnType"].isin(["sale", "sales", "invoice"]) |
493
- df.get("Transaction_Type_ID", pd.Series(dtype=float)).isin([21])
 
494
  )
495
  working = df[sales_mask].copy()
496
- if working["_Amount"].isna().all():
497
- working = working.copy()
498
- # Remove clearly non-sale placeholder SKUs from product analytics later using params["blocked_products"]
499
 
500
  # Derive measures
501
- working["_Revenue"] = working["_Amount"].fillna(0.0)
502
- working["_COGS"] = (working["_UnitCost"] * working["_Units"]).fillna(0.0)
503
- working["_GrossProfit"] = (working["_Revenue"] - working["_COGS"]).fillna(0.0)
504
- working["_Hour"] = working["_datetime"].dt.hour
505
- working["_DOW"] = working["_datetime"].dt.day_name()
506
- working["_DOW_idx"] = working["_datetime"].dt.dayofweek # 0=Mon .. 6=Sun
507
 
508
  # Deduplicate exact duplicate sale lines
509
  before = len(working)
510
- dedupe_keys = ["Transaction_ID", "_Invoice", "_Product", "_Units", "_Amount", "_datetime"]
511
  existing_keys = [k for k in dedupe_keys if k in working.columns]
512
  if existing_keys:
513
  working = working.drop_duplicates(subset=existing_keys)
514
  duplicates_dropped = before - len(working)
515
 
516
- # Drop zero-rows if both revenue and cost are zero to avoid noise
517
- working = working[(working["_Revenue"].abs() > 0) | (working["_COGS"].abs() > 0)]
518
 
519
  emit_kpi_debug(self.profile_id, "prepared_counts", {
520
  "raw_rows": int(len(self.raw)),
@@ -527,7 +565,6 @@ class IrisReportEngine:
527
  return working
528
 
529
  def _get_primary_currency(self) -> str:
530
- candidates = ["USD", "ZAR", "ZWL", "EUR", "GBP"]
531
  try:
532
  mapping = ColumnResolver.map(self.raw)
533
  if mapping["currency"] and mapping["currency"] in self.raw:
@@ -551,8 +588,8 @@ class IrisReportEngine:
551
  start_prev = start_cur - pd.Timedelta(days=7)
552
  end_prev = start_cur - pd.Timedelta(seconds=1)
553
 
554
- current_df = self.df[(self.df["_datetime"] >= start_cur) & (self.df["_datetime"] <= end_cur)]
555
- previous_df = self.df[(self.df["_datetime"] >= start_prev) & (self.df["_datetime"] <= end_prev)]
556
 
557
  meta = {
558
  "period_label": "This Week vs. Last Week",
@@ -573,17 +610,17 @@ class IrisReportEngine:
573
  return f"{((cur - prev) / prev) * 100:+.1f}%"
574
 
575
  def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]:
576
- cur_rev = float(cur_df["_Revenue"].sum()) if not cur_df.empty else 0.0
577
- prev_rev = float(prev_df["_Revenue"].sum()) if not prev_df.empty else 0.0
578
- cur_gp = float(cur_df["_GrossProfit"].sum()) if not cur_df.empty else 0.0
579
- prev_gp = float(prev_df["_GrossProfit"].sum()) if not prev_df.empty else 0.0
580
 
581
- if "_Invoice" in cur_df.columns and cur_df["_Invoice"].notna().any():
582
- tx_now = int(cur_df["_Invoice"].nunique())
583
  else:
584
  tx_now = int(len(cur_df))
585
- if "_Invoice" in prev_df.columns and prev_df["_Invoice"].notna().any():
586
- tx_prev = int(prev_df["_Invoice"].nunique())
587
  else:
588
  tx_prev = int(len(prev_df))
589
 
@@ -607,39 +644,31 @@ class IrisReportEngine:
607
  def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame:
608
  if cur_df.empty:
609
  return pd.DataFrame(columns=[
610
- "_Product","revenue","units","cogs","gross_profit","margin_pct","avg_selling_price","avg_unit_cost","tx_count"
 
611
  ])
612
 
613
  df = cur_df.copy()
614
  # Exclude blocked products for leaderboards/affinity, but keep them in totals if needed
615
  if self.params["blocked_products"]:
616
- df = df[~df["_Product"].astype(str).str.strip().isin(self.params["blocked_products"])]
617
 
618
  # Tx count via invoice nunique if available
619
- if "_Invoice" in df.columns and df["_Invoice"].notna().any():
620
- g = df.groupby("_Product", dropna=False).agg(
621
- revenue=("_Revenue","sum"),
622
- units=("_Units","sum"),
623
- cogs=("_COGS","sum"),
624
- gp=("_GrossProfit","sum"),
625
- tx=(" _Invoice","nunique") # typo trap; fix next line
626
- )
627
- # fix groupby with invoice nunique
628
- if "_Invoice" in df.columns and df["_Invoice"].notna().any():
629
- g = df.groupby("_Product", dropna=False).agg(
630
- revenue=("_Revenue","sum"),
631
- units=("_Units","sum"),
632
- cogs=("_COGS","sum"),
633
- gp=("_GrossProfit","sum"),
634
- tx=("_Invoice","nunique")
635
  )
636
  else:
637
- g = df.groupby("_Product", dropna=False).agg(
638
- revenue=("_Revenue","sum"),
639
- units=("_Units","sum"),
640
- cogs=("_COGS","sum"),
641
- gp=("_GrossProfit","sum"),
642
- tx=("_Product","size")
643
  )
644
 
645
  g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index()
@@ -649,24 +678,21 @@ class IrisReportEngine:
649
  g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan)
650
  g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan)
651
 
652
- # velocity (units/day) needs window length
653
- # Set later when we know the time window length; store raw fields for now
654
  return g
655
 
656
  def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame:
657
- if cur_df.empty:
658
- return pd.DataFrame(columns=["_Invoice","basket_revenue","basket_gp","basket_items","_datetime_max"])
659
- # per invoice sums
660
- b = cur_df.groupby("_Invoice", dropna=False).agg(
661
- basket_revenue=("_Revenue","sum"),
662
- basket_gp=("_GrossProfit","sum"),
663
- basket_items=("_Units","sum"),
664
- _datetime_max=("_datetime","max"),
665
  ).reset_index()
666
  return b
667
 
668
  def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]:
669
- if basket_df.empty:
670
  return {
671
  "avg_items_per_basket": "N/A",
672
  "avg_gross_profit_per_basket": "N/A",
@@ -677,10 +703,9 @@ class IrisReportEngine:
677
  avg_items = float(basket_df["basket_items"].mean())
678
  avg_gp = float(basket_df["basket_gp"].mean())
679
  median_value = float(basket_df["basket_revenue"].median())
680
- # size histogram
681
  sizes = basket_df["basket_items"].fillna(0)
682
  bins = {
683
- "1": int(((sizes == 1).sum())),
684
  "2-3": int(((sizes >= 2) & (sizes <= 3)).sum()),
685
  "4-5": int(((sizes >= 4) & (sizes <= 5)).sum()),
686
  "6_plus": int((sizes >= 6).sum()),
@@ -694,37 +719,32 @@ class IrisReportEngine:
694
 
695
  def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
696
  # Build unique product sets per invoice, count pairs
697
- if cur_df.empty or basket_df.empty or "_Product" not in cur_df.columns:
698
  return {"params": self._affinity_params(), "top_pairs": []}
699
 
700
- # Per-basket unique product set (exclude null/blocked)
701
- tmp = cur_df[["_Invoice","_Product"]].dropna()
702
  if tmp.empty:
703
  return {"params": self._affinity_params(), "top_pairs": []}
704
 
705
  blocked = set(self.params.get("blocked_products", []) or [])
706
- tmp = tmp[~tmp["_Product"].astype(str).str.strip().isin(blocked)]
707
  if tmp.empty:
708
  return {"params": self._affinity_params(), "top_pairs": []}
709
 
710
- products_per_invoice = tmp.groupby("_Invoice")["_Product"].agg(lambda s: sorted(set(map(str, s)))).reset_index()
711
  total_baskets = int(len(products_per_invoice))
712
  if total_baskets == 0:
713
  return {"params": self._affinity_params(), "top_pairs": []}
714
 
715
- # Limit explosion: optionally cap to top-N frequent products first
716
- # Count single supports
717
  from collections import Counter
718
  single_counter = Counter()
719
- for prods in products_per_invoice["_Product"]:
720
  single_counter.update(prods)
721
 
722
- # Pair counting
723
  pair_counter = Counter()
724
- for prods in products_per_invoice["_Product"]:
725
  if len(prods) < 2:
726
  continue
727
- # 2-combinations
728
  for i in range(len(prods)):
729
  for j in range(i+1, len(prods)):
730
  a, b = prods[i], prods[j]
@@ -736,11 +756,9 @@ class IrisReportEngine:
736
  top_k = int(self.params["top_k"])
737
 
738
  rows = []
739
- # Average pair revenue across baskets containing both (optional; approximate via filtering once)
740
- inv_with_products = cur_df.groupby("_Invoice")["_Product"].apply(lambda s: set(map(str, s.dropna())))
741
-
742
- # Precompute basket revenue by invoice for avg pair revenue
743
- rev_by_inv = cur_df.groupby("_Invoice")["_Revenue"].sum()
744
 
745
  for (a, b), ab_count in pair_counter.items():
746
  if ab_count < min_support_baskets:
@@ -755,7 +773,6 @@ class IrisReportEngine:
755
  if not np.isfinite(lift) or lift < min_lift:
756
  continue
757
 
758
- # avg pair revenue over baskets that include both
759
  inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s))
760
  pair_invoices = inv_mask[inv_mask].index
761
  avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan
@@ -791,34 +808,29 @@ class IrisReportEngine:
791
  "dow_series": [],
792
  "profit_heatmap_7x24": []
793
  }
794
- # Hourly
795
- gh = cur_df.groupby("_Hour", dropna=False).agg(
796
- revenue=("_Revenue","sum"),
797
- gross_profit=("_GrossProfit","sum")
798
  ).reset_index()
799
- best_hour_idx = int(gh.loc[gh["gross_profit"].idxmax(), "_Hour"]) if not gh.empty else None
800
  best_hour_gp = float(gh["gross_profit"].max()) if not gh.empty else None
801
 
802
- # DOW
803
- gd = cur_df.groupby("_DOW", dropna=False).agg(
804
- revenue=("_Revenue","sum"),
805
- gross_profit=("_GrossProfit","sum")
806
  ).reset_index()
807
- # enforce Mon..Sun order using _DOW_idx
808
- order_map = cur_df.groupby("_DOW")["_DOW_idx"].max().to_dict()
809
- gd["__ord"] = gd["_DOW"].map(order_map)
810
  gd = gd.sort_values("__ord", kind="stable")
811
  best_day_row = gd.loc[gd["gross_profit"].idxmax()] if not gd.empty else None
812
- best_day = {"day": str(best_day_row["_DOW"]), "gross_profit": float(best_day_row["gross_profit"])} if best_day_row is not None else None
813
 
814
- # Heatmap (7x24 by _DOW_idx then _Hour)
815
- m = cur_df.groupby(["_DOW_idx","_Hour"], dropna=False)["_GrossProfit"].sum().unstack(fill_value=0)
816
- # ensure full 7x24
817
  m = m.reindex(index=range(0,7), columns=range(0,24), fill_value=0)
818
  heatmap = [[float(x) for x in row] for row in m.values.tolist()]
819
 
820
- hourly_series = gh.rename(columns={"_Hour":"hour"}).to_dict(orient="records")
821
- dow_series = gd[["_DOW","revenue","gross_profit"]].rename(columns={"_DOW":"day"}).to_dict(orient="records")
822
 
823
  return {
824
  "best_hour_by_profit": {"hour": best_hour_idx, "gross_profit": round(best_hour_gp, 2)} if best_hour_idx is not None else None,
@@ -829,24 +841,26 @@ class IrisReportEngine:
829
  }
830
 
831
  def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
832
- if cur_df.empty or "_Customer" not in cur_df.columns:
833
  return {
834
  "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
835
  "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
836
  "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
837
  }
838
  df = cur_df.copy()
839
- # Build per-customer aggregates
840
- last_date = df.groupby("_Customer")["_datetime"].max()
841
- orders = (df.dropna(subset=["_Invoice"])
842
- .groupby("_Customer")["_Invoice"].nunique())
843
- revenue = df.groupby("_Customer")["_Revenue"].sum()
844
- gp = df.groupby("_Customer")["_GrossProfit"].sum()
 
 
845
 
846
  # Avg basket value per customer (from their invoices)
847
- if not basket_df.empty and "_Invoice" in df.columns:
848
- inv_to_rev = basket_df.set_index("_Invoice")["basket_revenue"]
849
- cust_invoices = df.dropna(subset=["_Invoice"]).groupby("_Customer")["_Invoice"].agg(lambda x: sorted(set(x)))
850
  avg_basket_val = {}
851
  for cust, invs in cust_invoices.items():
852
  vals = inv_to_rev.reindex(invs).dropna()
@@ -867,24 +881,20 @@ class IrisReportEngine:
867
  "avg_basket_value": avg_basket.reindex(last_date.index).values
868
  }).fillna({"avg_basket_value": np.nan})
869
 
870
- # Leaderboards
871
  vip = rfm.sort_values(["gp","orders","revenue"], ascending=[False, False, False]).head(20)
872
- # At-risk: top quartile gp but recency > 30 days (tunable)
873
  if len(rfm):
874
  gp_q3 = rfm["gp"].quantile(0.75)
875
  at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp","recency_days"], ascending=[False, False]).head(20)
876
  else:
877
  at_risk = rfm.head(0)
878
- # New customers: first seen within current window (approx via last_date inside window and orders==1)
879
- # (More precise would need a historical first_seen; we infer using current window)
880
  new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20)
881
 
882
  out = {
883
  "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
884
  "leaderboards": {
885
- "top_customers_by_gp": rfm_to_list(vip),
886
- "at_risk": rfm_to_list(at_risk),
887
- "new_customers": rfm_to_list(new_customers)
888
  },
889
  "rfm_summary": {
890
  "unique_customers": int(rfm["customer"].nunique()),
@@ -905,21 +915,17 @@ class IrisReportEngine:
905
  start_cur, end_cur = current_bounds
906
  days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
907
 
908
- # velocity from product_agg
909
- pa = product_agg.copy()
910
  if pa.empty:
911
  return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
912
 
913
  pa["units_per_day"] = pa["units"] / days
914
 
915
- # merge stock feed on product
916
  sf = self.stock_feed.copy()
917
- # Normalize join keys
918
  sf["product_key"] = sf.get("product", sf.get("Product", "")).astype(str).str.strip()
919
- pa["product_key"] = pa["_Product"].astype(str).str.strip()
920
  merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock"))
921
 
922
- # If a product exists in stock but didn’t sell in window, units_per_day may be NaN→0
923
  merged["units_per_day"] = merged["units_per_day"].fillna(0.0)
924
  merged["stock_on_hand"] = pd.to_numeric(merged.get("stock_on_hand", np.nan), errors="coerce")
925
  merged["reorder_point"] = pd.to_numeric(merged.get("reorder_point", np.nan), errors="coerce")
@@ -930,7 +936,7 @@ class IrisReportEngine:
930
  def status_row(r):
931
  if pd.isna(r.get("stock_on_hand")):
932
  return "unknown"
933
- if r["stock_on_hand"] <= 0:
934
  return "stockout"
935
  if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]:
936
  return "low"
@@ -942,11 +948,10 @@ class IrisReportEngine:
942
 
943
  merged["status"] = merged.apply(status_row, axis=1)
944
 
945
- products_out = []
946
- low_stock, stockout_risk, dead_stock = [], [], []
947
  for _, r in merged.iterrows():
948
  rec = {
949
- "product": str(r.get("_Product") or r.get("product_key")),
950
  "stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None,
951
  "reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None,
952
  "lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None,
@@ -976,7 +981,6 @@ class IrisReportEngine:
976
  if self.cash_float_feed.empty:
977
  return {"status": "no_cash_data"}
978
 
979
- # We expect cash_float_feed rows with branch, date (YYYY-MM-DD), opening_float, closing_float, drops, petty_cash, declared_cash
980
  cf = self.cash_float_feed.copy()
981
  out_days = []
982
  high_var_days = 0
@@ -986,16 +990,15 @@ class IrisReportEngine:
986
  cash_sales = pd.DataFrame(columns=["branch","date","cash_sales"])
987
  else:
988
  df = cur_df.copy()
989
- df["date"] = df["_datetime"].dt.strftime("%Y-%m-%d")
990
  df["is_cash"] = (df.get("Money_Type","").astype(str).str.lower() == "cash")
991
- cash_sales = df[df["is_cash"]].groupby(["_Branch","date"])["_Revenue"].sum().reset_index()
992
- cash_sales = cash_sales.rename(columns={"_Branch":"branch","_Revenue":"cash_sales"})
993
 
994
  cf["date"] = cf["date"].astype(str).str[:10]
995
  merged = cf.merge(cash_sales, on=["branch","date"], how="left")
996
  merged["cash_sales"] = merged["cash_sales"].fillna(0.0)
997
 
998
- # Expected Cash = Opening + CashSales – Drops – PettyCash – Closing
999
  for _, r in merged.iterrows():
1000
  opening = float(r.get("opening_float") or 0.0)
1001
  closing = float(r.get("closing_float") or 0.0)
@@ -1034,119 +1037,110 @@ class IrisReportEngine:
1034
  # ------------------------- branch analytics -------------------------
1035
 
1036
  def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]:
1037
- if cur_df.empty or "_Branch" not in cur_df.columns:
1038
  return {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}
1039
 
1040
  per_branch = {}
1041
- branches = sorted(map(str, cur_df["_Branch"].dropna().unique().tolist()))
1042
  start_cur, end_cur = current_bounds
1043
  days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
1044
 
1045
  branch_summary_rows = []
1046
 
1047
  for br in branches:
1048
- d = cur_df[cur_df["_Branch"] == br]
1049
- if d.empty:
1050
- continue
1051
-
1052
- # headline-like
1053
- revenue = float(d["_Revenue"].sum())
1054
- cogs = float(d["_COGS"].sum())
1055
- gp = float(d["_GrossProfit"].sum())
1056
- margin_pct = (gp / revenue) if revenue > 0 else None
1057
- tx = int(d["_Invoice"].nunique()) if "_Invoice" in d.columns and d["_Invoice"].notna().any() else int(len(d))
1058
- items = float(d["_Units"].sum())
1059
-
1060
- # baskets
1061
- basket_df = self._build_basket_table(d)
1062
- basket_kpis = self._basket_kpis(basket_df)
1063
-
1064
- # temporal
1065
- temporal = self._temporal_patterns(d)
1066
-
1067
- # product leaderboards
1068
- pagg = self._build_product_aggregates(d)
1069
- if not pagg.empty:
1070
- pagg["units_per_day"] = pagg["units"] / days
1071
- product_lb = self._product_leaderboards(pagg)
1072
- else:
1073
- product_lb = self._empty_product_leaderboards()
1074
-
1075
- # affinity
1076
- affinity = self._affinity_pairs(d, basket_df)
1077
-
1078
- # customers
1079
- customers = self._customer_value(d, basket_df)
1080
-
1081
- # cash recon slice
1082
- cash_recon = self._cash_recon_block(d)
1083
-
1084
- per_branch[br] = {
1085
- "kpis": {
1086
- "revenue": round(revenue, 2),
1087
- "cogs": round(cogs, 2),
1088
- "gross_profit": round(gp, 2),
1089
- "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None,
1090
- "transactions": tx,
1091
- "items_sold": round(items, 2),
1092
- "avg_basket_value": basket_kpis.get("median_basket_value"),
1093
- "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"),
1094
- "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"),
1095
- },
1096
- "temporal": temporal,
1097
- "products": product_lb,
1098
- "affinity": affinity,
1099
- "customer_value": customers,
1100
- "cash_recon": cash_recon,
1101
- "data_quality": {
1102
- "duplicates_dropped": self._prepared_dupes_dropped,
1103
- "non_sale_rows_excluded": self._non_sale_excluded,
1104
- "currency_mixed": False # set if you add multi-currency detection
1105
  }
1106
- }
1107
 
1108
- branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0})
 
 
1109
 
1110
- # cross-branch comparisons
1111
  cross = {}
1112
  if branch_summary_rows:
1113
- bs = pd.DataFrame(branch_summary_rows)
1114
- cross["rankings"] = {
1115
- "by_revenue": bs.sort_values("revenue", ascending=False)[["branch","revenue"]].to_dict(orient="records"),
1116
- "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch","gp_margin_pct"]].to_dict(orient="records"),
1117
- }
1118
- cross["spread"] = {
1119
- "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None,
1120
- "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None,
1121
- "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None,
1122
- }
1123
- # revenue share & HHI
1124
- tot_rev = float(bs["revenue"].sum())
1125
- shares = []
1126
- hhi = 0.0
1127
- for _, r in bs.iterrows():
1128
- sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0
1129
- shares.append({"branch": r["branch"], "share": float(round(sh, 6))})
1130
- hhi += sh*sh
1131
- cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))}
1132
- # week-over-week deltas per branch (best-effort: compute previous per-branch)
1133
- if not previous_df.empty:
1134
- prev_g = previous_df.groupby("_Branch").agg(
1135
- revenue=("_Revenue","sum"),
1136
- gp=("_GrossProfit","sum")
1137
- ).reset_index().rename(columns={"_Branch":"branch"})
1138
- cur_g = pd.DataFrame(branch_summary_rows)
1139
- cur_g = cur_g.rename(columns={"branch":"branch"})
1140
- m = cur_g.merge(prev_g, on="branch", suffixes=("_cur","_prev"), how="left").fillna(0.0)
1141
- wow_rows = []
1142
- for _, r in m.iterrows():
1143
- wow_rows.append({
1144
- "branch": r["branch"],
1145
- "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"])*100) if r["revenue_prev"]>0 else (100.0 if r["revenue_cur"]>0 else 0.0),
1146
- "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"])*100) if r["gp_prev"]>0 else (100.0 if r["gp_cur"]>0 else 0.0),
1147
- "avg_basket_wow": None # compute if you persist prev basket median
1148
- })
1149
- cross["trend_wow"] = wow_rows
1150
 
1151
  return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross}
1152
 
@@ -1163,7 +1157,6 @@ class IrisReportEngine:
1163
 
1164
  def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]:
1165
  top_k = int(self.params["top_k"])
1166
- # margin % floor
1167
  g_marginpct = g.copy()
1168
  g_marginpct = g_marginpct[
1169
  (g_marginpct["revenue"] >= float(self.params["min_revenue_for_margin_pct"])) &
@@ -1176,7 +1169,7 @@ class IrisReportEngine:
1176
  d = df.sort_values(col, ascending=asc).head(top_k)
1177
  return [
1178
  {
1179
- "product": str(r["_Product"]),
1180
  "revenue": round(float(r["revenue"]), 2),
1181
  "units": float(r["units"]),
1182
  "gross_profit": round(float(r["gross_profit"]), 2),
@@ -1219,7 +1212,6 @@ class IrisReportEngine:
1219
  "revenue_pareto_top20pct_share": 0.0,
1220
  "gini_revenue": 0.0
1221
  }
1222
- # shares
1223
  total_rev = float(g["revenue"].sum())
1224
  total_units = float(g["units"].sum())
1225
  rev_sorted = g.sort_values("revenue", ascending=False)["revenue"].values
@@ -1228,7 +1220,6 @@ class IrisReportEngine:
1228
  share_top5_rev = (rev_sorted[:5].sum() / total_rev) if total_rev > 0 else 0.0
1229
  share_top5_units = (units_sorted[:5].sum() / total_units) if total_units > 0 else 0.0
1230
 
1231
- # Pareto top 20% products by count
1232
  n = len(rev_sorted)
1233
  if n == 0:
1234
  pareto = 0.0
@@ -1236,11 +1227,9 @@ class IrisReportEngine:
1236
  k = max(1, int(np.ceil(0.2 * n)))
1237
  pareto = rev_sorted[:k].sum() / total_rev if total_rev > 0 else 0.0
1238
 
1239
- # Gini on revenue
1240
  if total_rev <= 0 or n == 0:
1241
  gini = 0.0
1242
  else:
1243
- # Gini for array x >=0: G = 1 - 2 * sum((n+1-i)*x_i) / (n * sum(x))
1244
  x = np.sort(rev_sorted) # ascending
1245
  cum = np.cumsum(x)
1246
  gini = 1.0 - 2.0 * np.sum(cum) / (n * np.sum(x)) + 1.0 / n
@@ -1264,82 +1253,119 @@ class IrisReportEngine:
1264
  emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
1265
  return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}
1266
 
1267
- headline = self._headline(current_df, previous_df)
 
 
 
 
 
 
 
 
 
 
 
 
 
1268
 
1269
  # Basket & affinity
1270
- basket_df = self._build_basket_table(current_df)
1271
- basket_kpis = self._basket_kpis(basket_df)
1272
- affinity = self._affinity_pairs(current_df, basket_df)
 
 
 
 
 
 
 
 
 
 
 
 
1273
 
1274
  # Temporal
1275
- temporal = self._temporal_patterns(current_df)
 
 
 
 
1276
 
1277
  # Product aggregates + leaderboards + concentration
1278
- start_cur = pd.Timestamp(tfmeta["current_start"])
1279
- end_cur = pd.Timestamp(tfmeta["current_end"])
1280
- days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
 
 
 
 
 
 
 
 
 
 
1281
 
1282
- g_products = self._build_product_aggregates(current_df)
1283
- if not g_products.empty:
1284
- g_products["units_per_day"] = g_products["units"] / days
1285
- product_lb = self._product_leaderboards(g_products)
1286
- concentration = self._concentration_block(g_products)
1287
- else:
1288
- product_lb = self._empty_product_leaderboards()
1289
- concentration = self._concentration_block(pd.DataFrame(columns=["revenue","units"]))
1290
 
1291
  # Customer value (RFM)
1292
- customer_val = self._customer_value(current_df, basket_df)
 
 
 
 
 
 
 
 
 
 
1293
 
1294
  # Inventory (optional)
1295
- inventory = self._inventory_block(current_df, g_products, (start_cur, end_cur))
 
 
 
 
 
1296
 
1297
  # Branch analytics
1298
- branch_block = self._per_branch_blocks(current_df, previous_df, (start_cur, end_cur))
1299
-
1300
- # Old snapshot maintained + new blocks appended
1301
- snapshot = {
1302
- "Summary Period": tfmeta.get("period_label", "This Week vs. Last Week"),
1303
- "Performance Snapshot (vs. Prior Period)": {
1304
- "Total Revenue": f"{headline['total_revenue_fmt']} ({headline['total_revenue_change']})",
1305
- "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
1306
- "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
 
 
 
 
 
 
 
 
 
 
 
1307
  },
1308
- "Basket Analysis": basket_kpis,
1309
- "Product Affinity": affinity,
1310
- "Temporal Patterns": temporal,
1311
- "Customer Value": customer_val,
1312
- "Product KPIs": {
1313
- "leaderboards": product_lb,
1314
- "concentration": concentration
1315
  },
1316
- "Inventory": inventory,
1317
- "Branch Analytics": branch_block,
1318
- "meta": {
1319
- "timeframes": tfmeta,
1320
- "kpi_params": {
1321
- "top_k": int(self.params["top_k"]),
1322
- "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]),
1323
- "min_tx_for_margin_pct": int(self.params["min_tx_for_margin_pct"]),
1324
- "rfm_window_days": int(self.params["rfm_window_days"]),
1325
- "retention_factor": float(self.params["retention_factor"]),
1326
- "min_support_baskets": int(self.params["min_support_baskets"]),
1327
- "min_lift": float(self.params["min_lift"]),
1328
- "blocked_products": list(self.params["blocked_products"]),
1329
- "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
1330
- "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
1331
- },
1332
- "row_counts": {
1333
- "input": int(len(self.raw)),
1334
- "prepared": int(len(self.df)),
1335
- "current_period": int(len(current_df)),
1336
- "previous_period": int(len(previous_df)),
1337
- },
1338
- "notes": [
1339
- "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).",
1340
- f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}",
1341
- ],
1342
- }
1343
  }
1344
 
1345
  emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
@@ -1352,7 +1378,6 @@ class IrisReportEngine:
1352
  Safe for PandasAI exception fallback.
1353
  """
1354
  try:
1355
- tz = TZ
1356
  prompt = (
1357
  "You are Iris, a concise business analyst.\n"
1358
  "IMPORTANT RULES:\n"
@@ -1367,15 +1392,10 @@ class IrisReportEngine:
1367
  "Business Data (authoritative; JSON):\n"
1368
  f"{json.dumps(json_safe(briefing), ensure_ascii=False)}\n"
1369
  )
1370
-
1371
  resp = self.llm.invoke(prompt)
1372
- # ChatGoogleGenerativeAI returns an object with .content
1373
  text = getattr(resp, "content", None) or str(resp)
1374
- # Final safety scrub (remove accidental code fences / tracebacks)
1375
  return sanitize_answer(text)
1376
-
1377
  except Exception as e:
1378
- # Absolute last resort: dump a compact JSON view so the UI shows *something*
1379
  fallback = {
1380
  "note": "Narrative fallback failed; returning raw snapshot.",
1381
  "error": str(e)[:200],
 
396
  - Never uses LLM for numbers. LLM only for narration elsewhere.
397
  """
398
 
399
+ # ---- Canonical column names (single source of truth; no magic strings sprinkled around) ----
400
+ COL_INVOICE = "_Invoice"
401
+ COL_PRODUCT = "_Product"
402
+ COL_TELLER = "_Teller"
403
+ COL_TXNTYPE = "_TxnType"
404
+ COL_BRANCH = "_Branch"
405
+ COL_CUSTOMER = "_Customer"
406
+ COL_DT = "_datetime"
407
+ COL_AMOUNT = "_Amount"
408
+ COL_UNITS = "_Units"
409
+ COL_UNITCOST = "_UnitCost"
410
+ COL_REVENUE = "_Revenue"
411
+ COL_COGS = "_COGS"
412
+ COL_GP = "_GrossProfit"
413
+ COL_HOUR = "_Hour"
414
+ COL_DOW = "_DOW"
415
+ COL_DOWI = "_DOW_idx"
416
+
417
  DEFAULT_PARAMS = {
418
  "top_k": 5,
419
  "min_revenue_for_margin_pct": 50.0,
 
432
  profile_id: str,
433
  transactions_data: List[dict],
434
  llm_instance,
435
+ stock_feed: Optional[List[Dict[str, Any]]] = None,
436
+ cash_float_feed: Optional[List[Dict[str, Any]]] = None,
437
  params: Optional[Dict[str, Any]] = None,
438
  ):
439
  self.profile_id = profile_id
 
446
  self.df = self._load_and_prepare_data(self.raw)
447
  self.currency = self._get_primary_currency()
448
 
449
+ # ------------------------- small helpers -------------------------
450
+ @staticmethod
451
+ def _rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
452
+ if df is None or df.empty:
453
+ return []
454
+ out = []
455
+ for _, r in df.iterrows():
456
+ out.append({
457
+ "customer": str(r.get("customer")),
458
+ "orders": int(r.get("orders", 0)),
459
+ "revenue": float(r.get("revenue", 0.0)),
460
+ "gp": float(r.get("gp", 0.0)),
461
+ "recency_days": float(r.get("recency_days", np.nan)) if pd.notna(r.get("recency_days")) else None,
462
+ "avg_basket_value": float(r.get("avg_basket_value", np.nan)) if pd.notna(r.get("avg_basket_value")) else None,
463
+ })
464
+ return out
465
+
466
+ def _has(self, df: pd.DataFrame, col: str) -> bool:
467
+ return isinstance(df, pd.DataFrame) and col in df.columns
468
+
469
  # ------------------------- load/prepare -------------------------
470
 
471
  def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
 
477
  emit_kpi_debug(self.profile_id, "column_map", mapping)
478
 
479
  # Numerics
480
+ amt_col = mapping["amount"] or ("Settled_Amount" if "Settled_Amount" in df.columns else None)
481
  if amt_col and amt_col in df:
482
+ df[self.COL_AMOUNT] = pd.to_numeric(df[amt_col], errors="coerce")
483
  else:
484
+ df[self.COL_AMOUNT] = pd.Series(dtype=float)
485
 
486
  if mapping["units"] and mapping["units"] in df:
487
+ df[self.COL_UNITS] = pd.to_numeric(df[mapping["units"]], errors="coerce").fillna(0)
488
  else:
489
+ df[self.COL_UNITS] = 0
490
 
491
  if mapping["unit_cost"] and mapping["unit_cost"] in df:
492
+ df[self.COL_UNITCOST] = pd.to_numeric(df[mapping["unit_cost"]], errors="coerce").fillna(0.0)
493
  else:
494
+ df[self.COL_UNITCOST] = 0.0
495
 
496
  # Datetime
497
  if mapping["date"] and mapping["date"] in df:
 
513
  except Exception:
514
  pass
515
 
516
+ df[self.COL_DT] = dt_series
517
+ df = df.dropna(subset=[self.COL_DT]).copy()
518
 
519
  # Canonical dims
520
+ df[self.COL_INVOICE] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None
521
+ df[self.COL_PRODUCT] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None
522
+ df[self.COL_TELLER] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None
523
+ df[self.COL_TXNTYPE] = (df[mapping["txn_type"]].astype(str).str.lower()
524
+ if mapping["txn_type"] and mapping["txn_type"] in df
525
+ else df.get("Transaction_Type", "").astype(str).str.lower())
526
+ df[self.COL_BRANCH] = df.get("Branch")
527
+ df[self.COL_CUSTOMER] = df.get("Customer_Reference")
528
+
529
+ # Sales filter: keep explicit sales OR Transaction_Type_ID 21 OR positive amounts
530
+ txid_series = df.get("Transaction_Type_ID")
531
  sales_mask = (
532
+ df[self.COL_TXNTYPE].isin(["sale", "sales", "invoice"]) |
533
+ (pd.Series(False, index=df.index) if txid_series is None else txid_series.isin([21])) |
534
+ (df[self.COL_AMOUNT] > 0)
535
  )
536
  working = df[sales_mask].copy()
 
 
 
537
 
538
  # Derive measures
539
+ working[self.COL_REVENUE] = working[self.COL_AMOUNT].fillna(0.0)
540
+ working[self.COL_COGS] = (working[self.COL_UNITCOST] * working[self.COL_UNITS]).fillna(0.0)
541
+ working[self.COL_GP] = (working[self.COL_REVENUE] - working[self.COL_COGS]).fillna(0.0)
542
+ working[self.COL_HOUR] = working[self.COL_DT].dt.hour
543
+ working[self.COL_DOW] = working[self.COL_DT].dt.day_name()
544
+ working[self.COL_DOWI] = working[self.COL_DT].dt.dayofweek # 0=Mon .. 6=Sun
545
 
546
  # Deduplicate exact duplicate sale lines
547
  before = len(working)
548
+ dedupe_keys = ["Transaction_ID", self.COL_INVOICE, self.COL_PRODUCT, self.COL_UNITS, self.COL_AMOUNT, self.COL_DT]
549
  existing_keys = [k for k in dedupe_keys if k in working.columns]
550
  if existing_keys:
551
  working = working.drop_duplicates(subset=existing_keys)
552
  duplicates_dropped = before - len(working)
553
 
554
+ # Drop zero rows if both revenue and cost are zero
555
+ working = working[(working[self.COL_REVENUE].abs() > 0) | (working[self.COL_COGS].abs() > 0)]
556
 
557
  emit_kpi_debug(self.profile_id, "prepared_counts", {
558
  "raw_rows": int(len(self.raw)),
 
565
  return working
566
 
567
  def _get_primary_currency(self) -> str:
 
568
  try:
569
  mapping = ColumnResolver.map(self.raw)
570
  if mapping["currency"] and mapping["currency"] in self.raw:
 
588
  start_prev = start_cur - pd.Timedelta(days=7)
589
  end_prev = start_cur - pd.Timedelta(seconds=1)
590
 
591
+ current_df = self.df[(self.df[self.COL_DT] >= start_cur) & (self.df[self.COL_DT] <= end_cur)]
592
+ previous_df = self.df[(self.df[self.COL_DT] >= start_prev) & (self.df[self.COL_DT] <= end_prev)]
593
 
594
  meta = {
595
  "period_label": "This Week vs. Last Week",
 
610
  return f"{((cur - prev) / prev) * 100:+.1f}%"
611
 
612
  def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]:
613
+ cur_rev = float(cur_df[self.COL_REVENUE].sum()) if not cur_df.empty else 0.0
614
+ prev_rev = float(prev_df[self.COL_REVENUE].sum()) if not prev_df.empty else 0.0
615
+ cur_gp = float(cur_df[self.COL_GP].sum()) if not cur_df.empty else 0.0
616
+ prev_gp = float(prev_df[self.COL_GP].sum()) if not prev_df.empty else 0.0
617
 
618
+ if self._has(cur_df, self.COL_INVOICE) and cur_df[self.COL_INVOICE].notna().any():
619
+ tx_now = int(cur_df[self.COL_INVOICE].nunique())
620
  else:
621
  tx_now = int(len(cur_df))
622
+ if self._has(prev_df, self.COL_INVOICE) and prev_df[self.COL_INVOICE].notna().any():
623
+ tx_prev = int(prev_df[self.COL_INVOICE].nunique())
624
  else:
625
  tx_prev = int(len(prev_df))
626
 
 
644
  def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame:
645
  if cur_df.empty:
646
  return pd.DataFrame(columns=[
647
+ self.COL_PRODUCT,"revenue","units","cogs","gross_profit","margin_pct",
648
+ "avg_selling_price","avg_unit_cost","tx_count"
649
  ])
650
 
651
  df = cur_df.copy()
652
  # Exclude blocked products for leaderboards/affinity, but keep them in totals if needed
653
  if self.params["blocked_products"]:
654
+ df = df[~df[self.COL_PRODUCT].astype(str).str.strip().isin(self.params["blocked_products"])]
655
 
656
  # Tx count via invoice nunique if available
657
+ if self._has(df, self.COL_INVOICE) and df[self.COL_INVOICE].notna().any():
658
+ g = df.groupby(self.COL_PRODUCT, dropna=False).agg(
659
+ revenue=(self.COL_REVENUE,"sum"),
660
+ units=(self.COL_UNITS,"sum"),
661
+ cogs=(self.COL_COGS,"sum"),
662
+ gp=(self.COL_GP,"sum"),
663
+ tx=(self.COL_INVOICE,"nunique")
 
 
 
 
 
 
 
 
 
664
  )
665
  else:
666
+ g = df.groupby(self.COL_PRODUCT, dropna=False).agg(
667
+ revenue=(self.COL_REVENUE,"sum"),
668
+ units=(self.COL_UNITS,"sum"),
669
+ cogs=(self.COL_COGS,"sum"),
670
+ gp=(self.COL_GP,"sum"),
671
+ tx=(self.COL_PRODUCT,"size")
672
  )
673
 
674
  g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index()
 
678
  g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan)
679
  g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan)
680
 
 
 
681
  return g
682
 
683
  def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame:
684
+ if cur_df.empty or not self._has(cur_df, self.COL_INVOICE):
685
+ return pd.DataFrame(columns=[self.COL_INVOICE,"basket_revenue","basket_gp","basket_items","_datetime_max"])
686
+ b = cur_df.groupby(self.COL_INVOICE, dropna=False).agg(
687
+ basket_revenue=(self.COL_REVENUE,"sum"),
688
+ basket_gp=(self.COL_GP,"sum"),
689
+ basket_items=(self.COL_UNITS,"sum"),
690
+ _datetime_max=(self.COL_DT,"max"),
 
691
  ).reset_index()
692
  return b
693
 
694
  def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]:
695
+ if basket_df is None or basket_df.empty:
696
  return {
697
  "avg_items_per_basket": "N/A",
698
  "avg_gross_profit_per_basket": "N/A",
 
703
  avg_items = float(basket_df["basket_items"].mean())
704
  avg_gp = float(basket_df["basket_gp"].mean())
705
  median_value = float(basket_df["basket_revenue"].median())
 
706
  sizes = basket_df["basket_items"].fillna(0)
707
  bins = {
708
+ "1": int((sizes == 1).sum()),
709
  "2-3": int(((sizes >= 2) & (sizes <= 3)).sum()),
710
  "4-5": int(((sizes >= 4) & (sizes <= 5)).sum()),
711
  "6_plus": int((sizes >= 6).sum()),
 
719
 
720
  def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
721
  # Build unique product sets per invoice, count pairs
722
+ if cur_df.empty or basket_df.empty or not self._has(cur_df, self.COL_PRODUCT) or not self._has(cur_df, self.COL_INVOICE):
723
  return {"params": self._affinity_params(), "top_pairs": []}
724
 
725
+ tmp = cur_df[[self.COL_INVOICE, self.COL_PRODUCT]].dropna()
 
726
  if tmp.empty:
727
  return {"params": self._affinity_params(), "top_pairs": []}
728
 
729
  blocked = set(self.params.get("blocked_products", []) or [])
730
+ tmp = tmp[~tmp[self.COL_PRODUCT].astype(str).str.strip().isin(blocked)]
731
  if tmp.empty:
732
  return {"params": self._affinity_params(), "top_pairs": []}
733
 
734
+ products_per_invoice = tmp.groupby(self.COL_INVOICE)[self.COL_PRODUCT].agg(lambda s: sorted(set(map(str, s)))).reset_index()
735
  total_baskets = int(len(products_per_invoice))
736
  if total_baskets == 0:
737
  return {"params": self._affinity_params(), "top_pairs": []}
738
 
 
 
739
  from collections import Counter
740
  single_counter = Counter()
741
+ for prods in products_per_invoice[self.COL_PRODUCT]:
742
  single_counter.update(prods)
743
 
 
744
  pair_counter = Counter()
745
+ for prods in products_per_invoice[self.COL_PRODUCT]:
746
  if len(prods) < 2:
747
  continue
 
748
  for i in range(len(prods)):
749
  for j in range(i+1, len(prods)):
750
  a, b = prods[i], prods[j]
 
756
  top_k = int(self.params["top_k"])
757
 
758
  rows = []
759
+ # Average pair revenue across baskets containing both (optional; approximate)
760
+ inv_with_products = cur_df.groupby(self.COL_INVOICE)[self.COL_PRODUCT].apply(lambda s: set(map(str, s.dropna())))
761
+ rev_by_inv = cur_df.groupby(self.COL_INVOICE)[self.COL_REVENUE].sum()
 
 
762
 
763
  for (a, b), ab_count in pair_counter.items():
764
  if ab_count < min_support_baskets:
 
773
  if not np.isfinite(lift) or lift < min_lift:
774
  continue
775
 
 
776
  inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s))
777
  pair_invoices = inv_mask[inv_mask].index
778
  avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan
 
808
  "dow_series": [],
809
  "profit_heatmap_7x24": []
810
  }
811
+ gh = cur_df.groupby(self.COL_HOUR, dropna=False).agg(
812
+ revenue=(self.COL_REVENUE,"sum"),
813
+ gross_profit=(self.COL_GP,"sum")
 
814
  ).reset_index()
815
+ best_hour_idx = int(gh.loc[gh["gross_profit"].idxmax(), self.COL_HOUR]) if not gh.empty else None
816
  best_hour_gp = float(gh["gross_profit"].max()) if not gh.empty else None
817
 
818
+ gd = cur_df.groupby(self.COL_DOW, dropna=False).agg(
819
+ revenue=(self.COL_REVENUE,"sum"),
820
+ gross_profit=(self.COL_GP,"sum")
 
821
  ).reset_index()
822
+ order_map = cur_df.groupby(self.COL_DOW)[self.COL_DOWI].max().to_dict()
823
+ gd["__ord"] = gd[self.COL_DOW].map(order_map)
 
824
  gd = gd.sort_values("__ord", kind="stable")
825
  best_day_row = gd.loc[gd["gross_profit"].idxmax()] if not gd.empty else None
826
+ best_day = {"day": str(best_day_row[self.COL_DOW]), "gross_profit": float(best_day_row["gross_profit"])} if best_day_row is not None else None
827
 
828
+ m = cur_df.groupby([self.COL_DOWI, self.COL_HOUR], dropna=False)[self.COL_GP].sum().unstack(fill_value=0)
 
 
829
  m = m.reindex(index=range(0,7), columns=range(0,24), fill_value=0)
830
  heatmap = [[float(x) for x in row] for row in m.values.tolist()]
831
 
832
+ hourly_series = gh.rename(columns={self.COL_HOUR:"hour"}).to_dict(orient="records")
833
+ dow_series = gd[[self.COL_DOW,"revenue","gross_profit"]].rename(columns={self.COL_DOW:"day"}).to_dict(orient="records")
834
 
835
  return {
836
  "best_hour_by_profit": {"hour": best_hour_idx, "gross_profit": round(best_hour_gp, 2)} if best_hour_idx is not None else None,
 
841
  }
842
 
843
  def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
844
+ if cur_df.empty or not self._has(cur_df, self.COL_CUSTOMER):
845
  return {
846
  "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
847
  "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
848
  "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
849
  }
850
  df = cur_df.copy()
851
+
852
+ last_date = df.groupby(self.COL_CUSTOMER)[self.COL_DT].max()
853
+ if self._has(df, self.COL_INVOICE):
854
+ orders = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].nunique()
855
+ else:
856
+ orders = df.groupby(self.COL_CUSTOMER).size()
857
+ revenue = df.groupby(self.COL_CUSTOMER)[self.COL_REVENUE].sum()
858
+ gp = df.groupby(self.COL_CUSTOMER)[self.COL_GP].sum()
859
 
860
  # Avg basket value per customer (from their invoices)
861
+ if not basket_df.empty and self._has(df, self.COL_INVOICE):
862
+ inv_to_rev = basket_df.set_index(self.COL_INVOICE)["basket_revenue"]
863
+ cust_invoices = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].agg(lambda x: sorted(set(x)))
864
  avg_basket_val = {}
865
  for cust, invs in cust_invoices.items():
866
  vals = inv_to_rev.reindex(invs).dropna()
 
881
  "avg_basket_value": avg_basket.reindex(last_date.index).values
882
  }).fillna({"avg_basket_value": np.nan})
883
 
 
884
  vip = rfm.sort_values(["gp","orders","revenue"], ascending=[False, False, False]).head(20)
 
885
  if len(rfm):
886
  gp_q3 = rfm["gp"].quantile(0.75)
887
  at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp","recency_days"], ascending=[False, False]).head(20)
888
  else:
889
  at_risk = rfm.head(0)
 
 
890
  new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20)
891
 
892
  out = {
893
  "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
894
  "leaderboards": {
895
+ "top_customers_by_gp": self._rfm_to_list(vip),
896
+ "at_risk": self._rfm_to_list(at_risk),
897
+ "new_customers": self._rfm_to_list(new_customers)
898
  },
899
  "rfm_summary": {
900
  "unique_customers": int(rfm["customer"].nunique()),
 
915
  start_cur, end_cur = current_bounds
916
  days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
917
 
918
+ pa = (product_agg or pd.DataFrame()).copy()
 
919
  if pa.empty:
920
  return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
921
 
922
  pa["units_per_day"] = pa["units"] / days
923
 
 
924
  sf = self.stock_feed.copy()
 
925
  sf["product_key"] = sf.get("product", sf.get("Product", "")).astype(str).str.strip()
926
+ pa["product_key"] = pa[self.COL_PRODUCT].astype(str).str.strip()
927
  merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock"))
928
 
 
929
  merged["units_per_day"] = merged["units_per_day"].fillna(0.0)
930
  merged["stock_on_hand"] = pd.to_numeric(merged.get("stock_on_hand", np.nan), errors="coerce")
931
  merged["reorder_point"] = pd.to_numeric(merged.get("reorder_point", np.nan), errors="coerce")
 
936
  def status_row(r):
937
  if pd.isna(r.get("stock_on_hand")):
938
  return "unknown"
939
+ if (r["stock_on_hand"] or 0) <= 0:
940
  return "stockout"
941
  if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]:
942
  return "low"
 
948
 
949
  merged["status"] = merged.apply(status_row, axis=1)
950
 
951
+ products_out, low_stock, stockout_risk, dead_stock = [], [], [], []
 
952
  for _, r in merged.iterrows():
953
  rec = {
954
+ "product": str(r.get(self.COL_PRODUCT) or r.get("product_key")),
955
  "stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None,
956
  "reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None,
957
  "lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None,
 
981
  if self.cash_float_feed.empty:
982
  return {"status": "no_cash_data"}
983
 
 
984
  cf = self.cash_float_feed.copy()
985
  out_days = []
986
  high_var_days = 0
 
990
  cash_sales = pd.DataFrame(columns=["branch","date","cash_sales"])
991
  else:
992
  df = cur_df.copy()
993
+ df["date"] = df[self.COL_DT].dt.strftime("%Y-%m-%d")
994
  df["is_cash"] = (df.get("Money_Type","").astype(str).str.lower() == "cash")
995
+ cash_sales = df[df["is_cash"]].groupby([self.COL_BRANCH,"date"])[self.COL_REVENUE].sum().reset_index()
996
+ cash_sales = cash_sales.rename(columns={self.COL_BRANCH:"branch", self.COL_REVENUE:"cash_sales"})
997
 
998
  cf["date"] = cf["date"].astype(str).str[:10]
999
  merged = cf.merge(cash_sales, on=["branch","date"], how="left")
1000
  merged["cash_sales"] = merged["cash_sales"].fillna(0.0)
1001
 
 
1002
  for _, r in merged.iterrows():
1003
  opening = float(r.get("opening_float") or 0.0)
1004
  closing = float(r.get("closing_float") or 0.0)
 
1037
  # ------------------------- branch analytics -------------------------
1038
 
1039
  def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]:
1040
+ if cur_df.empty or not self._has(cur_df, self.COL_BRANCH):
1041
  return {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}
1042
 
1043
  per_branch = {}
1044
+ branches = sorted(map(str, cur_df[self.COL_BRANCH].dropna().unique().tolist()))
1045
  start_cur, end_cur = current_bounds
1046
  days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
1047
 
1048
  branch_summary_rows = []
1049
 
1050
  for br in branches:
1051
+ try:
1052
+ d = cur_df[cur_df[self.COL_BRANCH] == br]
1053
+ if d.empty:
1054
+ continue
1055
+
1056
+ revenue = float(d[self.COL_REVENUE].sum())
1057
+ cogs = float(d[self.COL_COGS].sum())
1058
+ gp = float(d[self.COL_GP].sum())
1059
+ margin_pct = (gp / revenue) if revenue > 0 else None
1060
+ tx = int(d[self.COL_INVOICE].nunique()) if self._has(d, self.COL_INVOICE) and d[self.COL_INVOICE].notna().any() else int(len(d))
1061
+ items = float(d[self.COL_UNITS].sum())
1062
+
1063
+ basket_df = self._build_basket_table(d)
1064
+ basket_kpis = self._basket_kpis(basket_df)
1065
+ temporal = self._temporal_patterns(d)
1066
+
1067
+ pagg = self._build_product_aggregates(d)
1068
+ if not pagg.empty:
1069
+ pagg["units_per_day"] = pagg["units"] / days
1070
+ product_lb = self._product_leaderboards(pagg)
1071
+ else:
1072
+ product_lb = self._empty_product_leaderboards()
1073
+
1074
+ affinity = self._affinity_pairs(d, basket_df)
1075
+ customers = self._customer_value(d, basket_df)
1076
+ cash_recon = self._cash_recon_block(d)
1077
+
1078
+ per_branch[br] = {
1079
+ "kpis": {
1080
+ "revenue": round(revenue, 2),
1081
+ "cogs": round(cogs, 2),
1082
+ "gross_profit": round(gp, 2),
1083
+ "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None,
1084
+ "transactions": tx,
1085
+ "items_sold": round(items, 2),
1086
+ "avg_basket_value": basket_kpis.get("median_basket_value"),
1087
+ "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"),
1088
+ "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"),
1089
+ },
1090
+ "temporal": temporal,
1091
+ "products": product_lb,
1092
+ "affinity": affinity,
1093
+ "customer_value": customers,
1094
+ "cash_recon": cash_recon,
1095
+ "data_quality": {
1096
+ "duplicates_dropped": self._prepared_dupes_dropped,
1097
+ "non_sale_rows_excluded": self._non_sale_excluded,
1098
+ "currency_mixed": False
1099
+ }
 
 
 
 
 
 
 
 
1100
  }
 
1101
 
1102
+ branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0})
1103
+ except Exception as e:
1104
+ emit_kpi_debug(self.profile_id, "branch_block_error", {"branch": br, "error": str(e)})
1105
 
 
1106
  cross = {}
1107
  if branch_summary_rows:
1108
+ try:
1109
+ bs = pd.DataFrame(branch_summary_rows)
1110
+ cross["rankings"] = {
1111
+ "by_revenue": bs.sort_values("revenue", ascending=False)[["branch","revenue"]].to_dict(orient="records"),
1112
+ "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch","gp_margin_pct"]].to_dict(orient="records"),
1113
+ }
1114
+ cross["spread"] = {
1115
+ "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None,
1116
+ "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None,
1117
+ "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None,
1118
+ }
1119
+ tot_rev = float(bs["revenue"].sum())
1120
+ shares, hhi = [], 0.0
1121
+ for _, r in bs.iterrows():
1122
+ sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0
1123
+ shares.append({"branch": r["branch"], "share": float(round(sh, 6))})
1124
+ hhi += sh*sh
1125
+ cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))}
1126
+ if not previous_df.empty and self._has(previous_df, self.COL_BRANCH):
1127
+ prev_g = previous_df.groupby(self.COL_BRANCH).agg(
1128
+ revenue=(self.COL_REVENUE,"sum"),
1129
+ gp=(self.COL_GP,"sum")
1130
+ ).reset_index().rename(columns={self.COL_BRANCH:"branch"})
1131
+ cur_g = pd.DataFrame(branch_summary_rows)
1132
+ m = cur_g.merge(prev_g, on="branch", suffixes=("_cur","_prev"), how="left").fillna(0.0)
1133
+ wow_rows = []
1134
+ for _, r in m.iterrows():
1135
+ wow_rows.append({
1136
+ "branch": r["branch"],
1137
+ "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"])*100) if r["revenue_prev"]>0 else (100.0 if r["revenue_cur"]>0 else 0.0),
1138
+ "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"])*100) if r["gp_prev"]>0 else (100.0 if r["gp_cur"]>0 else 0.0),
1139
+ "avg_basket_wow": None
1140
+ })
1141
+ cross["trend_wow"] = wow_rows
1142
+ except Exception as e:
1143
+ emit_kpi_debug(self.profile_id, "branch_cross_error", {"error": str(e)})
 
1144
 
1145
  return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross}
1146
 
 
1157
 
1158
  def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]:
1159
  top_k = int(self.params["top_k"])
 
1160
  g_marginpct = g.copy()
1161
  g_marginpct = g_marginpct[
1162
  (g_marginpct["revenue"] >= float(self.params["min_revenue_for_margin_pct"])) &
 
1169
  d = df.sort_values(col, ascending=asc).head(top_k)
1170
  return [
1171
  {
1172
+ "product": str(r[self.COL_PRODUCT]),
1173
  "revenue": round(float(r["revenue"]), 2),
1174
  "units": float(r["units"]),
1175
  "gross_profit": round(float(r["gross_profit"]), 2),
 
1212
  "revenue_pareto_top20pct_share": 0.0,
1213
  "gini_revenue": 0.0
1214
  }
 
1215
  total_rev = float(g["revenue"].sum())
1216
  total_units = float(g["units"].sum())
1217
  rev_sorted = g.sort_values("revenue", ascending=False)["revenue"].values
 
1220
  share_top5_rev = (rev_sorted[:5].sum() / total_rev) if total_rev > 0 else 0.0
1221
  share_top5_units = (units_sorted[:5].sum() / total_units) if total_units > 0 else 0.0
1222
 
 
1223
  n = len(rev_sorted)
1224
  if n == 0:
1225
  pareto = 0.0
 
1227
  k = max(1, int(np.ceil(0.2 * n)))
1228
  pareto = rev_sorted[:k].sum() / total_rev if total_rev > 0 else 0.0
1229
 
 
1230
  if total_rev <= 0 or n == 0:
1231
  gini = 0.0
1232
  else:
 
1233
  x = np.sort(rev_sorted) # ascending
1234
  cum = np.cumsum(x)
1235
  gini = 1.0 - 2.0 * np.sum(cum) / (n * np.sum(x)) + 1.0 / n
 
1253
  emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
1254
  return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}
1255
 
1256
+ snapshot = {}
1257
+ section_errors = {}
1258
+
1259
+ # Headline
1260
+ try:
1261
+ headline = self._headline(current_df, previous_df)
1262
+ snapshot["Summary Period"] = tfmeta.get("period_label", "This Week vs. Last Week")
1263
+ snapshot["Performance Snapshot (vs. Prior Period)"] = {
1264
+ "Total Revenue": f"{headline['total_revenue_fmt']} ({headline['total_revenue_change']})",
1265
+ "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
1266
+ "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
1267
+ }
1268
+ except Exception as e:
1269
+ section_errors["headline"] = str(e)
1270
 
1271
  # Basket & affinity
1272
+ try:
1273
+ basket_df = self._build_basket_table(current_df)
1274
+ snapshot["Basket Analysis"] = self._basket_kpis(basket_df)
1275
+ except Exception as e:
1276
+ section_errors["basket"] = str(e)
1277
+ snapshot["Basket Analysis"] = {"avg_items_per_basket": "N/A", "avg_gross_profit_per_basket": "N/A", "median_basket_value": "N/A", "basket_size_distribution": {}, "low_sample": True}
1278
+
1279
+ try:
1280
+ if 'basket_df' in locals():
1281
+ snapshot["Product Affinity"] = self._affinity_pairs(current_df, basket_df)
1282
+ else:
1283
+ snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}
1284
+ except Exception as e:
1285
+ section_errors["affinity"] = str(e)
1286
+ snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}
1287
 
1288
  # Temporal
1289
+ try:
1290
+ snapshot["Temporal Patterns"] = self._temporal_patterns(current_df)
1291
+ except Exception as e:
1292
+ section_errors["temporal"] = str(e)
1293
+ snapshot["Temporal Patterns"] = {"best_hour_by_profit": None, "best_day_by_profit": None, "hourly_series": [], "dow_series": [], "profit_heatmap_7x24": []}
1294
 
1295
  # Product aggregates + leaderboards + concentration
1296
+ try:
1297
+ start_cur = pd.Timestamp(tfmeta["current_start"])
1298
+ end_cur = pd.Timestamp(tfmeta["current_end"])
1299
+ days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
1300
+
1301
+ g_products = self._build_product_aggregates(current_df)
1302
+ if not g_products.empty:
1303
+ g_products["units_per_day"] = g_products["units"] / days
1304
+ product_lb = self._product_leaderboards(g_products)
1305
+ concentration = self._concentration_block(g_products)
1306
+ else:
1307
+ product_lb = self._empty_product_leaderboards()
1308
+ concentration = self._concentration_block(pd.DataFrame(columns=["revenue","units"]))
1309
 
1310
+ snapshot["Product KPIs"] = {"leaderboards": product_lb, "concentration": concentration}
1311
+ except Exception as e:
1312
+ section_errors["products"] = str(e)
1313
+ snapshot["Product KPIs"] = {"leaderboards": self._empty_product_leaderboards(), "concentration": self._concentration_block(pd.DataFrame(columns=["revenue","units"]))}
 
 
 
 
1314
 
1315
  # Customer value (RFM)
1316
+ try:
1317
+ # basket_df may or may not exist:
1318
+ bdf = locals().get("basket_df", pd.DataFrame())
1319
+ snapshot["Customer Value"] = self._customer_value(current_df, bdf)
1320
+ except Exception as e:
1321
+ section_errors["customer_value"] = str(e)
1322
+ snapshot["Customer Value"] = {
1323
+ "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
1324
+ "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
1325
+ "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
1326
+ }
1327
 
1328
  # Inventory (optional)
1329
+ try:
1330
+ g_products_for_inv = locals().get("g_products", pd.DataFrame())
1331
+ snapshot["Inventory"] = self._inventory_block(current_df, g_products_for_inv, (start_cur, end_cur))
1332
+ except Exception as e:
1333
+ section_errors["inventory"] = str(e)
1334
+ snapshot["Inventory"] = {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
1335
 
1336
  # Branch analytics
1337
+ try:
1338
+ snapshot["Branch Analytics"] = self._per_branch_blocks(current_df, previous_df, (start_cur, end_cur))
1339
+ except Exception as e:
1340
+ section_errors["branch"] = str(e)
1341
+ snapshot["Branch Analytics"] = {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}
1342
+
1343
+ # Meta
1344
+ snapshot["meta"] = {
1345
+ "timeframes": tfmeta,
1346
+ "kpi_params": {
1347
+ "top_k": int(self.params["top_k"]),
1348
+ "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]),
1349
+ "min_tx_for_margin_pct": int(self.params["min_tx_for_margin_pct"]),
1350
+ "rfm_window_days": int(self.params["rfm_window_days"]),
1351
+ "retention_factor": float(self.params["retention_factor"]),
1352
+ "min_support_baskets": int(self.params["min_support_baskets"]),
1353
+ "min_lift": float(self.params["min_lift"]),
1354
+ "blocked_products": list(self.params["blocked_products"]),
1355
+ "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
1356
+ "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
1357
  },
1358
+ "row_counts": {
1359
+ "input": int(len(self.raw)),
1360
+ "prepared": int(len(self.df)),
1361
+ "current_period": int(len(current_df)),
1362
+ "previous_period": int(len(previous_df)),
 
 
1363
  },
1364
+ "notes": [
1365
+ "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).",
1366
+ f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}",
1367
+ ],
1368
+ "section_errors": section_errors, # surfaced to the client for your debug panel
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1369
  }
1370
 
1371
  emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
 
1378
  Safe for PandasAI exception fallback.
1379
  """
1380
  try:
 
1381
  prompt = (
1382
  "You are Iris, a concise business analyst.\n"
1383
  "IMPORTANT RULES:\n"
 
1392
  "Business Data (authoritative; JSON):\n"
1393
  f"{json.dumps(json_safe(briefing), ensure_ascii=False)}\n"
1394
  )
 
1395
  resp = self.llm.invoke(prompt)
 
1396
  text = getattr(resp, "content", None) or str(resp)
 
1397
  return sanitize_answer(text)
 
1398
  except Exception as e:
 
1399
  fallback = {
1400
  "note": "Narrative fallback failed; returning raw snapshot.",
1401
  "error": str(e)[:200],