# app.py — FIXED: KPI snapshot determinism + deep crunch logging (server + optional Firebase)
# - Robust ColumnResolver (no guessing by the model; no hallucinated fields)
# - Deterministic time windows (Harare tz; explicit start/end)
# - KPI engine never uses LLM for numbers (LLM is narration-only fallback)
# - JSON-safe snapshot (no pandas/Index objects)
# - Detailed DEBUG logs for each crunch stage; optional Firebase debug logs
# - Sanitized PandasAI output; hard guard on errors
# - Cleaned Firebase init; clean FlaskResponse ResponseParser
# - Safer temporal hints + guardrails
# - Fixed sanitize_answer() bug and many stray smart-quotes/copy artifacts

from __future__ import annotations

import os
import io
import re
import json
import time
import uuid
import base64
import logging
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import urllib.parse

from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from dotenv import load_dotenv

# LLMs
from langchain_google_genai import ChatGoogleGenerativeAI
import google.generativeai as genai

# PandasAI
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser

# Firebase
import firebase_admin
from firebase_admin import credentials, db

# -----------------------------------------------------------------------------
# Init
# -----------------------------------------------------------------------------
load_dotenv()

app = Flask(__name__)
CORS(app)

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("iris-app")

# -----------------------------------------------------------------------------
# Firebase Initialization
# -----------------------------------------------------------------------------
try:
    # Service-account JSON is passed whole in the FIREBASE env var.
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)

    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")

    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    db_ref = db.reference()
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    # Firebase is a hard dependency: log and re-raise so the process dies loudly.
    logger.fatal(f"FATAL: Firebase init failed: {e}")
    raise

# Mirror KPI debug payloads into Firebase only when explicitly enabled.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"


# -----------------------------------------------------------------------------
# Response parser for PandasAI
# -----------------------------------------------------------------------------
class FlaskResponse(ResponseParser):
    """ResponseParser that renders PandasAI results for an HTTP JSON response."""

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        # DataFrames are shipped to the client as an HTML table.
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        """Return a plot as a base64 data URI (live figure or saved file path)."""
        val = result.get("value")
        if hasattr(val, "savefig"):
            buf = io.BytesIO()
            val.savefig(buf, format="png")
            buf.seek(0)
            return f"data:image/png;base64,{base64.b64encode(buf.read()).decode('utf-8')}"
        if isinstance(val, str) and os.path.isfile(val):
            with open(val, "rb") as f:
                return f"data:image/png;base64,{base64.b64encode(f.read()).decode('utf-8')}"
        return str(val)

    def format_other(self, result):
        return str(result.get("value", ""))


# -----------------------------------------------------------------------------
# LLM init
# -----------------------------------------------------------------------------
logger.info("Initializing models…")
gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")

llm = ChatGoogleGenerativeAI(
    api_key=gemini_api_key, model="gemini-2.0-flash", temperature=0.1
)
genai.configure(api_key=gemini_api_key)
generation_config = {"temperature": 0.2, "top_p": 0.95, "max_output_tokens": 5000}
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-lite-001",
    generation_config=generation_config,
)
logger.info("AI models initialized.")

# Per-process chart export directory so concurrent workers never collide.
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info(f"Chart export path set to: {user_defined_path}")
# -----------------------------------------------------------------------------
# Temporal helpers + guardrails
# -----------------------------------------------------------------------------
TZ = "Africa/Harare"


def now_harare() -> pd.Timestamp:
    """Current timezone-aware timestamp in the business timezone."""
    return pd.Timestamp.now(tz=TZ)


def week_bounds_from(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of the week containing *ts*.

    FIX: weekday() is taken from the TZ-converted timestamp; the original mixed
    a converted normalize() with weekday() on the unconverted value, which can
    pick the wrong Monday when *ts* is in a different timezone.
    """
    local = ts.tz_convert(TZ)
    monday = local.normalize() - pd.Timedelta(days=local.weekday())  # weekday(): Mon == 0
    sunday = monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return monday, sunday


def next_week_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds of the calendar week after the one containing *ts*."""
    this_mon, _ = week_bounds_from(ts)
    next_mon = this_mon + pd.Timedelta(days=7)
    next_sun = next_mon + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return next_mon, next_sun


def last_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds (first 00:00:00, last 23:59:59) of the month preceding *ts*."""
    first_this = ts.normalize().replace(day=1)
    last_month_end = first_this - pd.Timedelta(seconds=1)  # 23:59:59 of prior month
    first_last = last_month_end.replace(day=1).normalize()
    last_month_end = last_month_end.replace(hour=23, minute=59, second=59)
    return first_last, last_month_end


def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds (first 00:00:00, last 23:59:59) of the month containing *ts*."""
    first_this = ts.normalize().replace(day=1)
    # December rolls into January of the next year.
    if first_this.month == 12:
        first_next = first_this.replace(year=first_this.year + 1, month=1)
    else:
        first_next = first_this.replace(month=first_this.month + 1)
    last_this = first_next - pd.Timedelta(seconds=1)
    return first_this, last_this


def quarter_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds of the calendar quarter containing *ts*."""
    q = (ts.month - 1) // 3 + 1
    first_month = 3 * (q - 1) + 1
    first = ts.normalize().replace(month=first_month, day=1)
    # Q4 rolls into January of the next year.
    if first_month == 10:
        first_next = first.replace(year=first.year + 1, month=1)
    else:
        first_next = first.replace(month=first_month + 3)
    last = first_next - pd.Timedelta(seconds=1)
    return first, last


# Phrase -> window resolver; each lambda maps "now" to explicit (start, end).
_TEMP_WINDOWS = [
    ("next week", lambda base: next_week_bounds(base)),
    ("this week", lambda base: week_bounds_from(base)),
    ("last week", lambda base: week_bounds_from(base - pd.Timedelta(days=7))),
    ("yesterday", lambda base: (
        base.normalize() - pd.Timedelta(days=1),
        base.normalize() - pd.Timedelta(seconds=1)
    )),
    ("tomorrow", lambda base: (
        base.normalize() + pd.Timedelta(days=1),
        base.normalize() + pd.Timedelta(days=1, hours=23, minutes=59, seconds=59)
    )),
    ("this month", lambda base: this_month_bounds(base)),
    ("last month", lambda base: last_month_bounds(base)),
    ("this quarter", lambda base: quarter_bounds(base)),
]


def extract_numeric_window(question: str):
    """Parse 'last/past N days' from *question*; return (start, end) or None."""
    m = re.search(r"(last|past)\s+(\d{1,3})\s+days", question.lower())
    if m:
        n = int(m.group(2))
        end = now_harare()
        start = end - pd.Timedelta(days=n)
        return start, end
    return None


def temporal_hints(question: str) -> str:
    """Build an explicit temporal-context preamble for the LLM.

    Every recognized relative phrase in *question* is translated into a fixed
    timestamp range so the model never has to interpret dates itself.
    """
    base = now_harare()
    hints = {}
    ql = question.lower()
    for key, fn in _TEMP_WINDOWS:
        if key in ql:
            s, e = fn(base)
            hints[key] = (s.strftime("%Y-%m-%d %H:%M:%S%z"), e.strftime("%Y-%m-%d %H:%M:%S%z"))
    rng = extract_numeric_window(question)
    if rng:
        s, e = rng
        hints[f"last {(e - s).days} days"] = (
            s.strftime("%Y-%m-%d %H:%M:%S%z"), e.strftime("%Y-%m-%d %H:%M:%S%z")
        )
    if not hints:
        # No relative phrase matched: still anchor "today" and the week convention.
        return (
            f"Temporal context: Today is {base.strftime('%Y-%m-%d')} ({TZ}). "
            f"Week is Monday–Sunday. Use pd.Timestamp.now(tz='{TZ}') and pd.Timedelta. "
            f"Avoid non-fixed rolling frequencies; use daily resample or dt.to_period('M')."
        )
    parts = [f"Temporal context: Today is {base.strftime('%Y-%m-%d')} ({TZ})."]
    for k, (s, e) in hints.items():
        parts.append(f'Interpret "{k}" as {s} to {e}.')
    parts.append(
        "Avoid non-fixed rolling frequencies (MonthBegin/MonthEnd/Week). "
        "Prefer daily resample or Period groupby."
    )
    return " ".join(parts)
def guardrails_preamble() -> str:
    """Static rule block prepended to LLM prompts to constrain generated code."""
    return (
        "Rules for any code you generate:\n"
        "1) Do NOT use 'from datetime import datetime' or 'datetime.date.today()'.\n"
        "2) Use pandas time APIs only: pd.Timestamp.now(tz='Africa/Harare'), pd.Timedelta, dt accessors.\n"
        "3) If a Time column exists, combine Date + Time and localize to Africa/Harare.\n"
        "4) Ensure numeric conversion with errors='coerce' for amount fields.\n"
        "5) Avoid non-fixed rolling/resample frequencies. For monthly, use df['datetime'].dt.to_period('M').\n"
        "6) Never print stack traces; return a concise answer or a plot/dataframe."
    )


# -----------------------------------------------------------------------------
# Error detection & sanitization
# -----------------------------------------------------------------------------
# Lowercase substrings that mark an LLM/PandasAI answer as a failure.
ERROR_PATTERNS = [
    "traceback", "exception", "keyerror", "nameerror", "syntaxerror",
    "modulenotfounderror", "importerror", "pipeline failed", "execution failed",
    "__import__", "failed with error", "attributeerror", "method_descriptor",
    "unfortunately, i was not able to answer your question",
    "non-fixed frequency", "monthbegin", "monthend", "week:", "weekday="
]


def _stringify(obj) -> str:
    """Best-effort str() that treats frames/figures as empty and never raises."""
    try:
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            return ""
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode("utf-8", errors="ignore")
        return str(obj)
    except Exception:
        return ""


def _extract_text_like(ans):
    """Pull the human-readable text out of a PandasAI answer of any shape."""
    if isinstance(ans, dict):
        if "value" in ans:
            return _stringify(ans["value"])
        for k in ("message", "text", "content"):
            if k in ans:
                return _stringify(ans[k])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(getattr(ans, "value"))
        except Exception:
            pass
    return _stringify(ans)


def looks_like_error(ans) -> bool:
    """Heuristic: does *ans* look like a failure rather than a real answer?"""
    # DataFrames/figures are always legitimate results.
    if isinstance(ans, (pd.DataFrame, plt.Figure)):
        return False
    s = _extract_text_like(ans).strip().lower()
    if not s:
        return True  # empty text answers are treated as failures
    if any(p in s for p in ERROR_PATTERNS):
        return True
    if (("file " in s and " line " in s and "error" in s) or ("valueerror:" in s)):
        return True
    return False


def sanitize_answer(ans) -> str:
    """Strip code fences, tracebacks, and the PandasAI apology boilerplate."""
    s = _extract_text_like(ans)
    # remove triple-backtick fences
    s = re.sub(r"```+\w*", "", s or "")
    # strip tracebacks safely
    tb = "Traceback (most recent call last):"
    if tb in s:
        s = s.split(tb, 1)[0]
    if "Unfortunately, I was not able to answer your question" in (s or ""):
        s = ""
    return (s or "").strip()


# -----------------------------------------------------------------------------
# KPI Engine
# -----------------------------------------------------------------------------
class ColumnResolver:
    """Map messy input columns to canonical names without guessing via LLM."""

    AMOUNT_CANDIDATES = [
        "Amount", "amount", "Total", "total", "Settled_Amount",
        "GrandTotal", "NetAmount", "Invoice_Total", "LineTotal"
    ]
    DATE_CANDIDATES = ["datetime", "Date", "date", "TxnDate", "CreatedAt", "created_at", "timestamp"]
    TIME_CANDIDATES = ["Time", "time", "TxnTime"]
    CURR_CANDIDATES = ["Currency", "currency", "Curr"]
    INVOICE_CANDIDATES = ["Invoice_Number", "InvoiceNo", "invoice_number",
                          "Receipt_Number", "Order_No", "Txn_ID", "TxnId"]
    PRODUCT_CANDIDATES = ["Product", "Product_Name", "Item", "Description", "Item_Name"]
    UNITS_CANDIDATES = ["Units_Sold", "Qty", "Quantity", "Units"]
    UNIT_COST_CANDIDATES = ["Unit_Cost_Price", "UnitCost", "CostPrice"]
    TYPE_CANDIDATES = ["Transaction_Type", "TxnType", "Type"]
    TELLER_CANDIDATES = ["Teller_Username", "Teller", "Cashier", "User"]

    @staticmethod
    def pick_first(df: pd.DataFrame, names: List[str]) -> Optional[str]:
        """First candidate name that exists as a column, else None."""
        for n in names:
            if n in df.columns:
                return n
        return None

    @staticmethod
    def pick_first_numeric(df: pd.DataFrame, names: List[str]) -> Optional[str]:
        """First candidate column that parses as numeric for >= 20% of rows."""
        for n in names:
            if n in df.columns:
                s = pd.to_numeric(df[n], errors="coerce")
                nonnull_ratio = s.notna().mean() if len(s) else 0
                if nonnull_ratio >= 0.2:  # needs at least some usable data
                    return n
        return None

    @staticmethod
    def map(df: pd.DataFrame) -> Dict[str, Optional[str]]:
        """Resolve every canonical role to a concrete column name (or None)."""
        return {
            "amount": ColumnResolver.pick_first_numeric(df, ColumnResolver.AMOUNT_CANDIDATES),
            "date": ColumnResolver.pick_first(df, ColumnResolver.DATE_CANDIDATES),
            "time": ColumnResolver.pick_first(df, ColumnResolver.TIME_CANDIDATES),
            "currency": ColumnResolver.pick_first(df, ColumnResolver.CURR_CANDIDATES),
            "invoice": ColumnResolver.pick_first(df, ColumnResolver.INVOICE_CANDIDATES),
            "product": ColumnResolver.pick_first(df, ColumnResolver.PRODUCT_CANDIDATES),
            "units": ColumnResolver.pick_first_numeric(df, ColumnResolver.UNITS_CANDIDATES),
            "unit_cost": ColumnResolver.pick_first_numeric(df, ColumnResolver.UNIT_COST_CANDIDATES),
            "txn_type": ColumnResolver.pick_first(df, ColumnResolver.TYPE_CANDIDATES),
            "teller": ColumnResolver.pick_first(df, ColumnResolver.TELLER_CANDIDATES),
        }
def json_safe(obj: Any) -> Any:
    """Coerce numpy/pandas scalars & indexes to plain Python (recursively)."""
    if isinstance(obj, (np.integer,)):
        return int(obj)
    if isinstance(obj, (np.floating,)):
        return float(obj)
    if isinstance(obj, (np.bool_,)):
        return bool(obj)
    if isinstance(obj, (pd.Timestamp,)):
        return obj.isoformat()
    if isinstance(obj, (pd.Series,)):
        return obj.to_dict()
    if isinstance(obj, (pd.Index,)):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, (dict,)):
        return {k: json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [json_safe(x) for x in obj]
    return obj


def emit_kpi_debug(profile_id: str, stage: str, payload: Dict[str, Any]) -> None:
    """Emit deep crunch logs to server; optionally mirror to Firebase for inspection."""
    try:
        log_obj = {"profile_id": profile_id, "stage": stage, "payload": payload}
        logger.debug("KPI_DEBUG %s", json.dumps(json_safe(log_obj)))
        if LOG_KPI_TO_FIREBASE:
            ts = int(time.time())
            db_ref.child(f"kpi_debug/{profile_id}/{stage}_{ts}").set(json_safe(payload))
    except Exception as e:
        # Debug logging must never break the KPI pipeline.
        logger.warning(f"Failed to emit KPI debug logs: {e}")


class IrisReportEngine:
    """
    Backwards-compatible KPI engine:
    - Keeps existing snapshot sections untouched
    - Adds: Basket Analysis, Product Affinity, Temporal Patterns, Customer Value,
      Product KPIs (expanded), Inventory (optional), Branch Analytics
      (per-branch + cross-branch), Cash reconciliation (optional)
    - Never uses LLM for numbers. LLM only for narration elsewhere.
    """

    # ---- Canonical column names (single source of truth; no magic strings sprinkled around) ----
    COL_INVOICE = "_Invoice"
    COL_PRODUCT = "_Product"
    COL_TELLER = "_Teller"
    COL_TXNTYPE = "_TxnType"
    COL_BRANCH = "_Branch"
    COL_CUSTOMER = "_Customer"
    COL_DT = "_datetime"
    COL_AMOUNT = "_Amount"
    COL_UNITS = "_Units"
    COL_UNITCOST = "_UnitCost"
    COL_REVENUE = "_Revenue"
    COL_COGS = "_COGS"
    COL_GP = "_GrossProfit"
    COL_HOUR = "_Hour"
    COL_DOW = "_DOW"
    COL_DOWI = "_DOW_idx"

    DEFAULT_PARAMS = {
        "top_k": 5,
        "min_revenue_for_margin_pct": 50.0,
        "min_tx_for_margin_pct": 3,
        "rfm_window_days": 365,
        "retention_factor": 1.0,
        "min_support_baskets": 5,  # minimum basket count for a pair to be reported
        "min_lift": 1.2,
        "blocked_products": ["Purchase"],  # exclude accounting placeholders from product leaderboards/affinity
        "cash_variance_threshold_abs": 10.0,
        "cash_variance_threshold_pct": 0.008,  # 0.8%
    }

    def __init__(
        self,
        profile_id: str,
        transactions_data: List[dict],
        llm_instance,
        stock_feed: Optional[List[Dict[str, Any]]] = None,
        cash_float_feed: Optional[List[Dict[str, Any]]] = None,
        params: Optional[Dict[str, Any]] = None,
    ):
        self.profile_id = profile_id
        self.llm = llm_instance
        # Caller params override defaults key-by-key.
        self.params = {**self.DEFAULT_PARAMS, **(params or {})}
        self.raw = pd.DataFrame(transactions_data)
        self.stock_feed = pd.DataFrame(stock_feed) if stock_feed else pd.DataFrame()
        self.cash_float_feed = pd.DataFrame(cash_float_feed) if cash_float_feed else pd.DataFrame()
        self.df = self._load_and_prepare_data(self.raw)
        self.currency = self._get_primary_currency()

    # ------------------------- small helpers -------------------------
    @staticmethod
    def _rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Serialize an RFM leaderboard frame to JSON-safe dicts (NaN -> None)."""
        if df is None or df.empty:
            return []
        out = []
        for _, r in df.iterrows():
            out.append({
                "customer": str(r.get("customer")),
                "orders": int(r.get("orders", 0)),
                "revenue": float(r.get("revenue", 0.0)),
                "gp": float(r.get("gp", 0.0)),
                "recency_days": float(r.get("recency_days", np.nan)) if pd.notna(r.get("recency_days")) else None,
                "avg_basket_value": float(r.get("avg_basket_value", np.nan)) if pd.notna(r.get("avg_basket_value")) else None,
            })
        return out
df.iterrows(): out.append({ "customer": str(r.get("customer")), "orders": int(r.get("orders", 0)), "revenue": float(r.get("revenue", 0.0)), "gp": float(r.get("gp", 0.0)), "recency_days": float(r.get("recency_days", np.nan)) if pd.notna(r.get("recency_days")) else None, "avg_basket_value": float(r.get("avg_basket_value", np.nan)) if pd.notna(r.get("avg_basket_value")) else None, }) return out def _has(self, df: pd.DataFrame, col: str) -> bool: return isinstance(df, pd.DataFrame) and col in df.columns # ------------------------- load/prepare ------------------------- def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: emit_kpi_debug(self.profile_id, "load", {"status": "empty_input"}) return pd.DataFrame() mapping = ColumnResolver.map(df) emit_kpi_debug(self.profile_id, "column_map", mapping) # Numerics amt_col = mapping["amount"] or ("Settled_Amount" if "Settled_Amount" in df.columns else None) if amt_col and amt_col in df: df[self.COL_AMOUNT] = pd.to_numeric(df[amt_col], errors="coerce") else: df[self.COL_AMOUNT] = pd.Series(dtype=float) if mapping["units"] and mapping["units"] in df: df[self.COL_UNITS] = pd.to_numeric(df[mapping["units"]], errors="coerce").fillna(0) else: df[self.COL_UNITS] = 0 if mapping["unit_cost"] and mapping["unit_cost"] in df: df[self.COL_UNITCOST] = pd.to_numeric(df[mapping["unit_cost"]], errors="coerce").fillna(0.0) else: df[self.COL_UNITCOST] = 0.0 # Datetime if mapping["date"] and mapping["date"] in df: if mapping["time"] and mapping["time"] in df: dt_series = pd.to_datetime( df[mapping["date"]].astype(str) + " " + df[mapping["time"]].astype(str), errors="coerce" ) else: dt_series = pd.to_datetime(df[mapping["date"]], errors="coerce") else: dt_series = pd.to_datetime(df.get("datetime"), errors="coerce") try: if getattr(dt_series.dt, "tz", None) is None: dt_series = dt_series.dt.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT") else: dt_series = dt_series.dt.tz_convert(TZ) except 
Exception: pass df[self.COL_DT] = dt_series df = df.dropna(subset=[self.COL_DT]).copy() # Canonical dims df[self.COL_INVOICE] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None df[self.COL_PRODUCT] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None df[self.COL_TELLER] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None df[self.COL_TXNTYPE] = (df[mapping["txn_type"]].astype(str).str.lower() if mapping["txn_type"] and mapping["txn_type"] in df else df.get("Transaction_Type", "").astype(str).str.lower()) df[self.COL_BRANCH] = df.get("Branch") df[self.COL_CUSTOMER] = df.get("Customer_Reference") # Sales filter: keep explicit sales OR Transaction_Type_ID 21 OR positive amounts txid_series = df.get("Transaction_Type_ID") sales_mask = ( df[self.COL_TXNTYPE].isin(["sale", "sales", "invoice"]) | (pd.Series(False, index=df.index) if txid_series is None else txid_series.isin([21])) | (df[self.COL_AMOUNT] > 0) ) working = df[sales_mask].copy() # Derive measures working[self.COL_REVENUE] = working[self.COL_AMOUNT].fillna(0.0) working[self.COL_COGS] = (working[self.COL_UNITCOST] * working[self.COL_UNITS]).fillna(0.0) working[self.COL_GP] = (working[self.COL_REVENUE] - working[self.COL_COGS]).fillna(0.0) working[self.COL_HOUR] = working[self.COL_DT].dt.hour working[self.COL_DOW] = working[self.COL_DT].dt.day_name() working[self.COL_DOWI] = working[self.COL_DT].dt.dayofweek # 0=Mon .. 
6=Sun # Deduplicate exact duplicate sale lines before = len(working) dedupe_keys = ["Transaction_ID", self.COL_INVOICE, self.COL_PRODUCT, self.COL_UNITS, self.COL_AMOUNT, self.COL_DT] existing_keys = [k for k in dedupe_keys if k in working.columns] if existing_keys: working = working.drop_duplicates(subset=existing_keys) duplicates_dropped = before - len(working) # Drop zero rows if both revenue and cost are zero working = working[(working[self.COL_REVENUE].abs() > 0) | (working[self.COL_COGS].abs() > 0)] emit_kpi_debug(self.profile_id, "prepared_counts", { "raw_rows": int(len(self.raw)), "rows_with_datetime": int(len(df)), "sale_like_rows": int(len(working)), "duplicates_dropped": int(duplicates_dropped), }) self._prepared_dupes_dropped = int(duplicates_dropped) self._non_sale_excluded = int(len(df) - len(working)) return working def _get_primary_currency(self) -> str: try: mapping = ColumnResolver.map(self.raw) if mapping["currency"] and mapping["currency"] in self.raw: mode_series = self.raw[mapping["currency"]].dropna().astype(str) if not mode_series.empty: val = mode_series.mode() if not val.empty: return str(val.iloc[0]) except Exception: pass return "USD" # ------------------------- timeframes & headline ------------------------- def _get_comparison_timeframes(self) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]: if self.df.empty: return self.df, self.df, {} now = now_harare() start_cur, end_cur = week_bounds_from(now) start_prev = start_cur - pd.Timedelta(days=7) end_prev = start_cur - pd.Timedelta(seconds=1) current_df = self.df[(self.df[self.COL_DT] >= start_cur) & (self.df[self.COL_DT] <= end_cur)] previous_df = self.df[(self.df[self.COL_DT] >= start_prev) & (self.df[self.COL_DT] <= end_prev)] meta = { "period_label": "This Week vs. 
Last Week", "current_start": start_cur.isoformat(), "current_end": end_cur.isoformat(), "prev_start": start_prev.isoformat(), "prev_end": end_prev.isoformat(), "current_rows": int(len(current_df)), "previous_rows": int(len(previous_df)), } emit_kpi_debug(self.profile_id, "timeframes", meta) return current_df, previous_df, meta @staticmethod def _pct_change(cur: float, prev: float) -> str: if prev == 0: return "+100%" if cur > 0 else "0.0%" return f"{((cur - prev) / prev) * 100:+.1f}%" def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]: cur_rev = float(cur_df[self.COL_REVENUE].sum()) if not cur_df.empty else 0.0 prev_rev = float(prev_df[self.COL_REVENUE].sum()) if not prev_df.empty else 0.0 cur_gp = float(cur_df[self.COL_GP].sum()) if not cur_df.empty else 0.0 prev_gp = float(prev_df[self.COL_GP].sum()) if not prev_df.empty else 0.0 if self._has(cur_df, self.COL_INVOICE) and cur_df[self.COL_INVOICE].notna().any(): tx_now = int(cur_df[self.COL_INVOICE].nunique()) else: tx_now = int(len(cur_df)) if self._has(prev_df, self.COL_INVOICE) and prev_df[self.COL_INVOICE].notna().any(): tx_prev = int(prev_df[self.COL_INVOICE].nunique()) else: tx_prev = int(len(prev_df)) head = { "total_revenue_value": round(cur_rev, 2), "total_revenue_fmt": f"{self.currency} {cur_rev:,.2f}", "total_revenue_change": self._pct_change(cur_rev, prev_rev), "gross_profit_value": round(cur_gp, 2), "gross_profit_fmt": f"{self.currency} {cur_gp:,.2f}", "gross_profit_change": self._pct_change(cur_gp, prev_gp), "transactions_value": tx_now, "transactions_change": self._pct_change(tx_now, tx_prev), } emit_kpi_debug(self.profile_id, "headline", head) return head # ------------------------- core builders ------------------------- def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame: if cur_df.empty: return pd.DataFrame(columns=[ self.COL_PRODUCT,"revenue","units","cogs","gross_profit","margin_pct", "avg_selling_price","avg_unit_cost","tx_count" ]) df = 
cur_df.copy() # Exclude blocked products for leaderboards/affinity, but keep them in totals if needed if self.params["blocked_products"]: df = df[~df[self.COL_PRODUCT].astype(str).str.strip().isin(self.params["blocked_products"])] # Tx count via invoice nunique if available if self._has(df, self.COL_INVOICE) and df[self.COL_INVOICE].notna().any(): g = df.groupby(self.COL_PRODUCT, dropna=False).agg( revenue=(self.COL_REVENUE,"sum"), units=(self.COL_UNITS,"sum"), cogs=(self.COL_COGS,"sum"), gp=(self.COL_GP,"sum"), tx=(self.COL_INVOICE,"nunique") ) else: g = df.groupby(self.COL_PRODUCT, dropna=False).agg( revenue=(self.COL_REVENUE,"sum"), units=(self.COL_UNITS,"sum"), cogs=(self.COL_COGS,"sum"), gp=(self.COL_GP,"sum"), tx=(self.COL_PRODUCT,"size") ) g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index() # Derived ratios g["margin_pct"] = np.where(g["revenue"] > 0, g["gross_profit"] / g["revenue"], np.nan) g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan) g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan) return g def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame: if cur_df.empty or not self._has(cur_df, self.COL_INVOICE): return pd.DataFrame(columns=[self.COL_INVOICE,"basket_revenue","basket_gp","basket_items","_datetime_max"]) b = cur_df.groupby(self.COL_INVOICE, dropna=False).agg( basket_revenue=(self.COL_REVENUE,"sum"), basket_gp=(self.COL_GP,"sum"), basket_items=(self.COL_UNITS,"sum"), _datetime_max=(self.COL_DT,"max"), ).reset_index() return b def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]: if basket_df is None or basket_df.empty: return { "avg_items_per_basket": "N/A", "avg_gross_profit_per_basket": "N/A", "median_basket_value": "N/A", "basket_size_distribution": {}, "low_sample": True } avg_items = float(basket_df["basket_items"].mean()) avg_gp = float(basket_df["basket_gp"].mean()) median_value = float(basket_df["basket_revenue"].median()) 
sizes = basket_df["basket_items"].fillna(0) bins = { "1": int((sizes == 1).sum()), "2-3": int(((sizes >= 2) & (sizes <= 3)).sum()), "4-5": int(((sizes >= 4) & (sizes <= 5)).sum()), "6_plus": int((sizes >= 6).sum()), } return { "avg_items_per_basket": round(avg_items, 2), "avg_gross_profit_per_basket": round(avg_gp, 2), "median_basket_value": round(median_value, 2), "basket_size_distribution": bins } def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]: # Build unique product sets per invoice, count pairs if cur_df.empty or basket_df.empty or not self._has(cur_df, self.COL_PRODUCT) or not self._has(cur_df, self.COL_INVOICE): return {"params": self._affinity_params(), "top_pairs": []} tmp = cur_df[[self.COL_INVOICE, self.COL_PRODUCT]].dropna() if tmp.empty: return {"params": self._affinity_params(), "top_pairs": []} blocked = set(self.params.get("blocked_products", []) or []) tmp = tmp[~tmp[self.COL_PRODUCT].astype(str).str.strip().isin(blocked)] if tmp.empty: return {"params": self._affinity_params(), "top_pairs": []} products_per_invoice = tmp.groupby(self.COL_INVOICE)[self.COL_PRODUCT].agg(lambda s: sorted(set(map(str, s)))).reset_index() total_baskets = int(len(products_per_invoice)) if total_baskets == 0: return {"params": self._affinity_params(), "top_pairs": []} from collections import Counter single_counter = Counter() for prods in products_per_invoice[self.COL_PRODUCT]: single_counter.update(prods) pair_counter = Counter() for prods in products_per_invoice[self.COL_PRODUCT]: if len(prods) < 2: continue for i in range(len(prods)): for j in range(i+1, len(prods)): a, b = prods[i], prods[j] pair = (a, b) if a <= b else (b, a) pair_counter[pair] += 1 min_support_baskets = int(self.params["min_support_baskets"]) min_lift = float(self.params["min_lift"]) top_k = int(self.params["top_k"]) rows = [] # Average pair revenue across baskets containing both (optional; approximate) inv_with_products = 
cur_df.groupby(self.COL_INVOICE)[self.COL_PRODUCT].apply(lambda s: set(map(str, s.dropna()))) rev_by_inv = cur_df.groupby(self.COL_INVOICE)[self.COL_REVENUE].sum() for (a, b), ab_count in pair_counter.items(): if ab_count < min_support_baskets: continue support_a = single_counter.get(a, 0) / total_baskets support_b = single_counter.get(b, 0) / total_baskets support_ab = ab_count / total_baskets if support_a == 0 or support_b == 0: continue confidence = support_ab / support_a lift = support_ab / (support_a * support_b) if (support_a * support_b) > 0 else np.nan if not np.isfinite(lift) or lift < min_lift: continue inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s)) pair_invoices = inv_mask[inv_mask].index avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan rows.append({ "a": a, "b": b, "support_ab": round(float(support_ab), 6), "confidence_a_to_b": round(float(confidence), 6), "lift": round(float(lift), 6), "pair_basket_count": int(ab_count), "avg_pair_revenue": round(avg_pair_revenue, 2) if np.isfinite(avg_pair_revenue) else None, }) rows.sort(key=lambda r: (r["lift"], r["pair_basket_count"], r["support_ab"]), reverse=True) emit_kpi_debug(self.profile_id, "affinity_pairs_counts", { "total_baskets": total_baskets, "pairs_after_filters": len(rows) }) return {"params": self._affinity_params(), "top_pairs": rows[:top_k]} def _affinity_params(self) -> Dict[str, Any]: return { "min_support_baskets": int(self.params["min_support_baskets"]), "min_lift": float(self.params["min_lift"]), "top_k": int(self.params["top_k"]), } def _temporal_patterns(self, cur_df: pd.DataFrame) -> Dict[str, Any]: if cur_df.empty: return { "best_hour_by_profit": None, "best_day_by_profit": None, "hourly_series": [], "dow_series": [], "profit_heatmap_7x24": [] } gh = cur_df.groupby(self.COL_HOUR, dropna=False).agg( revenue=(self.COL_REVENUE,"sum"), gross_profit=(self.COL_GP,"sum") ).reset_index() best_hour_idx = 
int(gh.loc[gh["gross_profit"].idxmax(), self.COL_HOUR]) if not gh.empty else None best_hour_gp = float(gh["gross_profit"].max()) if not gh.empty else None gd = cur_df.groupby(self.COL_DOW, dropna=False).agg( revenue=(self.COL_REVENUE,"sum"), gross_profit=(self.COL_GP,"sum") ).reset_index() order_map = cur_df.groupby(self.COL_DOW)[self.COL_DOWI].max().to_dict() gd["__ord"] = gd[self.COL_DOW].map(order_map) gd = gd.sort_values("__ord", kind="stable") best_day_row = gd.loc[gd["gross_profit"].idxmax()] if not gd.empty else None best_day = {"day": str(best_day_row[self.COL_DOW]), "gross_profit": float(best_day_row["gross_profit"])} if best_day_row is not None else None m = cur_df.groupby([self.COL_DOWI, self.COL_HOUR], dropna=False)[self.COL_GP].sum().unstack(fill_value=0) m = m.reindex(index=range(0,7), columns=range(0,24), fill_value=0) heatmap = [[float(x) for x in row] for row in m.values.tolist()] hourly_series = gh.rename(columns={self.COL_HOUR:"hour"}).to_dict(orient="records") dow_series = gd[[self.COL_DOW,"revenue","gross_profit"]].rename(columns={self.COL_DOW:"day"}).to_dict(orient="records") return { "best_hour_by_profit": {"hour": best_hour_idx, "gross_profit": round(best_hour_gp, 2)} if best_hour_idx is not None else None, "best_day_by_profit": best_day, "hourly_series": [{"hour": int(r["hour"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in hourly_series], "dow_series": [{"day": str(r["day"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in dow_series], "profit_heatmap_7x24": heatmap } def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]: if cur_df.empty or not self._has(cur_df, self.COL_CUSTOMER): return { "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20}, "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []}, "rfm_summary": 
{"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None} } df = cur_df.copy() last_date = df.groupby(self.COL_CUSTOMER)[self.COL_DT].max() if self._has(df, self.COL_INVOICE): orders = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].nunique() else: orders = df.groupby(self.COL_CUSTOMER).size() revenue = df.groupby(self.COL_CUSTOMER)[self.COL_REVENUE].sum() gp = df.groupby(self.COL_CUSTOMER)[self.COL_GP].sum() # Avg basket value per customer (from their invoices) if not basket_df.empty and self._has(df, self.COL_INVOICE): inv_to_rev = basket_df.set_index(self.COL_INVOICE)["basket_revenue"] cust_invoices = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].agg(lambda x: sorted(set(x))) avg_basket_val = {} for cust, invs in cust_invoices.items(): vals = inv_to_rev.reindex(invs).dropna() avg_basket_val[cust] = float(vals.mean()) if len(vals) else np.nan avg_basket = pd.Series(avg_basket_val) else: avg_basket = pd.Series(dtype=float) base = now_harare().normalize() recency_days = (base - last_date).dt.total_seconds() / (60*60*24) rfm = pd.DataFrame({ "customer": last_date.index.astype(str), "last_date": last_date.values, "orders": orders.reindex(last_date.index).fillna(0).astype(int).values, "revenue": revenue.reindex(last_date.index).fillna(0.0).values, "gp": gp.reindex(last_date.index).fillna(0.0).values, "recency_days": recency_days.values, "avg_basket_value": avg_basket.reindex(last_date.index).values }).fillna({"avg_basket_value": np.nan}) vip = rfm.sort_values(["gp","orders","revenue"], ascending=[False, False, False]).head(20) if len(rfm): gp_q3 = rfm["gp"].quantile(0.75) at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp","recency_days"], ascending=[False, False]).head(20) else: at_risk = rfm.head(0) new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20) out = { "params": 
{"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20}, "leaderboards": { "top_customers_by_gp": self._rfm_to_list(vip), "at_risk": self._rfm_to_list(at_risk), "new_customers": self._rfm_to_list(new_customers) }, "rfm_summary": { "unique_customers": int(rfm["customer"].nunique()), "median_recency_days": float(rfm["recency_days"].median()) if len(rfm) else None, "median_orders": float(rfm["orders"].median()) if len(rfm) else None, "median_gp": float(rfm["gp"].median()) if len(rfm) else None } } emit_kpi_debug(self.profile_id, "rfm_done", {"customers": int(rfm["customer"].nunique())}) return json_safe(out) # ------------------------- inventory & cash ------------------------- def _inventory_block(self, cur_df: pd.DataFrame, product_agg: pd.DataFrame, current_bounds: Tuple[pd.Timestamp, pd.Timestamp]) -> Dict[str, Any]: if self.stock_feed.empty: return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}} start_cur, end_cur = current_bounds days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0) pa = (product_agg or pd.DataFrame()).copy() if pa.empty: return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}} pa["units_per_day"] = pa["units"] / days sf = self.stock_feed.copy() sf["product_key"] = sf.get("product", sf.get("Product", "")).astype(str).str.strip() pa["product_key"] = pa[self.COL_PRODUCT].astype(str).str.strip() merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock")) merged["units_per_day"] = merged["units_per_day"].fillna(0.0) merged["stock_on_hand"] = pd.to_numeric(merged.get("stock_on_hand", np.nan), errors="coerce") merged["reorder_point"] = pd.to_numeric(merged.get("reorder_point", np.nan), errors="coerce") merged["lead_time_days"] = pd.to_numeric(merged.get("lead_time_days", np.nan), errors="coerce") merged["days_of_cover"] = 
np.where(merged["units_per_day"] > 0, merged["stock_on_hand"] / merged["units_per_day"], np.inf) def status_row(r): if pd.isna(r.get("stock_on_hand")): return "unknown" if (r["stock_on_hand"] or 0) <= 0: return "stockout" if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]: return "low" if np.isfinite(r["days_of_cover"]) and pd.notna(r.get("lead_time_days")) and r["days_of_cover"] < r["lead_time_days"]: return "stockout_risk" if r["units_per_day"] == 0 and (r["stock_on_hand"] or 0) > 0: return "dead_stock" return "ok" merged["status"] = merged.apply(status_row, axis=1) products_out, low_stock, stockout_risk, dead_stock = [], [], [], [] for _, r in merged.iterrows(): rec = { "product": str(r.get(self.COL_PRODUCT) or r.get("product_key")), "stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None, "reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None, "lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None, "days_of_cover": float(r["days_of_cover"]) if np.isfinite(r["days_of_cover"]) else None, "daily_sales_velocity": float(r["units_per_day"]), "status": str(r["status"]) } products_out.append(rec) if rec["status"] == "low": low_stock.append(rec["product"]) elif rec["status"] == "stockout_risk": stockout_risk.append(rec["product"]) elif rec["status"] == "dead_stock": dead_stock.append(rec["product"]) return { "stock_snapshot_asof": now_harare().isoformat(), "products": products_out, "alerts": { "low_stock": sorted(set(low_stock)), "stockout_risk": sorted(set(stockout_risk)), "dead_stock": sorted(set(dead_stock)) } } def _cash_recon_block(self, cur_df: pd.DataFrame) -> Dict[str, Any]: if self.cash_float_feed.empty: return {"status": "no_cash_data"} cf = self.cash_float_feed.copy() out_days = [] high_var_days = 0 # Compute cash sales per branch/date from cur_df if cur_df.empty: cash_sales = pd.DataFrame(columns=["branch","date","cash_sales"]) else: df = 
cur_df.copy() df["date"] = df[self.COL_DT].dt.strftime("%Y-%m-%d") df["is_cash"] = (df.get("Money_Type","").astype(str).str.lower() == "cash") cash_sales = df[df["is_cash"]].groupby([self.COL_BRANCH,"date"])[self.COL_REVENUE].sum().reset_index() cash_sales = cash_sales.rename(columns={self.COL_BRANCH:"branch", self.COL_REVENUE:"cash_sales"}) cf["date"] = cf["date"].astype(str).str[:10] merged = cf.merge(cash_sales, on=["branch","date"], how="left") merged["cash_sales"] = merged["cash_sales"].fillna(0.0) for _, r in merged.iterrows(): opening = float(r.get("opening_float") or 0.0) closing = float(r.get("closing_float") or 0.0) drops = float(r.get("drops") or 0.0) petty = float(r.get("petty_cash") or 0.0) declared = float(r.get("declared_cash") or 0.0) cash_sales_val = float(r.get("cash_sales") or 0.0) expected = opening + cash_sales_val - drops - petty - closing variance = declared - expected variance_pct = (variance / cash_sales_val) if cash_sales_val > 0 else 0.0 flag = (abs(variance) >= float(self.params["cash_variance_threshold_abs"])) or \ (abs(variance_pct) >= float(self.params["cash_variance_threshold_pct"])) if flag: high_var_days += 1 out_days.append({ "branch": str(r["branch"]), "date": str(r["date"]), "cash_sales": round(cash_sales_val, 2), "declared_cash": round(declared, 2), "opening_float": round(opening, 2), "closing_float": round(closing, 2), "drops": round(drops, 2), "petty_cash": round(petty, 2), "expected_cash": round(expected, 2), "variance": round(variance, 2), "variance_pct": round(variance_pct, 4), "flag": bool(flag), }) return {"days": out_days, "flags": {"high_variance_days": int(high_var_days)}} # ------------------------- branch analytics ------------------------- def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]: if cur_df.empty or not self._has(cur_df, self.COL_BRANCH): return {"params": self._branch_params(), "per_branch": {}, "cross_branch": 
{}} per_branch = {} branches = sorted(map(str, cur_df[self.COL_BRANCH].dropna().unique().tolist())) start_cur, end_cur = current_bounds days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0) branch_summary_rows = [] for br in branches: try: d = cur_df[cur_df[self.COL_BRANCH] == br] if d.empty: continue revenue = float(d[self.COL_REVENUE].sum()) cogs = float(d[self.COL_COGS].sum()) gp = float(d[self.COL_GP].sum()) margin_pct = (gp / revenue) if revenue > 0 else None tx = int(d[self.COL_INVOICE].nunique()) if self._has(d, self.COL_INVOICE) and d[self.COL_INVOICE].notna().any() else int(len(d)) items = float(d[self.COL_UNITS].sum()) basket_df = self._build_basket_table(d) basket_kpis = self._basket_kpis(basket_df) temporal = self._temporal_patterns(d) pagg = self._build_product_aggregates(d) if not pagg.empty: pagg["units_per_day"] = pagg["units"] / days product_lb = self._product_leaderboards(pagg) else: product_lb = self._empty_product_leaderboards() affinity = self._affinity_pairs(d, basket_df) customers = self._customer_value(d, basket_df) cash_recon = self._cash_recon_block(d) per_branch[br] = { "kpis": { "revenue": round(revenue, 2), "cogs": round(cogs, 2), "gross_profit": round(gp, 2), "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None, "transactions": tx, "items_sold": round(items, 2), "avg_basket_value": basket_kpis.get("median_basket_value"), "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"), "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"), }, "temporal": temporal, "products": product_lb, "affinity": affinity, "customer_value": customers, "cash_recon": cash_recon, "data_quality": { "duplicates_dropped": self._prepared_dupes_dropped, "non_sale_rows_excluded": self._non_sale_excluded, "currency_mixed": False } } branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0}) except Exception as e: emit_kpi_debug(self.profile_id, 
"branch_block_error", {"branch": br, "error": str(e)}) cross = {} if branch_summary_rows: try: bs = pd.DataFrame(branch_summary_rows) cross["rankings"] = { "by_revenue": bs.sort_values("revenue", ascending=False)[["branch","revenue"]].to_dict(orient="records"), "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch","gp_margin_pct"]].to_dict(orient="records"), } cross["spread"] = { "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None, "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None, "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None, } tot_rev = float(bs["revenue"].sum()) shares, hhi = [], 0.0 for _, r in bs.iterrows(): sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0 shares.append({"branch": r["branch"], "share": float(round(sh, 6))}) hhi += sh*sh cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))} if not previous_df.empty and self._has(previous_df, self.COL_BRANCH): prev_g = previous_df.groupby(self.COL_BRANCH).agg( revenue=(self.COL_REVENUE,"sum"), gp=(self.COL_GP,"sum") ).reset_index().rename(columns={self.COL_BRANCH:"branch"}) cur_g = pd.DataFrame(branch_summary_rows) m = cur_g.merge(prev_g, on="branch", suffixes=("_cur","_prev"), how="left").fillna(0.0) wow_rows = [] for _, r in m.iterrows(): wow_rows.append({ "branch": r["branch"], "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"])*100) if r["revenue_prev"]>0 else (100.0 if r["revenue_cur"]>0 else 0.0), "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"])*100) if r["gp_prev"]>0 else (100.0 if r["gp_cur"]>0 else 0.0), "avg_basket_wow": None }) cross["trend_wow"] = wow_rows except Exception as e: emit_kpi_debug(self.profile_id, "branch_cross_error", {"error": str(e)}) return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross} def _branch_params(self) -> Dict[str, Any]: return { 
"top_k": int(self.params["top_k"]), "min_support_baskets": int(self.params["min_support_baskets"]), "min_lift": float(self.params["min_lift"]), "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]), "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]), } # ------------------------- product leaderboards & concentration ------------------------- def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]: top_k = int(self.params["top_k"]) g_marginpct = g.copy() g_marginpct = g_marginpct[ (g_marginpct["revenue"] >= float(self.params["min_revenue_for_margin_pct"])) & (g_marginpct["tx_count"] >= int(self.params["min_tx_for_margin_pct"])) ] def top(df, col, asc=False): if df.empty: return [] d = df.sort_values(col, ascending=asc).head(top_k) return [ { "product": str(r[self.COL_PRODUCT]), "revenue": round(float(r["revenue"]), 2), "units": float(r["units"]), "gross_profit": round(float(r["gross_profit"]), 2), "margin_pct": float(round(r["margin_pct"], 4)) if pd.notna(r["margin_pct"]) else None, "tx_count": int(r["tx_count"]), "avg_selling_price": float(round(r["avg_selling_price"], 4)) if pd.notna(r["avg_selling_price"]) else None, "avg_unit_cost": float(round(r["avg_unit_cost"], 4)) if pd.notna(r["avg_unit_cost"]) else None, "units_per_day": float(round(r.get("units_per_day", np.nan), 4)) if pd.notna(r.get("units_per_day", np.nan)) else None, } for _, r in d.iterrows() ] return { "top_by_revenue": top(g, "revenue", asc=False), "top_by_units": top(g, "units", asc=False), "top_by_margin_value": top(g, "gross_profit", asc=False), "top_by_margin_pct": top(g_marginpct, "margin_pct", asc=False), "bottom_by_revenue": top(g, "revenue", asc=True), "loss_makers": top(g[g["gross_profit"] < 0], "gross_profit", asc=True), "by_velocity": top(g.assign(units_per_day=g.get("units_per_day", np.nan)), "units_per_day", asc=False), "by_gp_per_unit": top(g.assign(gp_per_unit=np.where(g["units"]>0, g["gross_profit"]/g["units"], 
np.nan)), "gp_per_unit", asc=False), } def _empty_product_leaderboards(self) -> Dict[str, Any]: return { "top_by_revenue": [], "top_by_units": [], "top_by_margin_value": [], "top_by_margin_pct": [], "bottom_by_revenue": [], "loss_makers": [], "by_velocity": [], "by_gp_per_unit": [], } def _concentration_block(self, g: pd.DataFrame) -> Dict[str, Any]: if g.empty: return { "revenue_share_top5": 0.0, "units_share_top5": 0.0, "revenue_pareto_top20pct_share": 0.0, "gini_revenue": 0.0 } total_rev = float(g["revenue"].sum()) total_units = float(g["units"].sum()) rev_sorted = g.sort_values("revenue", ascending=False)["revenue"].values units_sorted = g.sort_values("units", ascending=False)["units"].values share_top5_rev = (rev_sorted[:5].sum() / total_rev) if total_rev > 0 else 0.0 share_top5_units = (units_sorted[:5].sum() / total_units) if total_units > 0 else 0.0 n = len(rev_sorted) if n == 0: pareto = 0.0 else: k = max(1, int(np.ceil(0.2 * n))) pareto = rev_sorted[:k].sum() / total_rev if total_rev > 0 else 0.0 if total_rev <= 0 or n == 0: gini = 0.0 else: x = np.sort(rev_sorted) # ascending cum = np.cumsum(x) gini = 1.0 - 2.0 * np.sum(cum) / (n * np.sum(x)) + 1.0 / n return { "revenue_share_top5": float(round(share_top5_rev, 6)), "units_share_top5": float(round(share_top5_units, 6)), "revenue_pareto_top20pct_share": float(round(pareto, 6)), "gini_revenue": float(round(gini, 6)) } # ------------------------- public API ------------------------- def get_business_intelligence_briefing(self) -> Dict[str, Any]: if self.df.empty: emit_kpi_debug(self.profile_id, "briefing", {"status": "no_data"}) return {"Status": "No sales data available to generate a briefing."} current_df, previous_df, tfmeta = self._get_comparison_timeframes() if current_df.empty: emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta}) return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta} snapshot = {} 
section_errors = {} # Headline try: headline = self._headline(current_df, previous_df) snapshot["Summary Period"] = tfmeta.get("period_label", "This Week vs. Last Week") snapshot["Performance Snapshot (vs. Prior Period)"] = { "Total Revenue": f"{headline['total_revenue_fmt']} ({headline['total_revenue_change']})", "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})", "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})", } except Exception as e: section_errors["headline"] = str(e) # Basket & affinity try: basket_df = self._build_basket_table(current_df) snapshot["Basket Analysis"] = self._basket_kpis(basket_df) except Exception as e: section_errors["basket"] = str(e) snapshot["Basket Analysis"] = {"avg_items_per_basket": "N/A", "avg_gross_profit_per_basket": "N/A", "median_basket_value": "N/A", "basket_size_distribution": {}, "low_sample": True} try: if 'basket_df' in locals(): snapshot["Product Affinity"] = self._affinity_pairs(current_df, basket_df) else: snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []} except Exception as e: section_errors["affinity"] = str(e) snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []} # Temporal try: snapshot["Temporal Patterns"] = self._temporal_patterns(current_df) except Exception as e: section_errors["temporal"] = str(e) snapshot["Temporal Patterns"] = {"best_hour_by_profit": None, "best_day_by_profit": None, "hourly_series": [], "dow_series": [], "profit_heatmap_7x24": []} # Product aggregates + leaderboards + concentration try: start_cur = pd.Timestamp(tfmeta["current_start"]) end_cur = pd.Timestamp(tfmeta["current_end"]) days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0) g_products = self._build_product_aggregates(current_df) if not g_products.empty: g_products["units_per_day"] = g_products["units"] / days product_lb = self._product_leaderboards(g_products) concentration = 
self._concentration_block(g_products) else: product_lb = self._empty_product_leaderboards() concentration = self._concentration_block(pd.DataFrame(columns=["revenue","units"])) snapshot["Product KPIs"] = {"leaderboards": product_lb, "concentration": concentration} except Exception as e: section_errors["products"] = str(e) snapshot["Product KPIs"] = {"leaderboards": self._empty_product_leaderboards(), "concentration": self._concentration_block(pd.DataFrame(columns=["revenue","units"]))} # Customer value (RFM) try: # basket_df may or may not exist: bdf = locals().get("basket_df", pd.DataFrame()) snapshot["Customer Value"] = self._customer_value(current_df, bdf) except Exception as e: section_errors["customer_value"] = str(e) snapshot["Customer Value"] = { "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20}, "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []}, "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None} } # Inventory (optional) try: g_products_for_inv = locals().get("g_products", pd.DataFrame()) snapshot["Inventory"] = self._inventory_block(current_df, g_products_for_inv, (start_cur, end_cur)) except Exception as e: section_errors["inventory"] = str(e) snapshot["Inventory"] = {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}} # Branch analytics try: snapshot["Branch Analytics"] = self._per_branch_blocks(current_df, previous_df, (start_cur, end_cur)) except Exception as e: section_errors["branch"] = str(e) snapshot["Branch Analytics"] = {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}} # Meta snapshot["meta"] = { "timeframes": tfmeta, "kpi_params": { "top_k": int(self.params["top_k"]), "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]), "min_tx_for_margin_pct": 
int(self.params["min_tx_for_margin_pct"]), "rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "min_support_baskets": int(self.params["min_support_baskets"]), "min_lift": float(self.params["min_lift"]), "blocked_products": list(self.params["blocked_products"]), "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]), "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]), }, "row_counts": { "input": int(len(self.raw)), "prepared": int(len(self.df)), "current_period": int(len(current_df)), "previous_period": int(len(previous_df)), }, "notes": [ "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).", f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}", ], "section_errors": section_errors, # surfaced to the client for your debug panel } emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"]) return json_safe(snapshot) def synthesize_fallback_response(self, briefing: dict, user_question: str) -> str: """ LLM is narration-only. Do NOT invent or recompute numbers here. Use the already-deterministic 'briefing' dict as the single source of truth. Safe for PandasAI exception fallback. """ try: prompt = ( "You are Iris, a concise business analyst.\n" "IMPORTANT RULES:\n" "• DO NOT invent numbers. 
ONLY use values present in the Business Data JSON.\n" "• If a metric is missing, say 'N/A' or 'no data for this period'.\n" "• Harare timezone is the reference for dates/times.\n" "• Keep it brief: headings + bullets; no tables unless clearly helpful.\n" "• If the user asked something specific (e.g., 'top 5 products'), answer directly from the JSON.\n" "• Otherwise, provide a short business briefing (performance, products, timing, customers, branches).\n" "• Never expose internal keys; paraphrase labels.\n" f"User Question: {json.dumps(user_question)}\n\n" "Business Data (authoritative; JSON):\n" f"{json.dumps(json_safe(briefing), ensure_ascii=False)}\n" ) resp = self.llm.invoke(prompt) text = getattr(resp, "content", None) or str(resp) return sanitize_answer(text) except Exception as e: fallback = { "note": "Narrative fallback failed; returning raw snapshot.", "error": str(e)[:200], "snapshot_keys": list(briefing.keys()) if isinstance(briefing, dict) else "N/A" } return "### Business Snapshot\n\n```\n" + json.dumps(json_safe(briefing), indent=2) + "\n```\n\n" + \ "```\n" + json.dumps(fallback, indent=2) + "\n```" # ------------------------- helpers (outside class) ------------------------- def rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]: out = [] for _, r in df.iterrows(): out.append({ "customer": str(r["customer"]), "gp": float(round(r["gp"], 2)), "revenue": float(round(r["revenue"], 2)), "orders": int(r["orders"]), "recency_days": float(round(r["recency_days"], 2)) if pd.notna(r["recency_days"]) else None, "avg_basket_value": float(round(r["avg_basket_value"], 2)) if pd.notna(r["avg_basket_value"]) else None }) return out # ----------------------------------------------------------------------------- # /chat — PandasAI first, then deterministic fallback # ----------------------------------------------------------------------------- @app.route("/chat", methods=["POST"]) @cross_origin() def bot(): request_id = str(uuid.uuid4())[:8] 
logger.info(f"[{request_id}] === /chat start ===") try: payload = request.get_json() or {} profile_id = str(payload.get("profile_id") or "").strip() user_question = (payload.get("user_question") or "").strip() if not profile_id or not user_question: return jsonify({"answer": "Missing 'profile_id' or 'user_question'."}) API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2" try: resp = requests.post( API_URL, data={"profile_id": urllib.parse.quote_plus(profile_id)}, timeout=30 ) resp.raise_for_status() transactions = (resp.json() or {}).get("transactions") or [] except Exception as e: logger.exception(f"[{request_id}] Transaction API error: {e}") return jsonify({"answer": "I couldn’t reach the transactions service. Please try again shortly."}) if not transactions: return jsonify({"answer": "No transaction data was found for this profile."}) # Tier 1: PandasAI try: logger.info(f"[{request_id}] PandasAI attempt …") df = pd.DataFrame(transactions) pandas_agent = SmartDataframe(df, config={ "llm": llm, "response_parser": FlaskResponse, "security": "none", "save_charts_path": user_defined_path, "save_charts": False, "enable_cache": False, "conversational": True, "enable_logging": False, "custom_whitelisted_dependencies": [ "os","io","sys","chr","glob","b64decoder","collections", "builtins","datetime","timedelta","date", "pandas","numpy","math","statistics", "matplotlib","plotly","json","re","warnings" ], }) combined_prompt = f"{guardrails_preamble()}\n\n{temporal_hints(user_question)}\n\nQuestion: {user_question}" answer = pandas_agent.chat(combined_prompt) if looks_like_error(answer): logger.warning(f"[{request_id}] PandasAI invalid answer; fallback.") raise RuntimeError("PandasAI invalid answer") if isinstance(answer, pd.DataFrame): return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}}) if isinstance(answer, plt.Figure): buf = io.BytesIO() answer.savefig(buf, format="png") data_uri = 
f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}" return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}}) return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}}) except Exception as e: logger.exception(f"[{request_id}] Tier 1 failed; moving to analyst.") engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm) briefing = engine.get_business_intelligence_briefing() fallback_answer = engine.synthesize_fallback_response(briefing, user_question) return jsonify({"answer": sanitize_answer(fallback_answer), "meta": {"source": "analyst_fallback"}}) except Exception as e: logger.exception(f"[{request_id}] Critical unexpected error in /chat: {e}") return jsonify({"answer": "Something went wrong on our side. Please try again."}) # ----------------------------------------------------------------------------- # Report / Marketing / Notify — unchanged logic with safer logging # ----------------------------------------------------------------------------- @app.route("/report", methods=["POST"]) @cross_origin() def busines_report(): logger.info("=== /report ===") try: request_json = request.get_json() json_data = request_json.get("json_data") if request_json else None prompt = ( "You are Quantilytix business analyst. Analyze the following data and generate a " "comprehensive and insightful business report, including appropriate key performance " "indicators and recommendations. Use markdown formatting and tables where necessary. 
" "Only return the report and nothing else.\nDATA:\n" + str(json_data) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /report") return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500 @app.route("/marketing", methods=["POST"]) @cross_origin() def marketing(): logger.info("=== /marketing ===") try: request_json = request.get_json() json_data = request_json.get("json_data") if request_json else None prompt = ( "You are a Quantilytix Marketing Specialist. Analyze the following data and generate " "a creative, concise marketing strategy. Only return the strategy.\n" + str(json_data) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /marketing") return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500 @app.route("/notify", methods=["POST"]) @cross_origin() def notifications(): logger.info("=== /notify ===") try: request_json = request.get_json() json_data = request_json.get("json_data") if request_json else None prompt = ( "You are a Quantilytix business analyst. Write a very brief analysis and marketing tips " "using this business data. 
Output must be suitable for a notification dashboard.\n" + str(json_data) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /notify") return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500 # ----------------------------------------------------------------------------- # ElevenLabs Voice Briefing Endpoints — history summary separated from KPI # ----------------------------------------------------------------------------- def _synthesize_history_summary(call_history: List[dict]) -> str: """Summarize past call transcripts only — never touches KPI data.""" if not call_history: return "• New caller — no prior call history." history_json = json.dumps(json_safe(call_history), indent=2) analyst_prompt = ( "You are an executive assistant preparing a pre-call briefing for a voice AI business analyst named Iris.\n" "ONLY analyze the user's past call history and identify recurring themes or concerns.\n\n" "USER'S PAST CALL HISTORY (Transcripts):\n" f"{history_json}\n\n" "- Output a few bullet points only. No fluff." ) try: response = model.generate_content(analyst_prompt) summary = (response.text or "").strip() logger.info(f"Generated conversation history summary ({len(summary)} chars).") return summary except Exception as e: logger.error(f"Failed to generate history summary: {e}") return "• Could not summarize prior calls." 
@app.route("/api/log-call-usage", methods=["POST"]) @cross_origin() def log_call_usage(): """Logs the transcript of a completed call to Firebase.""" logger.info("=== /api/log-call-usage ===") payload = request.get_json() or {} profile_id = payload.get("profile_id") transcript = payload.get("transcript") duration = payload.get("durationSeconds") if not profile_id or not transcript: return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400 try: call_id = f"call_{int(time.time())}" transcript_ref = db_ref.child(f"transcripts/{profile_id}/{call_id}") transcript_data = { "transcript": transcript, "profileId": profile_id, "durationSeconds": duration, "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } transcript_ref.set(json_safe(transcript_data)) logger.info(f"Stored transcript for profile '{profile_id}'.") return jsonify({"status": "success"}), 200 except Exception as e: logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}") return jsonify({"error": "A server error occurred while storing the transcript."}), 500 @app.route("/api/call-briefing", methods=["POST"]) @cross_origin() def get_call_briefing(): """ Returns: - memory_summary: bullets summarizing prior call transcripts - kpi_snapshot: deterministic KPI dict for the live profile (no LLM in numbers) """ logger.info("=== /api/call-briefing ===") profile_id = str((request.get_json() or {}).get("profile_id") or "").strip() if not profile_id: return jsonify({"error": "Missing 'profile_id'."}), 400 try: # 1) Summarize call history call_history = [] try: transcripts = db_ref.child(f"transcripts/{profile_id}").get() if transcripts: call_history = list(transcripts.values()) logger.info(f"Found {len(call_history)} past transcripts for '{profile_id}'.") except Exception as e: logger.warning(f"Could not fetch transcript history for '{profile_id}': {e}") memory_summary = _synthesize_history_summary(call_history) # 2) KPI Snapshot from live transactions kpi_snapshot: Dict[str, 
Any] = {"Status": "Could not retrieve business data."} API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2" try: resp = requests.post( API_URL, data={"profile_id": urllib.parse.quote_plus(profile_id)}, timeout=20 ) resp.raise_for_status() transactions = (resp.json() or {}).get("transactions") or [] if transactions: engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm) kpi_snapshot = engine.get_business_intelligence_briefing() else: kpi_snapshot = {"Status": "No transaction data found for this profile."} except Exception as e: logger.warning(f"Txn fetch failed for briefing '{profile_id}': {e}") return jsonify({ "memory_summary": memory_summary, "kpi_snapshot": json_safe(kpi_snapshot) }), 200 except Exception as e: logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}") return jsonify({"error": "Failed to generate call briefing."}), 500 # ----------------------------------------------------------------------------- # Entrypoint # ----------------------------------------------------------------------------- if __name__ == "__main__": # Do NOT use debug=True in production; enabling here for dev. app.run(debug=True, host="0.0.0.0", port=7860)