# dash-chat-api / main.py — commit 71935f9 ("Update main.py" by rairo)
# app.py — FIXED: KPI snapshot determinism + deep crunch logging (server + optional Firebase)
# - Robust ColumnResolver (no guessing by the model; no hallucinated fields)
# - Deterministic time windows (Harare tz; explicit start/end)
# - KPI engine never uses LLM for numbers (LLM is narration-only fallback)
# - JSON-safe snapshot (no pandas/Index objects)
# - Detailed DEBUG logs for each crunch stage; optional Firebase debug logs
# - Sanitized PandasAI output; hard guard on errors
# - Cleaned Firebase init; clean FlaskResponse ResponseParser
# - Safer temporal hints + guardrails
# - Fixed sanitize_answer() bug and many stray smart-quotes/copy artifacts
from __future__ import annotations
import os
import io
import re
import json
import time
import uuid
import base64
import logging
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import urllib.parse
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from dotenv import load_dotenv
# LLMs
from langchain_google_genai import ChatGoogleGenerativeAI
import google.generativeai as genai
# PandasAI
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser
# Firebase
import firebase_admin
from firebase_admin import credentials, db
# -----------------------------------------------------------------------------
# Init
# -----------------------------------------------------------------------------
# Load .env before any os.environ reads below (Firebase, Gemini keys, flags).
load_dotenv()
app = Flask(__name__)
CORS(app)  # allow cross-origin calls from the dashboard frontend
# NOTE(review): DEBUG level is very verbose for production; consider INFO.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("iris-app")
# -----------------------------------------------------------------------------
# Firebase Initialization
# -----------------------------------------------------------------------------
# Firebase Admin SDK bootstrap. Credentials JSON comes from the FIREBASE env
# var; the Realtime Database URL from Firebase_DB. Startup is fail-fast: any
# problem here logs and re-raises, aborting the process.
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    db_ref = db.reference()  # root reference; emit_kpi_debug writes under kpi_debug/
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    logger.fatal(f"FATAL: Firebase init failed: {e}")
    raise
# Mirror KPI crunch logs into Firebase only when explicitly enabled via env.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"
# -----------------------------------------------------------------------------
# Response parser for PandasAI
# -----------------------------------------------------------------------------
class FlaskResponse(ResponseParser):
    """PandasAI ResponseParser that renders results as JSON-friendly strings.

    DataFrames become HTML tables, plots become base64 PNG data URIs, and
    everything else is stringified — so the Flask layer can return any
    PandasAI result directly in a JSON payload.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Return the result dataframe as an HTML table ('' on any failure)."""
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        """Return the plot as a base64 PNG data URI.

        Handles both a live figure object (anything exposing .savefig) and a
        path to an already-saved image file; falls back to str() otherwise.
        """
        val = result.get("value")
        if hasattr(val, "savefig"):
            buf = io.BytesIO()
            val.savefig(buf, format="png")
            buf.seek(0)
            return f"data:image/png;base64,{base64.b64encode(buf.read()).decode('utf-8')}"
        # FIX: single-argument os.path.join(val) was a no-op wrapper around val.
        if isinstance(val, str) and os.path.isfile(val):
            with open(val, "rb") as f:
                return f"data:image/png;base64,{base64.b64encode(f.read()).decode('utf-8')}"
        return str(val)

    def format_other(self, result):
        """Fallback: stringify the raw value ('' when absent)."""
        return str(result.get("value", ""))
# -----------------------------------------------------------------------------
# LLM init
# -----------------------------------------------------------------------------
logger.info("Initializing models…")
# NOTE(review): the API key env var is unconventionally named "Gemini".
gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")
# LangChain chat model handed to PandasAI for code generation; low temperature
# keeps the generated pandas code close to deterministic.
llm = ChatGoogleGenerativeAI(
    api_key=gemini_api_key,
    model="gemini-2.0-flash",
    temperature=0.1
)
# Direct google-generativeai client, used for narration-only fallbacks.
genai.configure(api_key=gemini_api_key)
generation_config = {"temperature": 0.2, "top_p": 0.95, "max_output_tokens": 5000}
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-lite-001",
    generation_config=generation_config,
)
logger.info("AI models initialized.")
# Per-process unique chart export directory (avoids filename collisions).
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info(f"Chart export path set to: {user_defined_path}")
# -----------------------------------------------------------------------------
# Temporal helpers + guardrails
# -----------------------------------------------------------------------------
TZ = "Africa/Harare"

def now_harare() -> pd.Timestamp:
    """Current wall-clock time in the business timezone (Africa/Harare)."""
    return pd.Timestamp.now(tz=TZ)

def week_bounds_from(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of the Harare week containing *ts*.

    BUG FIX: the weekday is now taken from the tz-converted timestamp.
    Previously ``ts.weekday()`` was evaluated on the original timezone, so a
    timestamp from another zone (e.g. UTC just before Harare midnight) could
    anchor the window on the wrong Monday.
    """
    local = ts.tz_convert(TZ)
    monday = local.normalize() - pd.Timedelta(days=local.weekday())
    sunday = monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return monday, sunday

def next_week_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of the week after *ts*'s week."""
    this_monday, _ = week_bounds_from(ts)
    next_monday = this_monday + pd.Timedelta(days=7)
    next_sunday = next_monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return next_monday, next_sunday
def last_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds of the calendar month before the one containing *ts*.

    Returns (1st of previous month 00:00:00, last day of previous month 23:59:59).
    """
    this_month_start = ts.normalize().replace(day=1)
    # One second before this month's start is the last second of the previous month.
    prev_month_end = this_month_start - pd.Timedelta(seconds=1)
    prev_month_start = prev_month_end.normalize().replace(day=1)
    return prev_month_start, prev_month_end
def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds of the calendar month containing *ts*: (1st 00:00:00, last day 23:59:59)."""
    start = ts.normalize().replace(day=1)
    # Roll forward one month, wrapping December into January of the next year.
    rollover_year = start.year + (start.month // 12)
    rollover_month = start.month % 12 + 1
    next_start = start.replace(year=rollover_year, month=rollover_month)
    return start, next_start - pd.Timedelta(seconds=1)
def quarter_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Bounds of the calendar quarter containing *ts* (Q1=Jan–Mar … Q4=Oct–Dec)."""
    start_month = ((ts.month - 1) // 3) * 3 + 1
    start = ts.normalize().replace(month=start_month, day=1)
    if start_month > 9:
        # Q4 rolls over into January of the following year.
        after = start.replace(year=start.year + 1, month=1)
    else:
        after = start.replace(month=start_month + 3)
    return start, after - pd.Timedelta(seconds=1)
# Substring-triggered named windows: each phrase found in the question is
# resolved by temporal_hints() into an explicit (start, end) pair anchored at
# "now" in Harare time. ALL matching phrases are collected, not just the first.
_TEMP_WINDOWS = [
    ("next week", lambda base: next_week_bounds(base)),
    ("this week", lambda base: week_bounds_from(base)),
    ("last week", lambda base: week_bounds_from(base - pd.Timedelta(days=7))),
    # Yesterday/tomorrow span the full calendar day (00:00:00 .. 23:59:59).
    ("yesterday", lambda base: (
        base.normalize() - pd.Timedelta(days=1),
        base.normalize() - pd.Timedelta(seconds=1)
    )),
    ("tomorrow", lambda base: (
        base.normalize() + pd.Timedelta(days=1),
        base.normalize() + pd.Timedelta(days=1, hours=23, minutes=59, seconds=59)
    )),
    ("this month", lambda base: this_month_bounds(base)),
    ("last month", lambda base: last_month_bounds(base)),
    ("this quarter", lambda base: quarter_bounds(base)),
]
def extract_numeric_window(question: str):
    """Parse a "last/past N days" phrase into an explicit time window.

    GENERALIZED: the regex now also accepts the singular form ("past 1 day"),
    a strict superset of the previous plural-only match.

    Returns (start, end) as tz-aware Timestamps with end = now in Harare,
    or None when no such phrase is present.
    """
    m = re.search(r"(last|past)\s+(\d{1,3})\s+days?", question.lower())
    if m:
        n = int(m.group(2))
        end = now_harare()
        start = end - pd.Timedelta(days=n)
        return start, end
    return None
def temporal_hints(question: str) -> str:
    """Build a deterministic temporal-context preamble for the LLM prompt.

    Scans the question for known window phrases and numeric "last N days"
    ranges, spelling each out as an explicit start/end pair in Harare time so
    the model never has to guess what "last week" means.
    """
    base = now_harare()
    lowered = question.lower()
    stamp = "%Y-%m-%d %H:%M:%S%z"
    windows = {}
    for phrase, resolver in _TEMP_WINDOWS:
        if phrase in lowered:
            start, end = resolver(base)
            windows[phrase] = (start.strftime(stamp), end.strftime(stamp))
    numeric = extract_numeric_window(question)
    if numeric:
        start, end = numeric
        windows[f"last {(end - start).days} days"] = (start.strftime(stamp), end.strftime(stamp))
    if not windows:
        # No recognizable window: hand back generic, guardrail-friendly context.
        return (
            f"Temporal context: Today is {base.strftime('%Y-%m-%d')} ({TZ}). "
            f"Week is Monday–Sunday. Use pd.Timestamp.now(tz='{TZ}') and pd.Timedelta. "
            f"Avoid non-fixed rolling frequencies; use daily resample or dt.to_period('M')."
        )
    parts = [f"Temporal context: Today is {base.strftime('%Y-%m-%d')} ({TZ})."]
    parts.extend(f'Interpret "{k}" as {s} to {e}.' for k, (s, e) in windows.items())
    parts.append("Avoid non-fixed rolling frequencies (MonthBegin/MonthEnd/Week). Prefer daily resample or Period groupby.")
    return " ".join(parts)
def guardrails_preamble() -> str:
    """Hard rules injected into every code-generation prompt."""
    rules = (
        "Rules for any code you generate:",
        "1) Do NOT use 'from datetime import datetime' or 'datetime.date.today()'.",
        "2) Use pandas time APIs only: pd.Timestamp.now(tz='Africa/Harare'), pd.Timedelta, dt accessors.",
        "3) If a Time column exists, combine Date + Time and localize to Africa/Harare.",
        "4) Ensure numeric conversion with errors='coerce' for amount fields.",
        "5) Avoid non-fixed rolling/resample frequencies. For monthly, use df['datetime'].dt.to_period('M').",
        "6) Never print stack traces; return a concise answer or a plot/dataframe.",
    )
    return "\n".join(rules)
# -----------------------------------------------------------------------------
# Error detection & sanitization
# -----------------------------------------------------------------------------
# Lower-cased substrings that mark a PandasAI answer as a failure; scanned by
# looks_like_error() against the stringified, lower-cased answer text.
ERROR_PATTERNS = [
    "traceback", "exception", "keyerror", "nameerror", "syntaxerror",
    "modulenotfounderror", "importerror", "pipeline failed", "execution failed",
    "__import__", "failed with error", "attributeerror", "method_descriptor",
    "unfortunately, i was not able to answer your question",
    # Non-fixed frequency artifacts that the guardrails explicitly forbid:
    "non-fixed frequency", "monthbegin", "monthend", "week:", "weekday="
]
def _stringify(obj) -> str:
    """Best-effort plain-text rendering of *obj*; never raises.

    Bytes are decoded leniently, dataframes/figures are deliberately blanked
    (they are handled by dedicated formatters), everything else is str()'d.
    """
    try:
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode("utf-8", errors="ignore")
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            return ""
        return str(obj)
    except Exception:
        return ""
def _extract_text_like(ans):
    """Pull the most text-like payload out of a PandasAI answer.

    Dicts are probed for well-known keys (value first), objects for a
    ``.value`` attribute; everything else is stringified directly.
    """
    if isinstance(ans, dict):
        for key in ("value", "message", "text", "content"):
            if key in ans:
                return _stringify(ans[key])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(ans.value)
        except Exception:
            pass
    return _stringify(ans)
def looks_like_error(ans) -> bool:
    """Heuristic: does this PandasAI answer smell like a failure?

    DataFrames/figures are always fine; empty text, any ERROR_PATTERNS hit,
    or traceback-shaped text counts as an error.
    """
    if isinstance(ans, (pd.DataFrame, plt.Figure)):
        return False
    text = _extract_text_like(ans).strip().lower()
    if not text:
        return True
    if any(pattern in text for pattern in ERROR_PATTERNS):
        return True
    traceback_shaped = "file " in text and " line " in text and "error" in text
    return traceback_shaped or ("valueerror:" in text)
def sanitize_answer(ans) -> str:
    """Strip code fences, tracebacks and the stock apology from an answer."""
    text = _extract_text_like(ans) or ""
    # Remove triple-backtick fences (with or without a language tag).
    text = re.sub(r"```+\w*", "", text)
    # Cut everything from the first traceback marker onward.
    marker = "Traceback (most recent call last):"
    if marker in text:
        text = text.split(marker, 1)[0]
    # The stock PandasAI apology carries no information — blank it out.
    if "Unfortunately, I was not able to answer your question" in text:
        text = ""
    return text.strip()
# -----------------------------------------------------------------------------
# KPI Engine
# -----------------------------------------------------------------------------
class ColumnResolver:
    """Map messy input columns to canonical names without guessing via LLM."""
    AMOUNT_CANDIDATES = [
        "Amount", "amount", "Total", "total", "Settled_Amount", "GrandTotal",
        "NetAmount", "Invoice_Total", "LineTotal"
    ]
    DATE_CANDIDATES = ["datetime", "Date", "date", "TxnDate", "CreatedAt", "created_at", "timestamp"]
    TIME_CANDIDATES = ["Time", "time", "TxnTime"]
    CURR_CANDIDATES = ["Currency", "currency", "Curr"]
    INVOICE_CANDIDATES = ["Invoice_Number", "InvoiceNo", "invoice_number", "Receipt_Number", "Order_No", "Txn_ID", "TxnId"]
    PRODUCT_CANDIDATES = ["Product", "Product_Name", "Item", "Description", "Item_Name"]
    UNITS_CANDIDATES = ["Units_Sold", "Qty", "Quantity", "Units"]
    UNIT_COST_CANDIDATES = ["Unit_Cost_Price", "UnitCost", "CostPrice"]
    TYPE_CANDIDATES = ["Transaction_Type", "TxnType", "Type"]
    TELLER_CANDIDATES = ["Teller_Username", "Teller", "Cashier", "User"]

    @staticmethod
    def pick_first(df: pd.DataFrame, names: List[str]) -> Optional[str]:
        """First candidate name present as a column of *df*, else None."""
        return next((name for name in names if name in df.columns), None)

    @staticmethod
    def pick_first_numeric(df: pd.DataFrame, names: List[str]) -> Optional[str]:
        """First candidate column that exists AND is at least 20% numeric-coercible."""
        for name in names:
            if name not in df.columns:
                continue
            coerced = pd.to_numeric(df[name], errors="coerce")
            usable_ratio = coerced.notna().mean() if len(coerced) else 0
            if usable_ratio >= 0.2:  # needs at least some usable data
                return name
        return None

    @staticmethod
    def map(df: pd.DataFrame) -> Dict[str, Optional[str]]:
        """Resolve every canonical role to a concrete column name (or None)."""
        first = ColumnResolver.pick_first
        first_numeric = ColumnResolver.pick_first_numeric
        return {
            "amount": first_numeric(df, ColumnResolver.AMOUNT_CANDIDATES),
            "date": first(df, ColumnResolver.DATE_CANDIDATES),
            "time": first(df, ColumnResolver.TIME_CANDIDATES),
            "currency": first(df, ColumnResolver.CURR_CANDIDATES),
            "invoice": first(df, ColumnResolver.INVOICE_CANDIDATES),
            "product": first(df, ColumnResolver.PRODUCT_CANDIDATES),
            "units": first_numeric(df, ColumnResolver.UNITS_CANDIDATES),
            "unit_cost": first_numeric(df, ColumnResolver.UNIT_COST_CANDIDATES),
            "txn_type": first(df, ColumnResolver.TYPE_CANDIDATES),
            "teller": first(df, ColumnResolver.TELLER_CANDIDATES),
        }
def json_safe(obj: Any) -> Any:
    """Recursively coerce numpy/pandas values to plain, JSON-serializable Python.

    GENERALIZED beyond the original scalars/Index/dict/list handling:
    numpy arrays and sets become lists, pandas Timedeltas become strings, and
    Series values are coerced recursively — previously these leaked through
    un-coerced and could break json.dumps downstream.
    """
    if isinstance(obj, (np.integer,)):
        return int(obj)
    if isinstance(obj, (np.floating,)):
        return float(obj)
    if isinstance(obj, (np.bool_,)):
        return bool(obj)
    if isinstance(obj, (pd.Timestamp,)):
        return obj.isoformat()
    if isinstance(obj, (pd.Timedelta,)):
        # NEW: timedeltas are not JSON-serializable; use their string form.
        return str(obj)
    if isinstance(obj, (pd.Series,)):
        # NEW: coerce the values too (they are often numpy scalars).
        return {k: json_safe(v) for k, v in obj.to_dict().items()}
    if isinstance(obj, (np.ndarray,)):
        # NEW: arrays become plain nested lists.
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, (pd.Index,)):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, (dict,)):
        return {k: json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [json_safe(x) for x in obj]
    return obj
def emit_kpi_debug(profile_id: str, stage: str, payload: Dict[str, Any]) -> None:
    """Emit deep crunch logs to the server log; optionally mirror to Firebase.

    Best-effort by design: any failure is downgraded to a warning so debug
    plumbing can never break the KPI pipeline itself.
    """
    try:
        record = {"profile_id": profile_id, "stage": stage, "payload": payload}
        logger.debug("KPI_DEBUG %s", json.dumps(json_safe(record)))
        if LOG_KPI_TO_FIREBASE:
            stamp = int(time.time())
            db_ref.child(f"kpi_debug/{profile_id}/{stage}_{stamp}").set(json_safe(payload))
    except Exception as e:
        logger.warning(f"Failed to emit KPI debug logs: {e}")
class IrisReportEngine:
"""
Backwards-compatible KPI engine:
- Keeps existing snapshot sections untouched
- Adds: Basket Analysis, Product Affinity, Temporal Patterns, Customer Value, Product KPIs (expanded),
Inventory (optional), Branch Analytics (per-branch + cross-branch), Cash reconciliation (optional)
- Never uses LLM for numbers. LLM only for narration elsewhere.
"""
# ---- Canonical column names (single source of truth; no magic strings sprinkled around) ----
COL_INVOICE = "_Invoice"
COL_PRODUCT = "_Product"
COL_TELLER = "_Teller"
COL_TXNTYPE = "_TxnType"
COL_BRANCH = "_Branch"
COL_CUSTOMER = "_Customer"
COL_DT = "_datetime"
COL_AMOUNT = "_Amount"
COL_UNITS = "_Units"
COL_UNITCOST = "_UnitCost"
COL_REVENUE = "_Revenue"
COL_COGS = "_COGS"
COL_GP = "_GrossProfit"
COL_HOUR = "_Hour"
COL_DOW = "_DOW"
COL_DOWI = "_DOW_idx"
DEFAULT_PARAMS = {
"top_k": 5,
"min_revenue_for_margin_pct": 50.0,
"min_tx_for_margin_pct": 3,
"rfm_window_days": 365,
"retention_factor": 1.0,
"min_support_baskets": 5, # minimum basket count for a pair to be reported
"min_lift": 1.2,
"blocked_products": ["Purchase"], # exclude accounting placeholders from product leaderboards/affinity
"cash_variance_threshold_abs": 10.0,
"cash_variance_threshold_pct": 0.008, # 0.8%
}
def __init__(
self,
profile_id: str,
transactions_data: List[dict],
llm_instance,
stock_feed: Optional[List[Dict[str, Any]]] = None,
cash_float_feed: Optional[List[Dict[str, Any]]] = None,
params: Optional[Dict[str, Any]] = None,
):
self.profile_id = profile_id
self.llm = llm_instance
self.params = {**self.DEFAULT_PARAMS, **(params or {})}
self.raw = pd.DataFrame(transactions_data)
self.stock_feed = pd.DataFrame(stock_feed) if stock_feed else pd.DataFrame()
self.cash_float_feed = pd.DataFrame(cash_float_feed) if cash_float_feed else pd.DataFrame()
self.df = self._load_and_prepare_data(self.raw)
self.currency = self._get_primary_currency()
# ------------------------- small helpers -------------------------
@staticmethod
def _rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
if df is None or df.empty:
return []
out = []
for _, r in df.iterrows():
out.append({
"customer": str(r.get("customer")),
"orders": int(r.get("orders", 0)),
"revenue": float(r.get("revenue", 0.0)),
"gp": float(r.get("gp", 0.0)),
"recency_days": float(r.get("recency_days", np.nan)) if pd.notna(r.get("recency_days")) else None,
"avg_basket_value": float(r.get("avg_basket_value", np.nan)) if pd.notna(r.get("avg_basket_value")) else None,
})
return out
def _has(self, df: pd.DataFrame, col: str) -> bool:
return isinstance(df, pd.DataFrame) and col in df.columns
# ------------------------- load/prepare -------------------------
def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
if df is None or df.empty:
emit_kpi_debug(self.profile_id, "load", {"status": "empty_input"})
return pd.DataFrame()
mapping = ColumnResolver.map(df)
emit_kpi_debug(self.profile_id, "column_map", mapping)
# Numerics
amt_col = mapping["amount"] or ("Settled_Amount" if "Settled_Amount" in df.columns else None)
if amt_col and amt_col in df:
df[self.COL_AMOUNT] = pd.to_numeric(df[amt_col], errors="coerce")
else:
df[self.COL_AMOUNT] = pd.Series(dtype=float)
if mapping["units"] and mapping["units"] in df:
df[self.COL_UNITS] = pd.to_numeric(df[mapping["units"]], errors="coerce").fillna(0)
else:
df[self.COL_UNITS] = 0
if mapping["unit_cost"] and mapping["unit_cost"] in df:
df[self.COL_UNITCOST] = pd.to_numeric(df[mapping["unit_cost"]], errors="coerce").fillna(0.0)
else:
df[self.COL_UNITCOST] = 0.0
# Datetime
if mapping["date"] and mapping["date"] in df:
if mapping["time"] and mapping["time"] in df:
dt_series = pd.to_datetime(
df[mapping["date"]].astype(str) + " " + df[mapping["time"]].astype(str),
errors="coerce"
)
else:
dt_series = pd.to_datetime(df[mapping["date"]], errors="coerce")
else:
dt_series = pd.to_datetime(df.get("datetime"), errors="coerce")
try:
if getattr(dt_series.dt, "tz", None) is None:
dt_series = dt_series.dt.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT")
else:
dt_series = dt_series.dt.tz_convert(TZ)
except Exception:
pass
df[self.COL_DT] = dt_series
df = df.dropna(subset=[self.COL_DT]).copy()
# Canonical dims
df[self.COL_INVOICE] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None
df[self.COL_PRODUCT] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None
df[self.COL_TELLER] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None
df[self.COL_TXNTYPE] = (df[mapping["txn_type"]].astype(str).str.lower()
if mapping["txn_type"] and mapping["txn_type"] in df
else df.get("Transaction_Type", "").astype(str).str.lower())
df[self.COL_BRANCH] = df.get("Branch")
df[self.COL_CUSTOMER] = df.get("Customer_Reference")
# Sales filter: keep explicit sales OR Transaction_Type_ID 21 OR positive amounts
txid_series = df.get("Transaction_Type_ID")
sales_mask = (
df[self.COL_TXNTYPE].isin(["sale", "sales", "invoice"]) |
(pd.Series(False, index=df.index) if txid_series is None else txid_series.isin([21])) |
(df[self.COL_AMOUNT] > 0)
)
working = df[sales_mask].copy()
# Derive measures
working[self.COL_REVENUE] = working[self.COL_AMOUNT].fillna(0.0)
working[self.COL_COGS] = (working[self.COL_UNITCOST] * working[self.COL_UNITS]).fillna(0.0)
working[self.COL_GP] = (working[self.COL_REVENUE] - working[self.COL_COGS]).fillna(0.0)
working[self.COL_HOUR] = working[self.COL_DT].dt.hour
working[self.COL_DOW] = working[self.COL_DT].dt.day_name()
working[self.COL_DOWI] = working[self.COL_DT].dt.dayofweek # 0=Mon .. 6=Sun
# Deduplicate exact duplicate sale lines
before = len(working)
dedupe_keys = ["Transaction_ID", self.COL_INVOICE, self.COL_PRODUCT, self.COL_UNITS, self.COL_AMOUNT, self.COL_DT]
existing_keys = [k for k in dedupe_keys if k in working.columns]
if existing_keys:
working = working.drop_duplicates(subset=existing_keys)
duplicates_dropped = before - len(working)
# Drop zero rows if both revenue and cost are zero
working = working[(working[self.COL_REVENUE].abs() > 0) | (working[self.COL_COGS].abs() > 0)]
emit_kpi_debug(self.profile_id, "prepared_counts", {
"raw_rows": int(len(self.raw)),
"rows_with_datetime": int(len(df)),
"sale_like_rows": int(len(working)),
"duplicates_dropped": int(duplicates_dropped),
})
self._prepared_dupes_dropped = int(duplicates_dropped)
self._non_sale_excluded = int(len(df) - len(working))
return working
def _get_primary_currency(self) -> str:
try:
mapping = ColumnResolver.map(self.raw)
if mapping["currency"] and mapping["currency"] in self.raw:
mode_series = self.raw[mapping["currency"]].dropna().astype(str)
if not mode_series.empty:
val = mode_series.mode()
if not val.empty:
return str(val.iloc[0])
except Exception:
pass
return "USD"
# ------------------------- timeframes & headline -------------------------
def _get_comparison_timeframes(self) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
if self.df.empty:
return self.df, self.df, {}
now = now_harare()
start_cur, end_cur = week_bounds_from(now)
start_prev = start_cur - pd.Timedelta(days=7)
end_prev = start_cur - pd.Timedelta(seconds=1)
current_df = self.df[(self.df[self.COL_DT] >= start_cur) & (self.df[self.COL_DT] <= end_cur)]
previous_df = self.df[(self.df[self.COL_DT] >= start_prev) & (self.df[self.COL_DT] <= end_prev)]
meta = {
"period_label": "This Week vs. Last Week",
"current_start": start_cur.isoformat(),
"current_end": end_cur.isoformat(),
"prev_start": start_prev.isoformat(),
"prev_end": end_prev.isoformat(),
"current_rows": int(len(current_df)),
"previous_rows": int(len(previous_df)),
}
emit_kpi_debug(self.profile_id, "timeframes", meta)
return current_df, previous_df, meta
@staticmethod
def _pct_change(cur: float, prev: float) -> str:
if prev == 0:
return "+100%" if cur > 0 else "0.0%"
return f"{((cur - prev) / prev) * 100:+.1f}%"
def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]:
cur_rev = float(cur_df[self.COL_REVENUE].sum()) if not cur_df.empty else 0.0
prev_rev = float(prev_df[self.COL_REVENUE].sum()) if not prev_df.empty else 0.0
cur_gp = float(cur_df[self.COL_GP].sum()) if not cur_df.empty else 0.0
prev_gp = float(prev_df[self.COL_GP].sum()) if not prev_df.empty else 0.0
if self._has(cur_df, self.COL_INVOICE) and cur_df[self.COL_INVOICE].notna().any():
tx_now = int(cur_df[self.COL_INVOICE].nunique())
else:
tx_now = int(len(cur_df))
if self._has(prev_df, self.COL_INVOICE) and prev_df[self.COL_INVOICE].notna().any():
tx_prev = int(prev_df[self.COL_INVOICE].nunique())
else:
tx_prev = int(len(prev_df))
head = {
"total_revenue_value": round(cur_rev, 2),
"total_revenue_fmt": f"{self.currency} {cur_rev:,.2f}",
"total_revenue_change": self._pct_change(cur_rev, prev_rev),
"gross_profit_value": round(cur_gp, 2),
"gross_profit_fmt": f"{self.currency} {cur_gp:,.2f}",
"gross_profit_change": self._pct_change(cur_gp, prev_gp),
"transactions_value": tx_now,
"transactions_change": self._pct_change(tx_now, tx_prev),
}
emit_kpi_debug(self.profile_id, "headline", head)
return head
# ------------------------- core builders -------------------------
def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame:
if cur_df.empty:
return pd.DataFrame(columns=[
self.COL_PRODUCT,"revenue","units","cogs","gross_profit","margin_pct",
"avg_selling_price","avg_unit_cost","tx_count"
])
df = cur_df.copy()
# Exclude blocked products for leaderboards/affinity, but keep them in totals if needed
if self.params["blocked_products"]:
df = df[~df[self.COL_PRODUCT].astype(str).str.strip().isin(self.params["blocked_products"])]
# Tx count via invoice nunique if available
if self._has(df, self.COL_INVOICE) and df[self.COL_INVOICE].notna().any():
g = df.groupby(self.COL_PRODUCT, dropna=False).agg(
revenue=(self.COL_REVENUE,"sum"),
units=(self.COL_UNITS,"sum"),
cogs=(self.COL_COGS,"sum"),
gp=(self.COL_GP,"sum"),
tx=(self.COL_INVOICE,"nunique")
)
else:
g = df.groupby(self.COL_PRODUCT, dropna=False).agg(
revenue=(self.COL_REVENUE,"sum"),
units=(self.COL_UNITS,"sum"),
cogs=(self.COL_COGS,"sum"),
gp=(self.COL_GP,"sum"),
tx=(self.COL_PRODUCT,"size")
)
g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index()
# Derived ratios
g["margin_pct"] = np.where(g["revenue"] > 0, g["gross_profit"] / g["revenue"], np.nan)
g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan)
g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan)
return g
def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame:
if cur_df.empty or not self._has(cur_df, self.COL_INVOICE):
return pd.DataFrame(columns=[self.COL_INVOICE,"basket_revenue","basket_gp","basket_items","_datetime_max"])
b = cur_df.groupby(self.COL_INVOICE, dropna=False).agg(
basket_revenue=(self.COL_REVENUE,"sum"),
basket_gp=(self.COL_GP,"sum"),
basket_items=(self.COL_UNITS,"sum"),
_datetime_max=(self.COL_DT,"max"),
).reset_index()
return b
def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]:
if basket_df is None or basket_df.empty:
return {
"avg_items_per_basket": "N/A",
"avg_gross_profit_per_basket": "N/A",
"median_basket_value": "N/A",
"basket_size_distribution": {},
"low_sample": True
}
avg_items = float(basket_df["basket_items"].mean())
avg_gp = float(basket_df["basket_gp"].mean())
median_value = float(basket_df["basket_revenue"].median())
sizes = basket_df["basket_items"].fillna(0)
bins = {
"1": int((sizes == 1).sum()),
"2-3": int(((sizes >= 2) & (sizes <= 3)).sum()),
"4-5": int(((sizes >= 4) & (sizes <= 5)).sum()),
"6_plus": int((sizes >= 6).sum()),
}
return {
"avg_items_per_basket": round(avg_items, 2),
"avg_gross_profit_per_basket": round(avg_gp, 2),
"median_basket_value": round(median_value, 2),
"basket_size_distribution": bins
}
    def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
        """Market-basket pair mining: support / confidence / lift per product pair.

        Pairs must clear the `min_support_baskets` and `min_lift` thresholds;
        blocked products are excluded. Returns the top_k pairs ranked by lift,
        then basket frequency, plus the parameters used.
        """
        # Build unique product sets per invoice, count pairs
        if cur_df.empty or basket_df.empty or not self._has(cur_df, self.COL_PRODUCT) or not self._has(cur_df, self.COL_INVOICE):
            return {"params": self._affinity_params(), "top_pairs": []}
        tmp = cur_df[[self.COL_INVOICE, self.COL_PRODUCT]].dropna()
        if tmp.empty:
            return {"params": self._affinity_params(), "top_pairs": []}
        blocked = set(self.params.get("blocked_products", []) or [])
        tmp = tmp[~tmp[self.COL_PRODUCT].astype(str).str.strip().isin(blocked)]
        if tmp.empty:
            return {"params": self._affinity_params(), "top_pairs": []}
        # One sorted, de-duplicated product list per invoice.
        products_per_invoice = tmp.groupby(self.COL_INVOICE)[self.COL_PRODUCT].agg(lambda s: sorted(set(map(str, s)))).reset_index()
        total_baskets = int(len(products_per_invoice))
        if total_baskets == 0:
            return {"params": self._affinity_params(), "top_pairs": []}
        from collections import Counter
        # Singleton frequencies: how many baskets contain each product.
        single_counter = Counter()
        for prods in products_per_invoice[self.COL_PRODUCT]:
            single_counter.update(prods)
        # Unordered pair frequencies; pairs are canonicalized alphabetically.
        pair_counter = Counter()
        for prods in products_per_invoice[self.COL_PRODUCT]:
            if len(prods) < 2:
                continue
            for i in range(len(prods)):
                for j in range(i+1, len(prods)):
                    a, b = prods[i], prods[j]
                    pair = (a, b) if a <= b else (b, a)
                    pair_counter[pair] += 1
        min_support_baskets = int(self.params["min_support_baskets"])
        min_lift = float(self.params["min_lift"])
        top_k = int(self.params["top_k"])
        rows = []
        # Average pair revenue across baskets containing both (optional; approximate)
        inv_with_products = cur_df.groupby(self.COL_INVOICE)[self.COL_PRODUCT].apply(lambda s: set(map(str, s.dropna())))
        rev_by_inv = cur_df.groupby(self.COL_INVOICE)[self.COL_REVENUE].sum()
        for (a, b), ab_count in pair_counter.items():
            if ab_count < min_support_baskets:
                continue
            support_a = single_counter.get(a, 0) / total_baskets
            support_b = single_counter.get(b, 0) / total_baskets
            support_ab = ab_count / total_baskets
            if support_a == 0 or support_b == 0:
                continue
            # confidence = P(b in basket | a in basket); lift > 1 means the
            # pair co-occurs more often than independence would predict.
            confidence = support_ab / support_a
            lift = support_ab / (support_a * support_b) if (support_a * support_b) > 0 else np.nan
            if not np.isfinite(lift) or lift < min_lift:
                continue
            # NOTE(review): this membership scan is O(pairs x invoices); fine
            # for small shops, worth an inverted index if baskets grow large.
            inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s))
            pair_invoices = inv_mask[inv_mask].index
            avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan
            rows.append({
                "a": a, "b": b,
                "support_ab": round(float(support_ab), 6),
                "confidence_a_to_b": round(float(confidence), 6),
                "lift": round(float(lift), 6),
                "pair_basket_count": int(ab_count),
                "avg_pair_revenue": round(avg_pair_revenue, 2) if np.isfinite(avg_pair_revenue) else None,
            })
        # Rank: strongest association first, frequency then support as tie-breakers.
        rows.sort(key=lambda r: (r["lift"], r["pair_basket_count"], r["support_ab"]), reverse=True)
        emit_kpi_debug(self.profile_id, "affinity_pairs_counts", {
            "total_baskets": total_baskets, "pairs_after_filters": len(rows)
        })
        return {"params": self._affinity_params(), "top_pairs": rows[:top_k]}
def _affinity_params(self) -> Dict[str, Any]:
return {
"min_support_baskets": int(self.params["min_support_baskets"]),
"min_lift": float(self.params["min_lift"]),
"top_k": int(self.params["top_k"]),
}
    def _temporal_patterns(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
        """Hour-of-day and day-of-week profit patterns plus a 7x24 GP heatmap."""
        if cur_df.empty:
            return {
                "best_hour_by_profit": None,
                "best_day_by_profit": None,
                "hourly_series": [],
                "dow_series": [],
                "profit_heatmap_7x24": []
            }
        # Revenue/GP per hour of day (0-23).
        gh = cur_df.groupby(self.COL_HOUR, dropna=False).agg(
            revenue=(self.COL_REVENUE,"sum"),
            gross_profit=(self.COL_GP,"sum")
        ).reset_index()
        best_hour_idx = int(gh.loc[gh["gross_profit"].idxmax(), self.COL_HOUR]) if not gh.empty else None
        best_hour_gp = float(gh["gross_profit"].max()) if not gh.empty else None
        # Revenue/GP per day name, ordered Monday..Sunday via the numeric index.
        gd = cur_df.groupby(self.COL_DOW, dropna=False).agg(
            revenue=(self.COL_REVENUE,"sum"),
            gross_profit=(self.COL_GP,"sum")
        ).reset_index()
        order_map = cur_df.groupby(self.COL_DOW)[self.COL_DOWI].max().to_dict()
        gd["__ord"] = gd[self.COL_DOW].map(order_map)
        gd = gd.sort_values("__ord", kind="stable")
        best_day_row = gd.loc[gd["gross_profit"].idxmax()] if not gd.empty else None
        best_day = {"day": str(best_day_row[self.COL_DOW]), "gross_profit": float(best_day_row["gross_profit"])} if best_day_row is not None else None
        # Dense 7x24 matrix of GP (rows: Mon..Sun, cols: hours 0..23), zero-filled
        # so missing day/hour combinations render as 0 rather than gaps.
        m = cur_df.groupby([self.COL_DOWI, self.COL_HOUR], dropna=False)[self.COL_GP].sum().unstack(fill_value=0)
        m = m.reindex(index=range(0,7), columns=range(0,24), fill_value=0)
        heatmap = [[float(x) for x in row] for row in m.values.tolist()]
        hourly_series = gh.rename(columns={self.COL_HOUR:"hour"}).to_dict(orient="records")
        dow_series = gd[[self.COL_DOW,"revenue","gross_profit"]].rename(columns={self.COL_DOW:"day"}).to_dict(orient="records")
        # Midnight (hour 0) is still reported as best: the guard is `is not None`.
        return {
            "best_hour_by_profit": {"hour": best_hour_idx, "gross_profit": round(best_hour_gp, 2)} if best_hour_idx is not None else None,
            "best_day_by_profit": best_day,
            "hourly_series": [{"hour": int(r["hour"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in hourly_series],
            "dow_series": [{"day": str(r["day"]), "revenue": float(r["revenue"]), "gross_profit": float(r["gross_profit"])} for r in dow_series],
            "profit_heatmap_7x24": heatmap
        }
def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
    """Compute RFM-style customer value metrics and leaderboards.

    Builds per-customer recency/frequency/monetary aggregates from the
    current period, then derives three leaderboards: top 20 customers by
    gross profit, at-risk (top-quartile GP but inactive > 30 days) and
    new customers (single order within the last 7 days).
    """
    if cur_df.empty or not self._has(cur_df, self.COL_CUSTOMER):
        # No data or no customer column: emit an empty, well-shaped payload.
        return {
            "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
            "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
            "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
        }
    df = cur_df.copy()
    # Most recent purchase timestamp per customer (recency anchor).
    last_date = df.groupby(self.COL_CUSTOMER)[self.COL_DT].max()
    if self._has(df, self.COL_INVOICE):
        # Frequency = distinct invoices when invoice IDs exist...
        orders = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].nunique()
    else:
        # ...otherwise fall back to raw row counts per customer.
        orders = df.groupby(self.COL_CUSTOMER).size()
    revenue = df.groupby(self.COL_CUSTOMER)[self.COL_REVENUE].sum()
    gp = df.groupby(self.COL_CUSTOMER)[self.COL_GP].sum()
    # Avg basket value per customer (from their invoices)
    if not basket_df.empty and self._has(df, self.COL_INVOICE):
        inv_to_rev = basket_df.set_index(self.COL_INVOICE)["basket_revenue"]
        cust_invoices = df.dropna(subset=[self.COL_INVOICE]).groupby(self.COL_CUSTOMER)[self.COL_INVOICE].agg(lambda x: sorted(set(x)))
        avg_basket_val = {}
        for cust, invs in cust_invoices.items():
            # reindex tolerates invoices missing from the basket table.
            vals = inv_to_rev.reindex(invs).dropna()
            avg_basket_val[cust] = float(vals.mean()) if len(vals) else np.nan
        avg_basket = pd.Series(avg_basket_val)
    else:
        avg_basket = pd.Series(dtype=float)
    # Recency measured from "today" at midnight, Harare time.
    base = now_harare().normalize()
    recency_days = (base - last_date).dt.total_seconds() / (60*60*24)
    rfm = pd.DataFrame({
        "customer": last_date.index.astype(str),
        "last_date": last_date.values,
        "orders": orders.reindex(last_date.index).fillna(0).astype(int).values,
        "revenue": revenue.reindex(last_date.index).fillna(0.0).values,
        "gp": gp.reindex(last_date.index).fillna(0.0).values,
        "recency_days": recency_days.values,
        "avg_basket_value": avg_basket.reindex(last_date.index).values
    }).fillna({"avg_basket_value": np.nan})
    # NOTE(review): filling NaN with NaN above is a no-op; kept byte-identical
    # here to avoid behavior drift.
    # VIPs: top 20 by gross profit (orders, then revenue as tie-breakers).
    vip = rfm.sort_values(["gp","orders","revenue"], ascending=[False, False, False]).head(20)
    if len(rfm):
        # At-risk: top-quartile GP customers who have gone quiet (> 30 days).
        gp_q3 = rfm["gp"].quantile(0.75)
        at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp","recency_days"], ascending=[False, False]).head(20)
    else:
        at_risk = rfm.head(0)
    # New: exactly one order, seen within the past week.
    new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20)
    out = {
        "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
        "leaderboards": {
            "top_customers_by_gp": self._rfm_to_list(vip),
            "at_risk": self._rfm_to_list(at_risk),
            "new_customers": self._rfm_to_list(new_customers)
        },
        "rfm_summary": {
            "unique_customers": int(rfm["customer"].nunique()),
            "median_recency_days": float(rfm["recency_days"].median()) if len(rfm) else None,
            "median_orders": float(rfm["orders"].median()) if len(rfm) else None,
            "median_gp": float(rfm["gp"].median()) if len(rfm) else None
        }
    }
    emit_kpi_debug(self.profile_id, "rfm_done", {"customers": int(rfm["customer"].nunique())})
    return json_safe(out)
# ------------------------- inventory & cash -------------------------
def _inventory_block(self, cur_df: pd.DataFrame, product_agg: pd.DataFrame, current_bounds: Tuple[pd.Timestamp, pd.Timestamp]) -> Dict[str, Any]:
if self.stock_feed.empty:
return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
start_cur, end_cur = current_bounds
days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
pa = (product_agg or pd.DataFrame()).copy()
if pa.empty:
return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
pa["units_per_day"] = pa["units"] / days
sf = self.stock_feed.copy()
sf["product_key"] = sf.get("product", sf.get("Product", "")).astype(str).str.strip()
pa["product_key"] = pa[self.COL_PRODUCT].astype(str).str.strip()
merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock"))
merged["units_per_day"] = merged["units_per_day"].fillna(0.0)
merged["stock_on_hand"] = pd.to_numeric(merged.get("stock_on_hand", np.nan), errors="coerce")
merged["reorder_point"] = pd.to_numeric(merged.get("reorder_point", np.nan), errors="coerce")
merged["lead_time_days"] = pd.to_numeric(merged.get("lead_time_days", np.nan), errors="coerce")
merged["days_of_cover"] = np.where(merged["units_per_day"] > 0, merged["stock_on_hand"] / merged["units_per_day"], np.inf)
def status_row(r):
if pd.isna(r.get("stock_on_hand")):
return "unknown"
if (r["stock_on_hand"] or 0) <= 0:
return "stockout"
if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]:
return "low"
if np.isfinite(r["days_of_cover"]) and pd.notna(r.get("lead_time_days")) and r["days_of_cover"] < r["lead_time_days"]:
return "stockout_risk"
if r["units_per_day"] == 0 and (r["stock_on_hand"] or 0) > 0:
return "dead_stock"
return "ok"
merged["status"] = merged.apply(status_row, axis=1)
products_out, low_stock, stockout_risk, dead_stock = [], [], [], []
for _, r in merged.iterrows():
rec = {
"product": str(r.get(self.COL_PRODUCT) or r.get("product_key")),
"stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None,
"reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None,
"lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None,
"days_of_cover": float(r["days_of_cover"]) if np.isfinite(r["days_of_cover"]) else None,
"daily_sales_velocity": float(r["units_per_day"]),
"status": str(r["status"])
}
products_out.append(rec)
if rec["status"] == "low":
low_stock.append(rec["product"])
elif rec["status"] == "stockout_risk":
stockout_risk.append(rec["product"])
elif rec["status"] == "dead_stock":
dead_stock.append(rec["product"])
return {
"stock_snapshot_asof": now_harare().isoformat(),
"products": products_out,
"alerts": {
"low_stock": sorted(set(low_stock)),
"stockout_risk": sorted(set(stockout_risk)),
"dead_stock": sorted(set(dead_stock))
}
}
def _cash_recon_block(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
if self.cash_float_feed.empty:
return {"status": "no_cash_data"}
cf = self.cash_float_feed.copy()
out_days = []
high_var_days = 0
# Compute cash sales per branch/date from cur_df
if cur_df.empty:
cash_sales = pd.DataFrame(columns=["branch","date","cash_sales"])
else:
df = cur_df.copy()
df["date"] = df[self.COL_DT].dt.strftime("%Y-%m-%d")
df["is_cash"] = (df.get("Money_Type","").astype(str).str.lower() == "cash")
cash_sales = df[df["is_cash"]].groupby([self.COL_BRANCH,"date"])[self.COL_REVENUE].sum().reset_index()
cash_sales = cash_sales.rename(columns={self.COL_BRANCH:"branch", self.COL_REVENUE:"cash_sales"})
cf["date"] = cf["date"].astype(str).str[:10]
merged = cf.merge(cash_sales, on=["branch","date"], how="left")
merged["cash_sales"] = merged["cash_sales"].fillna(0.0)
for _, r in merged.iterrows():
opening = float(r.get("opening_float") or 0.0)
closing = float(r.get("closing_float") or 0.0)
drops = float(r.get("drops") or 0.0)
petty = float(r.get("petty_cash") or 0.0)
declared = float(r.get("declared_cash") or 0.0)
cash_sales_val = float(r.get("cash_sales") or 0.0)
expected = opening + cash_sales_val - drops - petty - closing
variance = declared - expected
variance_pct = (variance / cash_sales_val) if cash_sales_val > 0 else 0.0
flag = (abs(variance) >= float(self.params["cash_variance_threshold_abs"])) or \
(abs(variance_pct) >= float(self.params["cash_variance_threshold_pct"]))
if flag:
high_var_days += 1
out_days.append({
"branch": str(r["branch"]),
"date": str(r["date"]),
"cash_sales": round(cash_sales_val, 2),
"declared_cash": round(declared, 2),
"opening_float": round(opening, 2),
"closing_float": round(closing, 2),
"drops": round(drops, 2),
"petty_cash": round(petty, 2),
"expected_cash": round(expected, 2),
"variance": round(variance, 2),
"variance_pct": round(variance_pct, 4),
"flag": bool(flag),
})
return {"days": out_days, "flags": {"high_variance_days": int(high_var_days)}}
# ------------------------- branch analytics -------------------------
def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]:
    """Run the full analytics suite per branch, plus cross-branch comparisons.

    For each branch: headline KPIs, temporal patterns, product leaderboards,
    affinity, customer value, and cash reconciliation. Cross-branch output
    adds rankings, margin spread, revenue concentration (HHI) and
    week-over-week trends vs ``previous_df``. Errors in a single branch are
    logged via ``emit_kpi_debug`` and skipped, never fatal.
    """
    if cur_df.empty or not self._has(cur_df, self.COL_BRANCH):
        # No branch dimension available: return an empty but well-shaped payload.
        return {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}
    per_branch = {}
    branches = sorted(map(str, cur_df[self.COL_BRANCH].dropna().unique().tolist()))
    start_cur, end_cur = current_bounds
    # Period length in days (floored at 1) for per-day velocity figures.
    days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
    branch_summary_rows = []
    for br in branches:
        try:
            d = cur_df[cur_df[self.COL_BRANCH] == br]
            if d.empty:
                continue
            revenue = float(d[self.COL_REVENUE].sum())
            cogs = float(d[self.COL_COGS].sum())
            gp = float(d[self.COL_GP].sum())
            margin_pct = (gp / revenue) if revenue > 0 else None
            # Transactions = distinct invoices when available, else row count.
            tx = int(d[self.COL_INVOICE].nunique()) if self._has(d, self.COL_INVOICE) and d[self.COL_INVOICE].notna().any() else int(len(d))
            items = float(d[self.COL_UNITS].sum())
            basket_df = self._build_basket_table(d)
            basket_kpis = self._basket_kpis(basket_df)
            temporal = self._temporal_patterns(d)
            pagg = self._build_product_aggregates(d)
            if not pagg.empty:
                pagg["units_per_day"] = pagg["units"] / days
                product_lb = self._product_leaderboards(pagg)
            else:
                product_lb = self._empty_product_leaderboards()
            affinity = self._affinity_pairs(d, basket_df)
            customers = self._customer_value(d, basket_df)
            cash_recon = self._cash_recon_block(d)
            per_branch[br] = {
                "kpis": {
                    "revenue": round(revenue, 2),
                    "cogs": round(cogs, 2),
                    "gross_profit": round(gp, 2),
                    "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None,
                    "transactions": tx,
                    "items_sold": round(items, 2),
                    "avg_basket_value": basket_kpis.get("median_basket_value"),
                    "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"),
                    "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"),
                },
                "temporal": temporal,
                "products": product_lb,
                "affinity": affinity,
                "customer_value": customers,
                "cash_recon": cash_recon,
                "data_quality": {
                    "duplicates_dropped": self._prepared_dupes_dropped,
                    "non_sale_rows_excluded": self._non_sale_excluded,
                    # NOTE(review): currency_mixed is hard-coded False — no
                    # currency check is performed here; confirm intent.
                    "currency_mixed": False
                }
            }
            # margin_pct falls back to 0.0 for ranking purposes when revenue is 0.
            branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0})
        except Exception as e:
            emit_kpi_debug(self.profile_id, "branch_block_error", {"branch": br, "error": str(e)})
    cross = {}
    if branch_summary_rows:
        try:
            bs = pd.DataFrame(branch_summary_rows)
            cross["rankings"] = {
                "by_revenue": bs.sort_values("revenue", ascending=False)[["branch","revenue"]].to_dict(orient="records"),
                "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch","gp_margin_pct"]].to_dict(orient="records"),
            }
            # Spread: how far apart the best and worst branch margins are.
            cross["spread"] = {
                "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None,
                "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None,
                "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None,
            }
            # Revenue concentration: per-branch share and Herfindahl index.
            tot_rev = float(bs["revenue"].sum())
            shares, hhi = [], 0.0
            for _, r in bs.iterrows():
                sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0
                shares.append({"branch": r["branch"], "share": float(round(sh, 6))})
                hhi += sh*sh
            cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))}
            if not previous_df.empty and self._has(previous_df, self.COL_BRANCH):
                # Week-over-week trend vs the prior period, per branch.
                prev_g = previous_df.groupby(self.COL_BRANCH).agg(
                    revenue=(self.COL_REVENUE,"sum"),
                    gp=(self.COL_GP,"sum")
                ).reset_index().rename(columns={self.COL_BRANCH:"branch"})
                cur_g = pd.DataFrame(branch_summary_rows)
                m = cur_g.merge(prev_g, on="branch", suffixes=("_cur","_prev"), how="left").fillna(0.0)
                wow_rows = []
                for _, r in m.iterrows():
                    # 100% growth is reported when a branch had no prior-period
                    # base but has current revenue/GP; 0% when both are zero.
                    wow_rows.append({
                        "branch": r["branch"],
                        "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"])*100) if r["revenue_prev"]>0 else (100.0 if r["revenue_cur"]>0 else 0.0),
                        "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"])*100) if r["gp_prev"]>0 else (100.0 if r["gp_cur"]>0 else 0.0),
                        "avg_basket_wow": None
                    })
                cross["trend_wow"] = wow_rows
        except Exception as e:
            emit_kpi_debug(self.profile_id, "branch_cross_error", {"error": str(e)})
    return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross}
def _branch_params(self) -> Dict[str, Any]:
return {
"top_k": int(self.params["top_k"]),
"min_support_baskets": int(self.params["min_support_baskets"]),
"min_lift": float(self.params["min_lift"]),
"cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
"cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
}
# ------------------------- product leaderboards & concentration -------------------------
def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]:
top_k = int(self.params["top_k"])
g_marginpct = g.copy()
g_marginpct = g_marginpct[
(g_marginpct["revenue"] >= float(self.params["min_revenue_for_margin_pct"])) &
(g_marginpct["tx_count"] >= int(self.params["min_tx_for_margin_pct"]))
]
def top(df, col, asc=False):
if df.empty:
return []
d = df.sort_values(col, ascending=asc).head(top_k)
return [
{
"product": str(r[self.COL_PRODUCT]),
"revenue": round(float(r["revenue"]), 2),
"units": float(r["units"]),
"gross_profit": round(float(r["gross_profit"]), 2),
"margin_pct": float(round(r["margin_pct"], 4)) if pd.notna(r["margin_pct"]) else None,
"tx_count": int(r["tx_count"]),
"avg_selling_price": float(round(r["avg_selling_price"], 4)) if pd.notna(r["avg_selling_price"]) else None,
"avg_unit_cost": float(round(r["avg_unit_cost"], 4)) if pd.notna(r["avg_unit_cost"]) else None,
"units_per_day": float(round(r.get("units_per_day", np.nan), 4)) if pd.notna(r.get("units_per_day", np.nan)) else None,
} for _, r in d.iterrows()
]
return {
"top_by_revenue": top(g, "revenue", asc=False),
"top_by_units": top(g, "units", asc=False),
"top_by_margin_value": top(g, "gross_profit", asc=False),
"top_by_margin_pct": top(g_marginpct, "margin_pct", asc=False),
"bottom_by_revenue": top(g, "revenue", asc=True),
"loss_makers": top(g[g["gross_profit"] < 0], "gross_profit", asc=True),
"by_velocity": top(g.assign(units_per_day=g.get("units_per_day", np.nan)), "units_per_day", asc=False),
"by_gp_per_unit": top(g.assign(gp_per_unit=np.where(g["units"]>0, g["gross_profit"]/g["units"], np.nan)), "gp_per_unit", asc=False),
}
def _empty_product_leaderboards(self) -> Dict[str, Any]:
return {
"top_by_revenue": [],
"top_by_units": [],
"top_by_margin_value": [],
"top_by_margin_pct": [],
"bottom_by_revenue": [],
"loss_makers": [],
"by_velocity": [],
"by_gp_per_unit": [],
}
def _concentration_block(self, g: pd.DataFrame) -> Dict[str, Any]:
if g.empty:
return {
"revenue_share_top5": 0.0,
"units_share_top5": 0.0,
"revenue_pareto_top20pct_share": 0.0,
"gini_revenue": 0.0
}
total_rev = float(g["revenue"].sum())
total_units = float(g["units"].sum())
rev_sorted = g.sort_values("revenue", ascending=False)["revenue"].values
units_sorted = g.sort_values("units", ascending=False)["units"].values
share_top5_rev = (rev_sorted[:5].sum() / total_rev) if total_rev > 0 else 0.0
share_top5_units = (units_sorted[:5].sum() / total_units) if total_units > 0 else 0.0
n = len(rev_sorted)
if n == 0:
pareto = 0.0
else:
k = max(1, int(np.ceil(0.2 * n)))
pareto = rev_sorted[:k].sum() / total_rev if total_rev > 0 else 0.0
if total_rev <= 0 or n == 0:
gini = 0.0
else:
x = np.sort(rev_sorted) # ascending
cum = np.cumsum(x)
gini = 1.0 - 2.0 * np.sum(cum) / (n * np.sum(x)) + 1.0 / n
return {
"revenue_share_top5": float(round(share_top5_rev, 6)),
"units_share_top5": float(round(share_top5_units, 6)),
"revenue_pareto_top20pct_share": float(round(pareto, 6)),
"gini_revenue": float(round(gini, 6))
}
# ------------------------- public API -------------------------
def get_business_intelligence_briefing(self) -> Dict[str, Any]:
    """Build the full deterministic KPI snapshot for the current period.

    Each analytic section runs inside its own try/except so that one
    failure never destroys the whole briefing; per-section errors are
    surfaced under ``meta.section_errors`` for the client debug panel.

    Returns a JSON-safe dict (or a ``Status`` payload when no data exists).
    """
    if self.df.empty:
        emit_kpi_debug(self.profile_id, "briefing", {"status": "no_data"})
        return {"Status": "No sales data available to generate a briefing."}
    current_df, previous_df, tfmeta = self._get_comparison_timeframes()
    if current_df.empty:
        emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
        return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}
    snapshot = {}
    section_errors = {}
    # BUG FIX: resolve the period bounds up front. Previously they were
    # defined inside the product section's try-block, so a failure there
    # cascaded NameErrors into the Inventory and Branch sections too.
    try:
        start_cur = pd.Timestamp(tfmeta["current_start"])
        end_cur = pd.Timestamp(tfmeta["current_end"])
    except Exception as e:
        section_errors["timeframe_bounds"] = str(e)
        start_cur = end_cur = now_harare()
    # Pre-declare cross-section intermediates instead of probing locals();
    # basket_ok records whether the basket table was actually built.
    basket_df = pd.DataFrame()
    basket_ok = False
    g_products = pd.DataFrame()
    # Headline
    try:
        headline = self._headline(current_df, previous_df)
        snapshot["Summary Period"] = tfmeta.get("period_label", "This Week vs. Last Week")
        snapshot["Performance Snapshot (vs. Prior Period)"] = {
            "Total Revenue": f"{headline['total_revenue_fmt']} ({headline['total_revenue_change']})",
            "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
            "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
        }
    except Exception as e:
        section_errors["headline"] = str(e)
    # Basket & affinity
    try:
        basket_df = self._build_basket_table(current_df)
        basket_ok = True
        snapshot["Basket Analysis"] = self._basket_kpis(basket_df)
    except Exception as e:
        section_errors["basket"] = str(e)
        snapshot["Basket Analysis"] = {"avg_items_per_basket": "N/A", "avg_gross_profit_per_basket": "N/A", "median_basket_value": "N/A", "basket_size_distribution": {}, "low_sample": True}
    try:
        if basket_ok:
            snapshot["Product Affinity"] = self._affinity_pairs(current_df, basket_df)
        else:
            snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}
    except Exception as e:
        section_errors["affinity"] = str(e)
        snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}
    # Temporal
    try:
        snapshot["Temporal Patterns"] = self._temporal_patterns(current_df)
    except Exception as e:
        section_errors["temporal"] = str(e)
        snapshot["Temporal Patterns"] = {"best_hour_by_profit": None, "best_day_by_profit": None, "hourly_series": [], "dow_series": [], "profit_heatmap_7x24": []}
    # Product aggregates + leaderboards + concentration
    try:
        days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
        g_products = self._build_product_aggregates(current_df)
        if not g_products.empty:
            g_products["units_per_day"] = g_products["units"] / days
            product_lb = self._product_leaderboards(g_products)
            concentration = self._concentration_block(g_products)
        else:
            product_lb = self._empty_product_leaderboards()
            concentration = self._concentration_block(pd.DataFrame(columns=["revenue","units"]))
        snapshot["Product KPIs"] = {"leaderboards": product_lb, "concentration": concentration}
    except Exception as e:
        section_errors["products"] = str(e)
        snapshot["Product KPIs"] = {"leaderboards": self._empty_product_leaderboards(), "concentration": self._concentration_block(pd.DataFrame(columns=["revenue","units"]))}
    # Customer value (RFM) — basket_df is the pre-declared (possibly empty) table.
    try:
        snapshot["Customer Value"] = self._customer_value(current_df, basket_df)
    except Exception as e:
        section_errors["customer_value"] = str(e)
        snapshot["Customer Value"] = {
            "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
            "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
            "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
        }
    # Inventory (optional; g_products may be empty if the product section failed)
    try:
        snapshot["Inventory"] = self._inventory_block(current_df, g_products, (start_cur, end_cur))
    except Exception as e:
        section_errors["inventory"] = str(e)
        snapshot["Inventory"] = {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}
    # Branch analytics
    try:
        snapshot["Branch Analytics"] = self._per_branch_blocks(current_df, previous_df, (start_cur, end_cur))
    except Exception as e:
        section_errors["branch"] = str(e)
        snapshot["Branch Analytics"] = {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}
    # Meta
    snapshot["meta"] = {
        "timeframes": tfmeta,
        "kpi_params": {
            "top_k": int(self.params["top_k"]),
            "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]),
            "min_tx_for_margin_pct": int(self.params["min_tx_for_margin_pct"]),
            "rfm_window_days": int(self.params["rfm_window_days"]),
            "retention_factor": float(self.params["retention_factor"]),
            "min_support_baskets": int(self.params["min_support_baskets"]),
            "min_lift": float(self.params["min_lift"]),
            "blocked_products": list(self.params["blocked_products"]),
            "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
            "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
        },
        "row_counts": {
            "input": int(len(self.raw)),
            "prepared": int(len(self.df)),
            "current_period": int(len(current_df)),
            "previous_period": int(len(previous_df)),
        },
        "notes": [
            "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).",
            f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}",
        ],
        "section_errors": section_errors,  # surfaced to the client for your debug panel
    }
    emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
    return json_safe(snapshot)
def synthesize_fallback_response(self, briefing: dict, user_question: str) -> str:
    """
    LLM is narration-only. Do NOT invent or recompute numbers here.
    Use the already-deterministic 'briefing' dict as the single source of truth.
    Safe for PandasAI exception fallback.
    """
    try:
        # The narration prompt pins the model to the authoritative JSON.
        narration_prompt = (
            "You are Iris, a concise business analyst.\n"
            "IMPORTANT RULES:\n"
            "• DO NOT invent numbers. ONLY use values present in the Business Data JSON.\n"
            "• If a metric is missing, say 'N/A' or 'no data for this period'.\n"
            "• Harare timezone is the reference for dates/times.\n"
            "• Keep it brief: headings + bullets; no tables unless clearly helpful.\n"
            "• If the user asked something specific (e.g., 'top 5 products'), answer directly from the JSON.\n"
            "• Otherwise, provide a short business briefing (performance, products, timing, customers, branches).\n"
            "• Never expose internal keys; paraphrase labels.\n"
            f"User Question: {json.dumps(user_question)}\n\n"
            "Business Data (authoritative; JSON):\n"
            f"{json.dumps(json_safe(briefing), ensure_ascii=False)}\n"
        )
        reply = self.llm.invoke(narration_prompt)
        narrative = getattr(reply, "content", None) or str(reply)
        return sanitize_answer(narrative)
    except Exception as e:
        # Narration failed: return the raw snapshot plus diagnostics, both
        # fenced as code blocks so the client renders them verbatim.
        diagnostics = {
            "note": "Narrative fallback failed; returning raw snapshot.",
            "error": str(e)[:200],
            "snapshot_keys": list(briefing.keys()) if isinstance(briefing, dict) else "N/A"
        }
        snapshot_json = json.dumps(json_safe(briefing), indent=2)
        diag_json = json.dumps(diagnostics, indent=2)
        return (
            "### Business Snapshot\n\n```\n" + snapshot_json + "\n```\n\n"
            "```\n" + diag_json + "\n```"
        )
# ------------------------- helpers (outside class) -------------------------
def rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert an RFM dataframe into JSON-ready records.

    Monetary fields are rounded to 2 decimals; NaN recency/basket values
    become None so the output serializes cleanly.
    """
    def _optional(value) -> Optional[float]:
        """Round to 2 dp, mapping NaN/None to None."""
        return float(round(value, 2)) if pd.notna(value) else None
    records: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        records.append({
            "customer": str(row["customer"]),
            "gp": float(round(row["gp"], 2)),
            "revenue": float(round(row["revenue"], 2)),
            "orders": int(row["orders"]),
            "recency_days": _optional(row["recency_days"]),
            "avg_basket_value": _optional(row["avg_basket_value"]),
        })
    return records
# -----------------------------------------------------------------------------
# /chat — PandasAI first, then deterministic fallback
# -----------------------------------------------------------------------------
@app.route("/chat", methods=["POST"])
@cross_origin()
def bot():
    """Chat endpoint: Tier 1 tries PandasAI on the raw transactions; on any
    failure it falls back to the deterministic IrisReportEngine snapshot with
    narration-only LLM synthesis.
    """
    # Short correlation id so all log lines of one request can be grouped.
    request_id = str(uuid.uuid4())[:8]
    logger.info(f"[{request_id}] === /chat start ===")
    try:
        payload = request.get_json() or {}
        profile_id = str(payload.get("profile_id") or "").strip()
        user_question = (payload.get("user_question") or "").strip()
        if not profile_id or not user_question:
            return jsonify({"answer": "Missing 'profile_id' or 'user_question'."})
        API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2"
        try:
            # NOTE(review): quote_plus on a form field double-encodes the id
            # because `requests` already form-encodes `data` — confirm the
            # upstream API expects this.
            resp = requests.post(
                API_URL,
                data={"profile_id": urllib.parse.quote_plus(profile_id)},
                timeout=30
            )
            resp.raise_for_status()
            transactions = (resp.json() or {}).get("transactions") or []
        except Exception as e:
            logger.exception(f"[{request_id}] Transaction API error: {e}")
            return jsonify({"answer": "I couldn’t reach the transactions service. Please try again shortly."})
        if not transactions:
            return jsonify({"answer": "No transaction data was found for this profile."})
        # Tier 1: PandasAI
        try:
            logger.info(f"[{request_id}] PandasAI attempt …")
            df = pd.DataFrame(transactions)
            pandas_agent = SmartDataframe(df, config={
                "llm": llm,
                "response_parser": FlaskResponse,
                "security": "none",
                "save_charts_path": user_defined_path,
                "save_charts": False,
                "enable_cache": False,
                "conversational": True,
                "enable_logging": False,
                "custom_whitelisted_dependencies": [
                    "os","io","sys","chr","glob","b64decoder","collections",
                    "builtins","datetime","timedelta","date",
                    "pandas","numpy","math","statistics",
                    "matplotlib","plotly","json","re","warnings"
                ],
            })
            # Guardrails + temporal hints anchor the model on deterministic
            # date semantics before it sees the user's question.
            combined_prompt = f"{guardrails_preamble()}\n\n{temporal_hints(user_question)}\n\nQuestion: {user_question}"
            answer = pandas_agent.chat(combined_prompt)
            if looks_like_error(answer):
                # Treat error-looking text as a hard failure to trigger Tier 2.
                logger.warning(f"[{request_id}] PandasAI invalid answer; fallback.")
                raise RuntimeError("PandasAI invalid answer")
            # DataFrames render as HTML tables; figures as base64 data URIs.
            if isinstance(answer, pd.DataFrame):
                return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}})
            if isinstance(answer, plt.Figure):
                buf = io.BytesIO()
                answer.savefig(buf, format="png")
                data_uri = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}"
                return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}})
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}})
        except Exception as e:
            # Tier 2: deterministic KPI engine + narration-only LLM.
            logger.exception(f"[{request_id}] Tier 1 failed; moving to analyst.")
            engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm)
            briefing = engine.get_business_intelligence_briefing()
            fallback_answer = engine.synthesize_fallback_response(briefing, user_question)
            return jsonify({"answer": sanitize_answer(fallback_answer), "meta": {"source": "analyst_fallback"}})
    except Exception as e:
        logger.exception(f"[{request_id}] Critical unexpected error in /chat: {e}")
        return jsonify({"answer": "Something went wrong on our side. Please try again."})
# -----------------------------------------------------------------------------
# Report / Marketing / Notify — unchanged logic with safer logging
# -----------------------------------------------------------------------------
@app.route("/report", methods=["POST"])
@cross_origin()
def busines_report():
    """Generate a markdown business report from posted json_data via Gemini."""
    logger.info("=== /report ===")
    try:
        body = request.get_json()
        data_blob = body.get("json_data") if body else None
        report_prompt = (
            "You are Quantilytix business analyst. Analyze the following data and generate a "
            "comprehensive and insightful business report, including appropriate key performance "
            "indicators and recommendations. Use markdown formatting and tables where necessary. "
            "Only return the report and nothing else.\nDATA:\n" + str(data_blob)
        )
        result = model.generate_content(report_prompt)
        return jsonify(str(result.text))
    except Exception as e:
        logger.exception("Error in /report")
        return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500
@app.route("/marketing", methods=["POST"])
@cross_origin()
def marketing():
    """Generate a concise marketing strategy from posted json_data via Gemini."""
    logger.info("=== /marketing ===")
    try:
        body = request.get_json()
        data_blob = body.get("json_data") if body else None
        strategy_prompt = (
            "You are a Quantilytix Marketing Specialist. Analyze the following data and generate "
            "a creative, concise marketing strategy. Only return the strategy.\n" + str(data_blob)
        )
        result = model.generate_content(strategy_prompt)
        return jsonify(str(result.text))
    except Exception as e:
        logger.exception("Error in /marketing")
        return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500
@app.route("/notify", methods=["POST"])
@cross_origin()
def notifications():
    """Generate a brief dashboard-notification blurb from json_data via Gemini."""
    logger.info("=== /notify ===")
    try:
        body = request.get_json()
        data_blob = body.get("json_data") if body else None
        notify_prompt = (
            "You are a Quantilytix business analyst. Write a very brief analysis and marketing tips "
            "using this business data. Output must be suitable for a notification dashboard.\n" + str(data_blob)
        )
        result = model.generate_content(notify_prompt)
        return jsonify(str(result.text))
    except Exception as e:
        logger.exception("Error in /notify")
        return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500
# -----------------------------------------------------------------------------
# ElevenLabs Voice Briefing Endpoints — history summary separated from KPI
# -----------------------------------------------------------------------------
def _synthesize_history_summary(call_history: List[dict]) -> str:
"""Summarize past call transcripts only — never touches KPI data."""
if not call_history:
return "• New caller — no prior call history."
history_json = json.dumps(json_safe(call_history), indent=2)
analyst_prompt = (
"You are an executive assistant preparing a pre-call briefing for a voice AI business analyst named Iris.\n"
"ONLY analyze the user's past call history and identify recurring themes or concerns.\n\n"
"USER'S PAST CALL HISTORY (Transcripts):\n"
f"{history_json}\n\n"
"- Output a few bullet points only. No fluff."
)
try:
response = model.generate_content(analyst_prompt)
summary = (response.text or "").strip()
logger.info(f"Generated conversation history summary ({len(summary)} chars).")
return summary
except Exception as e:
logger.error(f"Failed to generate history summary: {e}")
return "• Could not summarize prior calls."
@app.route("/api/log-call-usage", methods=["POST"])
@cross_origin()
def log_call_usage():
    """Logs the transcript of a completed call to Firebase."""
    logger.info("=== /api/log-call-usage ===")
    body = request.get_json() or {}
    profile_id = body.get("profile_id")
    transcript = body.get("transcript")
    duration = body.get("durationSeconds")
    if not profile_id or not transcript:
        return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400
    try:
        # Key each call by its epoch second; store a JSON-safe record.
        record = {
            "transcript": transcript,
            "profileId": profile_id,
            "durationSeconds": duration,
            "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        }
        node = db_ref.child(f"transcripts/{profile_id}/call_{int(time.time())}")
        node.set(json_safe(record))
        logger.info(f"Stored transcript for profile '{profile_id}'.")
        return jsonify({"status": "success"}), 200
    except Exception as e:
        logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}")
        return jsonify({"error": "A server error occurred while storing the transcript."}), 500
@app.route("/api/call-briefing", methods=["POST"])
@cross_origin()
def get_call_briefing():
    """
    Returns:
    - memory_summary: bullets summarizing prior call transcripts
    - kpi_snapshot: deterministic KPI dict for the live profile (no LLM in numbers)
    """
    logger.info("=== /api/call-briefing ===")
    profile_id = str((request.get_json() or {}).get("profile_id") or "").strip()
    if not profile_id:
        return jsonify({"error": "Missing 'profile_id'."}), 400
    try:
        # 1) Summarize call history
        call_history = []
        try:
            # History is best-effort: Firebase failures degrade to "no history".
            transcripts = db_ref.child(f"transcripts/{profile_id}").get()
            if transcripts:
                call_history = list(transcripts.values())
                logger.info(f"Found {len(call_history)} past transcripts for '{profile_id}'.")
        except Exception as e:
            logger.warning(f"Could not fetch transcript history for '{profile_id}': {e}")
        memory_summary = _synthesize_history_summary(call_history)
        # 2) KPI Snapshot from live transactions
        # Default status survives any failure in the fetch/compute below.
        kpi_snapshot: Dict[str, Any] = {"Status": "Could not retrieve business data."}
        API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2"
        try:
            # NOTE(review): quote_plus on a form field double-encodes the id
            # (requests already form-encodes `data`) — confirm upstream API.
            resp = requests.post(
                API_URL,
                data={"profile_id": urllib.parse.quote_plus(profile_id)},
                timeout=20
            )
            resp.raise_for_status()
            transactions = (resp.json() or {}).get("transactions") or []
            if transactions:
                # Deterministic KPI computation — no LLM involved in numbers.
                engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm)
                kpi_snapshot = engine.get_business_intelligence_briefing()
            else:
                kpi_snapshot = {"Status": "No transaction data found for this profile."}
        except Exception as e:
            logger.warning(f"Txn fetch failed for briefing '{profile_id}': {e}")
        return jsonify({
            "memory_summary": memory_summary,
            "kpi_snapshot": json_safe(kpi_snapshot)
        }), 200
    except Exception as e:
        logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}")
        return jsonify({"error": "Failed to generate call briefing."}), 500
# -----------------------------------------------------------------------------
# Entrypoint
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Dev-only entrypoint: Flask's built-in server, reachable on all
    # interfaces at port 7860. debug=True is for local development only
    # and must not be enabled in production.
    app.run(host="0.0.0.0", port=7860, debug=True)