Spaces:
Sleeping
Sleeping
| # app.py — FIXED: KPI snapshot determinism + deep crunch logging (server + optional Firebase) | |
| # - Robust ColumnResolver (no guessing by the model; no hallucinated fields) | |
| # - Deterministic time windows (Harare tz; explicit start/end) | |
| # - KPI engine never uses LLM for numbers (LLM is narration-only fallback) | |
| # - JSON-safe snapshot (no pandas/Index objects) | |
| # - Detailed DEBUG logs for each crunch stage; optional Firebase debug logs | |
| # - Sanitized PandasAI output; hard guard on errors | |
| # - Cleaned Firebase init; clean FlaskResponse ResponseParser | |
| # - Safer temporal hints + guardrails | |
| # - Fixed sanitize_answer() bug and many stray smart-quotes/copy artifacts | |
| from __future__ import annotations | |
| import os | |
| import io | |
| import re | |
| import json | |
| import time | |
| import uuid | |
| import base64 | |
| import logging | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import requests | |
| import urllib.parse | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS, cross_origin | |
| from dotenv import load_dotenv | |
| # LLMs | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| import google.generativeai as genai | |
| # PandasAI | |
| from pandasai import SmartDataframe | |
| from pandasai.responses.response_parser import ResponseParser | |
| # Firebase | |
| import firebase_admin | |
| from firebase_admin import credentials, db | |
| # ----------------------------------------------------------------------------- | |
| # Init | |
| # ----------------------------------------------------------------------------- | |
load_dotenv()

app = Flask(__name__)
CORS(app)

# The log level can be overridden with LOG_LEVEL (e.g. "INFO", "WARNING").
# It defaults to DEBUG so the KPI_DEBUG crunch logs stay visible unless explicitly
# silenced. Unknown level names fall back to DEBUG instead of failing at import.
_log_level = logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG").strip().upper())
if not isinstance(_log_level, int):
    _log_level = logging.DEBUG
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("iris-app")
| # ----------------------------------------------------------------------------- | |
| # Firebase Initialization | |
| # ----------------------------------------------------------------------------- | |
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")
    cred = credentials.Certificate(credentials_json)
    # initialize_app() raises ValueError when the default app already exists
    # (e.g. this module imported twice by a dev reloader); reuse it in that case.
    try:
        firebase_admin.get_app()
    except ValueError:
        firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    db_ref = db.reference()
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    # The service cannot run without Firebase: log loudly, then re-raise.
    logger.critical("FATAL: Firebase init failed: %s", e)
    raise

# When "1", emit_kpi_debug() mirrors each KPI stage payload into Firebase.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"
| # ----------------------------------------------------------------------------- | |
| # Response parser for PandasAI | |
| # ----------------------------------------------------------------------------- | |
class FlaskResponse(ResponseParser):
    """PandasAI response parser that produces Flask/JSON-friendly strings.

    DataFrames are rendered as HTML tables, plots as ``data:image/png;base64,...``
    URIs, and any other result as ``str(value)``.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Return ``result["value"].to_html()``, or "" if rendering fails."""
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        """Return the plot in ``result["value"]`` as a base64 PNG data URI.

        ``value`` may be an object exposing ``savefig`` (e.g. a matplotlib Figure)
        or a path to an existing image file, whose bytes are embedded as-is under an
        image/png label. Anything else is returned as ``str(value)``.
        """
        val = result.get("value")
        if hasattr(val, "savefig"):
            buf = io.BytesIO()
            try:
                val.savefig(buf, format="png")
            finally:
                # Figures created through pyplot stay registered until closed;
                # release this one once rendered so a long-running server does not
                # accumulate open figures.
                if isinstance(val, plt.Figure):
                    plt.close(val)
            return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8")
        if isinstance(val, str) and os.path.isfile(val):
            with open(val, "rb") as f:
                return "data:image/png;base64," + base64.b64encode(f.read()).decode("utf-8")
        return str(val)

    def format_other(self, result):
        """Return ``str(result["value"])``, or "" when there is no value."""
        return str(result.get("value", ""))
| # ----------------------------------------------------------------------------- | |
| # LLM init | |
| # ----------------------------------------------------------------------------- | |
logger.info("Initializing models…")

gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")

# LangChain chat wrapper around Gemini 2.0 Flash (low temperature).
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0.1,
    api_key=gemini_api_key,
)

# Direct google.generativeai client, configured with the same key.
genai.configure(api_key=gemini_api_key)
generation_config = {
    "temperature": 0.2,
    "top_p": 0.95,
    "max_output_tokens": 5000,
}
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-lite-001",
    generation_config=generation_config,
)
logger.info("AI models initialized.")

# Per-process chart directory: a fresh UUID under /exports/charts on each start.
# NOTE(review): the directory is not created here; confirm something creates it.
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info("Chart export path set to: %s", user_defined_path)
| # ----------------------------------------------------------------------------- | |
| # Temporal helpers + guardrails | |
| # ----------------------------------------------------------------------------- | |
# Single business timezone used for every temporal computation in this module.
TZ = "Africa/Harare"


def now_harare() -> pd.Timestamp:
    """Return the current instant as a tz-aware Timestamp expressed in ``TZ``."""
    current = pd.Timestamp.now(tz=TZ)
    return current
def week_bounds_from(
    ts: pd.Timestamp, tz: Optional[str] = None
) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return the Monday 00:00:00 .. Sunday 23:59:59 week that contains ``ts``.

    Args:
        ts: Reference instant. Tz-aware values are converted to ``tz``. Naive values
            are taken to be local wall-clock time in ``tz``.
        tz: Target timezone name. Defaults to the module-level ``TZ``.

    Returns:
        ``(monday_start, sunday_end)`` as tz-aware Timestamps in ``tz``.
    """
    zone = tz or TZ
    local = ts.tz_localize(zone) if ts.tzinfo is None else ts.tz_convert(zone)
    # Take the weekday from the *converted* stamp: close to midnight the calendar
    # day (and so the weekday) can differ between the caller's zone and ``zone``.
    monday = local.normalize() - pd.Timedelta(days=local.weekday())
    sunday = monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return monday, sunday
def next_week_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return Monday 00:00:00 .. Sunday 23:59:59 of the week following ``ts``'s week."""
    current_monday = week_bounds_from(ts)[0]
    start = current_monday + pd.Timedelta(weeks=1)
    # Seven days minus one second lands on Sunday 23:59:59.
    end = start + pd.Timedelta(days=7) - pd.Timedelta(seconds=1)
    return start, end
def last_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return the first instant and last second (23:59:59) of the month before ``ts``."""
    this_month_start = ts.normalize().replace(day=1)
    prev_month_last_second = this_month_start - pd.Timedelta(seconds=1)
    prev_month_start = prev_month_last_second.normalize().replace(day=1)
    return prev_month_start, prev_month_last_second.replace(hour=23, minute=59, second=59)
def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return the first instant and the last second of the calendar month containing ``ts``."""
    month_start = ts.normalize().replace(day=1)
    # month // 12 is 1 only for December, which rolls over into January of next year.
    next_month_start = month_start.replace(
        year=month_start.year + month_start.month // 12,
        month=month_start.month % 12 + 1,
    )
    return month_start, next_month_start - pd.Timedelta(seconds=1)
def quarter_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return the first instant and the last second of the calendar quarter containing ``ts``."""
    first_month = (ts.month - 1) // 3 * 3 + 1  # 1, 4, 7 or 10
    quarter_start = ts.normalize().replace(month=first_month, day=1)
    # The next quarter starts three months later; a Q4 start (10) wraps to January.
    following_start = quarter_start.replace(
        year=quarter_start.year + (first_month + 2) // 12,
        month=(first_month + 2) % 12 + 1,
    )
    return quarter_start, following_start - pd.Timedelta(seconds=1)
# (phrase, window_fn) pairs recognised by temporal_hints(). Each window_fn maps a
# reference "now" Timestamp to an inclusive (start, end) pair; list order is the
# order in which matched hints are reported.
_TEMP_WINDOWS = [
    ("next week", lambda now: next_week_bounds(now)),
    ("this week", lambda now: week_bounds_from(now)),
    ("last week", lambda now: week_bounds_from(now - pd.Timedelta(weeks=1))),
    (
        "yesterday",
        lambda now: (
            now.normalize() - pd.Timedelta(days=1),
            now.normalize() - pd.Timedelta(seconds=1),
        ),
    ),
    (
        "tomorrow",
        lambda now: (
            now.normalize() + pd.Timedelta(days=1),
            now.normalize() + pd.Timedelta(days=2) - pd.Timedelta(seconds=1),
        ),
    ),
    ("this month", lambda now: this_month_bounds(now)),
    ("last month", lambda now: last_month_bounds(now)),
    ("this quarter", lambda now: quarter_bounds(now)),
]
def extract_numeric_window(question: str, now: Optional[pd.Timestamp] = None):
    """Parse a rolling "last/past N day(s)" window out of ``question``.

    Args:
        question: Free-text user question. Matching is case-insensitive and
            accepts both "day" and "days".
        now: End of the window. Defaults to ``now_harare()``; pass it explicitly for
            deterministic results.

    Returns:
        ``(start, end)`` with ``start = end - N days``, or ``None`` when the
        question contains no such phrase.
    """
    match = re.search(r"\b(?:last|past)\s+(\d{1,3})\s+days?\b", question.lower())
    if match is None:
        return None
    end = now if now is not None else now_harare()
    start = end - pd.Timedelta(days=int(match.group(1)))
    return start, end
def temporal_hints(question: str) -> str:
    """Build a prompt snippet that pins relative time phrases to explicit ranges.

    Every ``_TEMP_WINDOWS`` phrase found in ``question``, plus any "last/past N days"
    window, is resolved against the current Harare time and listed as
    "Interpret <phrase> as <start> to <end>". When nothing matches, a generic
    temporal-context paragraph is returned instead.
    """
    base = now_harare()
    stamp_fmt = "%Y-%m-%d %H:%M:%S%z"
    today = base.strftime('%Y-%m-%d')
    lowered = question.lower()

    hints = {
        phrase: tuple(bound.strftime(stamp_fmt) for bound in window(base))
        for phrase, window in _TEMP_WINDOWS
        if phrase in lowered
    }
    numeric = extract_numeric_window(question)
    if numeric:
        start, end = numeric
        hints[f"last {(end - start).days} days"] = (start.strftime(stamp_fmt), end.strftime(stamp_fmt))

    if not hints:
        return (
            f"Temporal context: Today is {today} ({TZ}). "
            f"Week is Monday–Sunday. Use pd.Timestamp.now(tz='{TZ}') and pd.Timedelta. "
            f"Avoid non-fixed rolling frequencies; use daily resample or dt.to_period('M')."
        )

    sentences = [f"Temporal context: Today is {today} ({TZ})."]
    sentences.extend(f'Interpret "{k}" as {s} to {e}.' for k, (s, e) in hints.items())
    sentences.append("Avoid non-fixed rolling frequencies (MonthBegin/MonthEnd/Week). Prefer daily resample or Period groupby.")
    return " ".join(sentences)
def guardrails_preamble() -> str:
    """Return the fixed, newline-separated coding rules prepended to PandasAI prompts."""
    rules = [
        "Rules for any code you generate:",
        "1) Do NOT use 'from datetime import datetime' or 'datetime.date.today()'.",
        "2) Use pandas time APIs only: pd.Timestamp.now(tz='Africa/Harare'), pd.Timedelta, dt accessors.",
        "3) If a Time column exists, combine Date + Time and localize to Africa/Harare.",
        "4) Ensure numeric conversion with errors='coerce' for amount fields.",
        "5) Avoid non-fixed rolling/resample frequencies. For monthly, use df['datetime'].dt.to_period('M').",
        "6) Never print stack traces; return a concise answer or a plot/dataframe.",
    ]
    return "\n".join(rules)
| # ----------------------------------------------------------------------------- | |
| # Error detection & sanitization | |
| # ----------------------------------------------------------------------------- | |
# Lower-case substrings that mark a PandasAI answer as a failure; they are matched
# against the lower-cased answer text.
ERROR_PATTERNS = [
    # Python exception / traceback markers
    "traceback",
    "exception",
    "keyerror",
    "nameerror",
    "syntaxerror",
    "modulenotfounderror",
    "importerror",
    # Pipeline / execution failure messages
    "pipeline failed",
    "execution failed",
    "__import__",
    "failed with error",
    "attributeerror",
    "method_descriptor",
    # Canned "gave up" reply
    "unfortunately, i was not able to answer your question",
    # Non-fixed-frequency errors and fragments of pandas date-offset reprs
    "non-fixed frequency",
    "monthbegin",
    "monthend",
    "week:",
    "weekday=",
]
def _stringify(obj) -> str:
    """Best-effort text form of ``obj``.

    DataFrames and matplotlib Figures map to "" (they are not text). Bytes are
    decoded as UTF-8, ignoring bad sequences. Everything else goes through ``str()``.
    Any exception also yields "".
    """
    try:
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            text = ""
        elif isinstance(obj, (bytes, bytearray)):
            text = obj.decode("utf-8", errors="ignore")
        else:
            text = str(obj)
    except Exception:
        text = ""
    return text
def _extract_text_like(ans):
    """Pull the most text-like payload out of a PandasAI answer and stringify it.

    Dicts are probed for "value", then "message", "text", "content" (first hit
    wins), falling back to the whole dict. Other objects use their ``.value``
    attribute when present and readable, else the object itself.
    """
    if isinstance(ans, dict):
        for key in ("value", "message", "text", "content"):
            if key in ans:
                return _stringify(ans[key])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(ans.value)
        except Exception:
            pass  # unreadable .value: fall back to the object itself
    return _stringify(ans)
def looks_like_error(ans) -> bool:
    """Heuristically decide whether a PandasAI answer is a failure.

    DataFrames and matplotlib Figures count as successful results, whether they
    are passed bare or wrapped as the ``value`` of a response dict/object.
    Otherwise the answer's text (via ``_extract_text_like``) is flagged when it is
    empty, contains any ``ERROR_PATTERNS`` entry, or resembles a traceback /
    ``ValueError:`` message.
    """
    rich_types = (pd.DataFrame, plt.Figure)
    if isinstance(ans, rich_types):
        return False
    # _stringify() renders DataFrames/Figures as "", so an answer that merely
    # wraps one would otherwise be treated as an empty (failed) answer.
    try:
        wrapped = ans.get("value") if isinstance(ans, dict) else getattr(ans, "value", None)
    except Exception:
        wrapped = None
    if isinstance(wrapped, rich_types):
        return False

    s = _extract_text_like(ans).strip().lower()
    if not s:
        return True
    if any(p in s for p in ERROR_PATTERNS):
        return True
    # Traceback-style "File ..., line N ... Error" text, or a bare ValueError.
    return ("file " in s and " line " in s and "error" in s) or ("valueerror:" in s)
def sanitize_answer(ans) -> str:
    """Return a cleaned, user-presentable string for a PandasAI answer.

    Removes triple-backtick fences (with any language tag) and drops everything
    from a Python traceback header onward. The canned "Unfortunately, I was not
    able to answer your question" reply becomes "". The result is stripped.
    """
    text = _extract_text_like(ans) or ""
    text = re.sub(r"```+\w*", "", text)
    # partition() leaves the string intact when no traceback header is present.
    text = text.partition("Traceback (most recent call last):")[0]
    if "Unfortunately, I was not able to answer your question" in text:
        return ""
    return text.strip()
| # ----------------------------------------------------------------------------- | |
| # KPI Engine | |
| # ----------------------------------------------------------------------------- | |
class ColumnResolver:
    """Map messy input columns to canonical names without guessing via LLM.

    Each ``*_CANDIDATES`` list is scanned in order, and the first name present as a
    column wins (exact, case-sensitive match). The helpers are static methods, so
    both ``ColumnResolver.map(df)`` and ``ColumnResolver().map(df)`` work.
    """

    AMOUNT_CANDIDATES = [
        "Amount", "amount", "Total", "total", "Settled_Amount", "GrandTotal",
        "NetAmount", "Invoice_Total", "LineTotal",
    ]
    DATE_CANDIDATES = ["datetime", "Date", "date", "TxnDate", "CreatedAt", "created_at", "timestamp"]
    TIME_CANDIDATES = ["Time", "time", "TxnTime"]
    CURR_CANDIDATES = ["Currency", "currency", "Curr"]
    INVOICE_CANDIDATES = ["Invoice_Number", "InvoiceNo", "invoice_number", "Receipt_Number", "Order_No", "Txn_ID", "TxnId"]
    PRODUCT_CANDIDATES = ["Product", "Product_Name", "Item", "Description", "Item_Name"]
    UNITS_CANDIDATES = ["Units_Sold", "Qty", "Quantity", "Units"]
    UNIT_COST_CANDIDATES = ["Unit_Cost_Price", "UnitCost", "CostPrice"]
    TYPE_CANDIDATES = ["Transaction_Type", "TxnType", "Type"]
    TELLER_CANDIDATES = ["Teller_Username", "Teller", "Cashier", "User"]

    @staticmethod
    def pick_first(df: pd.DataFrame, names: List[str]) -> Optional[str]:
        """Return the first entry of ``names`` that is a column of ``df``, else None."""
        return next((n for n in names if n in df.columns), None)

    @staticmethod
    def pick_first_numeric(
        df: pd.DataFrame, names: List[str], min_ratio: float = 0.2
    ) -> Optional[str]:
        """Return the first present column in ``names`` that is usefully numeric.

        A column qualifies when at least ``min_ratio`` of its values survive
        ``pd.to_numeric(..., errors="coerce")``. An empty frame never qualifies.
        """
        for n in names:
            if n not in df.columns:
                continue
            parsed = pd.to_numeric(df[n], errors="coerce")
            nonnull_ratio = parsed.notna().mean() if len(parsed) else 0
            if nonnull_ratio >= min_ratio:
                return n
        return None

    @staticmethod
    def map(df: pd.DataFrame) -> Dict[str, Optional[str]]:
        """Resolve every canonical role to a source column name (or None).

        Amount, units and unit cost must also pass the numeric check. The other
        roles only need the column to exist.
        """
        return {
            "amount": ColumnResolver.pick_first_numeric(df, ColumnResolver.AMOUNT_CANDIDATES),
            "date": ColumnResolver.pick_first(df, ColumnResolver.DATE_CANDIDATES),
            "time": ColumnResolver.pick_first(df, ColumnResolver.TIME_CANDIDATES),
            "currency": ColumnResolver.pick_first(df, ColumnResolver.CURR_CANDIDATES),
            "invoice": ColumnResolver.pick_first(df, ColumnResolver.INVOICE_CANDIDATES),
            "product": ColumnResolver.pick_first(df, ColumnResolver.PRODUCT_CANDIDATES),
            "units": ColumnResolver.pick_first_numeric(df, ColumnResolver.UNITS_CANDIDATES),
            "unit_cost": ColumnResolver.pick_first_numeric(df, ColumnResolver.UNIT_COST_CANDIDATES),
            "txn_type": ColumnResolver.pick_first(df, ColumnResolver.TYPE_CANDIDATES),
            "teller": ColumnResolver.pick_first(df, ColumnResolver.TELLER_CANDIDATES),
        }
def json_safe(obj: Any) -> Any:
    """Recursively coerce numpy/pandas values into plain, JSON-serializable Python.

    - numpy integer / floating / bool scalars become int / float / bool
    - ``pd.Timestamp`` becomes an ISO-8601 string, and ``pd.NaT`` becomes None
    - ``pd.Series`` becomes a dict; ``pd.Index``, ``np.ndarray``, list and tuple
      become lists (all recursively converted)
    - dict keys are converted too, and stringified when ``json`` cannot use them
      as object keys (e.g. tuples)

    Anything else is returned unchanged.
    """
    if obj is pd.NaT:
        return None
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Series):
        # to_dict() can carry numpy scalars and Timestamp index labels: recurse.
        return json_safe(obj.to_dict())
    if isinstance(obj, (pd.Index, np.ndarray)):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, dict):
        out = {}
        for key, value in obj.items():
            key = json_safe(key)
            if key is not None and not isinstance(key, (str, int, float, bool)):
                key = str(key)
            out[key] = json_safe(value)
        return out
    if isinstance(obj, (list, tuple)):
        return [json_safe(x) for x in obj]
    return obj
def emit_kpi_debug(profile_id: str, stage: str, payload: Dict[str, Any]) -> None:
    """Log one KPI crunch stage to the server log and, optionally, to Firebase.

    The server line is ``KPI_DEBUG <json>`` at DEBUG level. It is only built when
    DEBUG is enabled. When ``LOG_KPI_TO_FIREBASE`` is set, the payload is also
    written to ``kpi_debug/<profile_id>/<stage>_<epoch_ms>``. Both sinks are
    best-effort and independent: a failure is logged as a warning and never
    propagates into the KPI pipeline.
    """
    if logger.isEnabledFor(logging.DEBUG):
        try:
            log_obj = {"profile_id": profile_id, "stage": stage, "payload": payload}
            # default=str keeps the line even if a value slipped past json_safe().
            logger.debug("KPI_DEBUG %s", json.dumps(json_safe(log_obj), default=str))
        except Exception as e:
            logger.warning("Failed to emit KPI debug logs: %s", e)
    if LOG_KPI_TO_FIREBASE:
        try:
            # Millisecond resolution: with whole seconds, two emits of the same
            # stage within one second overwrote each other.
            ts = time.time_ns() // 1_000_000
            db_ref.child(f"kpi_debug/{profile_id}/{stage}_{ts}").set(json_safe(payload))
        except Exception as e:
            logger.warning("Failed to emit KPI debug logs: %s", e)
| class IrisReportEngine: | |
| """ | |
| Backwards-compatible KPI engine: | |
| - Keeps existing snapshot sections untouched | |
| - Adds: Basket Analysis, Product Affinity, Temporal Patterns, Customer Value, Product KPIs (expanded), | |
| Inventory (optional), Branch Analytics (per-branch + cross-branch), Cash reconciliation (optional) | |
| - Never uses LLM for numbers. LLM only for narration elsewhere. | |
| """ | |
| # ---- Canonical column names (single source of truth; no magic strings sprinkled around) ---- | |
| COL_INVOICE = "_Invoice" | |
| COL_PRODUCT = "_Product" | |
| COL_TELLER = "_Teller" | |
| COL_TXNTYPE = "_TxnType" | |
| COL_BRANCH = "_Branch" | |
| COL_CUSTOMER = "_Customer" | |
| COL_DT = "_datetime" | |
| COL_AMOUNT = "_Amount" | |
| COL_UNITS = "_Units" | |
| COL_UNITCOST = "_UnitCost" | |
| COL_REVENUE = "_Revenue" | |
| COL_COGS = "_COGS" | |
| COL_GP = "_GrossProfit" | |
| COL_HOUR = "_Hour" | |
| COL_DOW = "_DOW" | |
| COL_DOWI = "_DOW_idx" | |
| DEFAULT_PARAMS = { | |
| "top_k": 5, | |
| "min_revenue_for_margin_pct": 50.0, | |
| "min_tx_for_margin_pct": 3, | |
| "rfm_window_days": 365, | |
| "retention_factor": 1.0, | |
| "min_support_baskets": 5, # minimum basket count for a pair to be reported | |
| "min_lift": 1.2, | |
| "blocked_products": ["Purchase"], # exclude accounting placeholders from product leaderboards/affinity | |
| "cash_variance_threshold_abs": 10.0, | |
| "cash_variance_threshold_pct": 0.008, # 0.8% | |
| } | |
| def __init__( | |
| self, | |
| profile_id: str, | |
| transactions_data: List[dict], | |
| llm_instance, | |
| stock_feed: Optional[List[Dict[str, Any]]] = None, | |
| cash_float_feed: Optional[List[Dict[str, Any]]] = None, | |
| params: Optional[Dict[str, Any]] = None, | |
| ): | |
| self.profile_id = profile_id | |
| self.llm = llm_instance | |
| self.params = {**self.DEFAULT_PARAMS, **(params or {})} | |
| self.raw = pd.DataFrame(transactions_data) | |
| self.stock_feed = pd.DataFrame(stock_feed) if stock_feed else pd.DataFrame() | |
| self.cash_float_feed = pd.DataFrame(cash_float_feed) if cash_float_feed else pd.DataFrame() | |
| self.df = self._load_and_prepare_data(self.raw) | |
| self.currency = self._get_primary_currency() | |
| # ------------------------- small helpers ------------------------- | |
| def _rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]: | |
| if df is None or df.empty: | |
| return [] | |
| out = [] | |
| for _, r in df.iterrows(): | |
| out.append({ | |
| "customer": str(r.get("customer")), | |
| "orders": int(r.get("orders", 0)), | |
| "revenue": float(r.get("revenue", 0.0)), | |
| "gp": float(r.get("gp", 0.0)), | |
| "recency_days": float(r.get("recency_days", np.nan)) if pd.notna(r.get("recency_days")) else None, | |
| "avg_basket_value": float(r.get("avg_basket_value", np.nan)) if pd.notna(r.get("avg_basket_value")) else None, | |
| }) | |
| return out | |
| def _has(self, df: pd.DataFrame, col: str) -> bool: | |
| return isinstance(df, pd.DataFrame) and col in df.columns | |
| # ------------------------- load/prepare ------------------------- | |
| def _load_and_prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: | |
| if df is None or df.empty: | |
| emit_kpi_debug(self.profile_id, "load", {"status": "empty_input"}) | |
| return pd.DataFrame() | |
| mapping = ColumnResolver.map(df) | |
| emit_kpi_debug(self.profile_id, "column_map", mapping) | |
| # Numerics | |
| amt_col = mapping["amount"] or ("Settled_Amount" if "Settled_Amount" in df.columns else None) | |
| if amt_col and amt_col in df: | |
| df[self.COL_AMOUNT] = pd.to_numeric(df[amt_col], errors="coerce") | |
| else: | |
| df[self.COL_AMOUNT] = pd.Series(dtype=float) | |
| if mapping["units"] and mapping["units"] in df: | |
| df[self.COL_UNITS] = pd.to_numeric(df[mapping["units"]], errors="coerce").fillna(0) | |
| else: | |
| df[self.COL_UNITS] = 0 | |
| if mapping["unit_cost"] and mapping["unit_cost"] in df: | |
| df[self.COL_UNITCOST] = pd.to_numeric(df[mapping["unit_cost"]], errors="coerce").fillna(0.0) | |
| else: | |
| df[self.COL_UNITCOST] = 0.0 | |
| # Datetime | |
| if mapping["date"] and mapping["date"] in df: | |
| if mapping["time"] and mapping["time"] in df: | |
| dt_series = pd.to_datetime( | |
| df[mapping["date"]].astype(str) + " " + df[mapping["time"]].astype(str), | |
| errors="coerce" | |
| ) | |
| else: | |
| dt_series = pd.to_datetime(df[mapping["date"]], errors="coerce") | |
| else: | |
| dt_series = pd.to_datetime(df.get("datetime"), errors="coerce") | |
| try: | |
| if getattr(dt_series.dt, "tz", None) is None: | |
| dt_series = dt_series.dt.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT") | |
| else: | |
| dt_series = dt_series.dt.tz_convert(TZ) | |
| except Exception: | |
| pass | |
| df[self.COL_DT] = dt_series | |
| df = df.dropna(subset=[self.COL_DT]).copy() | |
| # Canonical dims | |
| df[self.COL_INVOICE] = df[mapping["invoice"]] if mapping["invoice"] and mapping["invoice"] in df else None | |
| df[self.COL_PRODUCT] = df[mapping["product"]] if mapping["product"] and mapping["product"] in df else None | |
| df[self.COL_TELLER] = df[mapping["teller"]] if mapping["teller"] and mapping["teller"] in df else None | |
| df[self.COL_TXNTYPE] = (df[mapping["txn_type"]].astype(str).str.lower() | |
| if mapping["txn_type"] and mapping["txn_type"] in df | |
| else df.get("Transaction_Type", "").astype(str).str.lower()) | |
| df[self.COL_BRANCH] = df.get("Branch") | |
| df[self.COL_CUSTOMER] = df.get("Customer_Reference") | |
| # Sales filter: keep explicit sales OR Transaction_Type_ID 21 OR positive amounts | |
| txid_series = df.get("Transaction_Type_ID") | |
| sales_mask = ( | |
| df[self.COL_TXNTYPE].isin(["sale", "sales", "invoice"]) | | |
| (pd.Series(False, index=df.index) if txid_series is None else txid_series.isin([21])) | | |
| (df[self.COL_AMOUNT] > 0) | |
| ) | |
| working = df[sales_mask].copy() | |
| # Derive measures | |
| working[self.COL_REVENUE] = working[self.COL_AMOUNT].fillna(0.0) | |
| working[self.COL_COGS] = (working[self.COL_UNITCOST] * working[self.COL_UNITS]).fillna(0.0) | |
| working[self.COL_GP] = (working[self.COL_REVENUE] - working[self.COL_COGS]).fillna(0.0) | |
| working[self.COL_HOUR] = working[self.COL_DT].dt.hour | |
| working[self.COL_DOW] = working[self.COL_DT].dt.day_name() | |
| working[self.COL_DOWI] = working[self.COL_DT].dt.dayofweek # 0=Mon .. 6=Sun | |
| # Deduplicate exact duplicate sale lines | |
| before = len(working) | |
| dedupe_keys = ["Transaction_ID", self.COL_INVOICE, self.COL_PRODUCT, self.COL_UNITS, self.COL_AMOUNT, self.COL_DT] | |
| existing_keys = [k for k in dedupe_keys if k in working.columns] | |
| if existing_keys: | |
| working = working.drop_duplicates(subset=existing_keys) | |
| duplicates_dropped = before - len(working) | |
| # Drop zero rows if both revenue and cost are zero | |
| working = working[(working[self.COL_REVENUE].abs() > 0) | (working[self.COL_COGS].abs() > 0)] | |
| emit_kpi_debug(self.profile_id, "prepared_counts", { | |
| "raw_rows": int(len(self.raw)), | |
| "rows_with_datetime": int(len(df)), | |
| "sale_like_rows": int(len(working)), | |
| "duplicates_dropped": int(duplicates_dropped), | |
| }) | |
| self._prepared_dupes_dropped = int(duplicates_dropped) | |
| self._non_sale_excluded = int(len(df) - len(working)) | |
| return working | |
| def _get_primary_currency(self) -> str: | |
| try: | |
| mapping = ColumnResolver.map(self.raw) | |
| if mapping["currency"] and mapping["currency"] in self.raw: | |
| mode_series = self.raw[mapping["currency"]].dropna().astype(str) | |
| if not mode_series.empty: | |
| val = mode_series.mode() | |
| if not val.empty: | |
| return str(val.iloc[0]) | |
| except Exception: | |
| pass | |
| return "USD" | |
| # ------------------------- timeframes & headline ------------------------- | |
| def _get_comparison_timeframes(self) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]: | |
| if self.df.empty: | |
| return self.df, self.df, {} | |
| now = now_harare() | |
| start_cur, end_cur = week_bounds_from(now) | |
| start_prev = start_cur - pd.Timedelta(days=7) | |
| end_prev = start_cur - pd.Timedelta(seconds=1) | |
| current_df = self.df[(self.df[self.COL_DT] >= start_cur) & (self.df[self.COL_DT] <= end_cur)] | |
| previous_df = self.df[(self.df[self.COL_DT] >= start_prev) & (self.df[self.COL_DT] <= end_prev)] | |
| meta = { | |
| "period_label": "This Week vs. Last Week", | |
| "current_start": start_cur.isoformat(), | |
| "current_end": end_cur.isoformat(), | |
| "prev_start": start_prev.isoformat(), | |
| "prev_end": end_prev.isoformat(), | |
| "current_rows": int(len(current_df)), | |
| "previous_rows": int(len(previous_df)), | |
| } | |
| emit_kpi_debug(self.profile_id, "timeframes", meta) | |
| return current_df, previous_df, meta | |
| def _pct_change(cur: float, prev: float) -> str: | |
| if prev == 0: | |
| return "+100%" if cur > 0 else "0.0%" | |
| return f"{((cur - prev) / prev) * 100:+.1f}%" | |
| def _headline(self, cur_df: pd.DataFrame, prev_df: pd.DataFrame) -> Dict[str, Any]: | |
| cur_rev = float(cur_df[self.COL_REVENUE].sum()) if not cur_df.empty else 0.0 | |
| prev_rev = float(prev_df[self.COL_REVENUE].sum()) if not prev_df.empty else 0.0 | |
| cur_gp = float(cur_df[self.COL_GP].sum()) if not cur_df.empty else 0.0 | |
| prev_gp = float(prev_df[self.COL_GP].sum()) if not prev_df.empty else 0.0 | |
| if self._has(cur_df, self.COL_INVOICE) and cur_df[self.COL_INVOICE].notna().any(): | |
| tx_now = int(cur_df[self.COL_INVOICE].nunique()) | |
| else: | |
| tx_now = int(len(cur_df)) | |
| if self._has(prev_df, self.COL_INVOICE) and prev_df[self.COL_INVOICE].notna().any(): | |
| tx_prev = int(prev_df[self.COL_INVOICE].nunique()) | |
| else: | |
| tx_prev = int(len(prev_df)) | |
| head = { | |
| "total_revenue_value": round(cur_rev, 2), | |
| "total_revenue_fmt": f"{self.currency} {cur_rev:,.2f}", | |
| "total_revenue_change": self._pct_change(cur_rev, prev_rev), | |
| "gross_profit_value": round(cur_gp, 2), | |
| "gross_profit_fmt": f"{self.currency} {cur_gp:,.2f}", | |
| "gross_profit_change": self._pct_change(cur_gp, prev_gp), | |
| "transactions_value": tx_now, | |
| "transactions_change": self._pct_change(tx_now, tx_prev), | |
| } | |
| emit_kpi_debug(self.profile_id, "headline", head) | |
| return head | |
| # ------------------------- core builders ------------------------- | |
| def _build_product_aggregates(self, cur_df: pd.DataFrame) -> pd.DataFrame: | |
| if cur_df.empty: | |
| return pd.DataFrame(columns=[ | |
| self.COL_PRODUCT,"revenue","units","cogs","gross_profit","margin_pct", | |
| "avg_selling_price","avg_unit_cost","tx_count" | |
| ]) | |
| df = cur_df.copy() | |
| # Exclude blocked products for leaderboards/affinity, but keep them in totals if needed | |
| if self.params["blocked_products"]: | |
| df = df[~df[self.COL_PRODUCT].astype(str).str.strip().isin(self.params["blocked_products"])] | |
| # Tx count via invoice nunique if available | |
| if self._has(df, self.COL_INVOICE) and df[self.COL_INVOICE].notna().any(): | |
| g = df.groupby(self.COL_PRODUCT, dropna=False).agg( | |
| revenue=(self.COL_REVENUE,"sum"), | |
| units=(self.COL_UNITS,"sum"), | |
| cogs=(self.COL_COGS,"sum"), | |
| gp=(self.COL_GP,"sum"), | |
| tx=(self.COL_INVOICE,"nunique") | |
| ) | |
| else: | |
| g = df.groupby(self.COL_PRODUCT, dropna=False).agg( | |
| revenue=(self.COL_REVENUE,"sum"), | |
| units=(self.COL_UNITS,"sum"), | |
| cogs=(self.COL_COGS,"sum"), | |
| gp=(self.COL_GP,"sum"), | |
| tx=(self.COL_PRODUCT,"size") | |
| ) | |
| g = g.rename(columns={"gp":"gross_profit", "tx":"tx_count"}).reset_index() | |
| # Derived ratios | |
| g["margin_pct"] = np.where(g["revenue"] > 0, g["gross_profit"] / g["revenue"], np.nan) | |
| g["avg_selling_price"] = np.where(g["units"] > 0, g["revenue"] / g["units"], np.nan) | |
| g["avg_unit_cost"] = np.where(g["units"] > 0, g["cogs"] / g["units"], np.nan) | |
| return g | |
| def _build_basket_table(self, cur_df: pd.DataFrame) -> pd.DataFrame: | |
| if cur_df.empty or not self._has(cur_df, self.COL_INVOICE): | |
| return pd.DataFrame(columns=[self.COL_INVOICE,"basket_revenue","basket_gp","basket_items","_datetime_max"]) | |
| b = cur_df.groupby(self.COL_INVOICE, dropna=False).agg( | |
| basket_revenue=(self.COL_REVENUE,"sum"), | |
| basket_gp=(self.COL_GP,"sum"), | |
| basket_items=(self.COL_UNITS,"sum"), | |
| _datetime_max=(self.COL_DT,"max"), | |
| ).reset_index() | |
| return b | |
| def _basket_kpis(self, basket_df: pd.DataFrame) -> Dict[str, Any]: | |
| if basket_df is None or basket_df.empty: | |
| return { | |
| "avg_items_per_basket": "N/A", | |
| "avg_gross_profit_per_basket": "N/A", | |
| "median_basket_value": "N/A", | |
| "basket_size_distribution": {}, | |
| "low_sample": True | |
| } | |
| avg_items = float(basket_df["basket_items"].mean()) | |
| avg_gp = float(basket_df["basket_gp"].mean()) | |
| median_value = float(basket_df["basket_revenue"].median()) | |
| sizes = basket_df["basket_items"].fillna(0) | |
| bins = { | |
| "1": int((sizes == 1).sum()), | |
| "2-3": int(((sizes >= 2) & (sizes <= 3)).sum()), | |
| "4-5": int(((sizes >= 4) & (sizes <= 5)).sum()), | |
| "6_plus": int((sizes >= 6).sum()), | |
| } | |
| return { | |
| "avg_items_per_basket": round(avg_items, 2), | |
| "avg_gross_profit_per_basket": round(avg_gp, 2), | |
| "median_basket_value": round(median_value, 2), | |
| "basket_size_distribution": bins | |
| } | |
| def _affinity_pairs(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]: | |
| # Build unique product sets per invoice, count pairs | |
| if cur_df.empty or basket_df.empty or not self._has(cur_df, self.COL_PRODUCT) or not self._has(cur_df, self.COL_INVOICE): | |
| return {"params": self._affinity_params(), "top_pairs": []} | |
| tmp = cur_df[[self.COL_INVOICE, self.COL_PRODUCT]].dropna() | |
| if tmp.empty: | |
| return {"params": self._affinity_params(), "top_pairs": []} | |
| blocked = set(self.params.get("blocked_products", []) or []) | |
| tmp = tmp[~tmp[self.COL_PRODUCT].astype(str).str.strip().isin(blocked)] | |
| if tmp.empty: | |
| return {"params": self._affinity_params(), "top_pairs": []} | |
| products_per_invoice = tmp.groupby(self.COL_INVOICE)[self.COL_PRODUCT].agg(lambda s: sorted(set(map(str, s)))).reset_index() | |
| total_baskets = int(len(products_per_invoice)) | |
| if total_baskets == 0: | |
| return {"params": self._affinity_params(), "top_pairs": []} | |
| from collections import Counter | |
| single_counter = Counter() | |
| for prods in products_per_invoice[self.COL_PRODUCT]: | |
| single_counter.update(prods) | |
| pair_counter = Counter() | |
| for prods in products_per_invoice[self.COL_PRODUCT]: | |
| if len(prods) < 2: | |
| continue | |
| for i in range(len(prods)): | |
| for j in range(i+1, len(prods)): | |
| a, b = prods[i], prods[j] | |
| pair = (a, b) if a <= b else (b, a) | |
| pair_counter[pair] += 1 | |
| min_support_baskets = int(self.params["min_support_baskets"]) | |
| min_lift = float(self.params["min_lift"]) | |
| top_k = int(self.params["top_k"]) | |
| rows = [] | |
| # Average pair revenue across baskets containing both (optional; approximate) | |
| inv_with_products = cur_df.groupby(self.COL_INVOICE)[self.COL_PRODUCT].apply(lambda s: set(map(str, s.dropna()))) | |
| rev_by_inv = cur_df.groupby(self.COL_INVOICE)[self.COL_REVENUE].sum() | |
| for (a, b), ab_count in pair_counter.items(): | |
| if ab_count < min_support_baskets: | |
| continue | |
| support_a = single_counter.get(a, 0) / total_baskets | |
| support_b = single_counter.get(b, 0) / total_baskets | |
| support_ab = ab_count / total_baskets | |
| if support_a == 0 or support_b == 0: | |
| continue | |
| confidence = support_ab / support_a | |
| lift = support_ab / (support_a * support_b) if (support_a * support_b) > 0 else np.nan | |
| if not np.isfinite(lift) or lift < min_lift: | |
| continue | |
| inv_mask = inv_with_products.apply(lambda s: (a in s) and (b in s)) | |
| pair_invoices = inv_mask[inv_mask].index | |
| avg_pair_revenue = float(rev_by_inv.loc[pair_invoices].mean()) if len(pair_invoices) else np.nan | |
| rows.append({ | |
| "a": a, "b": b, | |
| "support_ab": round(float(support_ab), 6), | |
| "confidence_a_to_b": round(float(confidence), 6), | |
| "lift": round(float(lift), 6), | |
| "pair_basket_count": int(ab_count), | |
| "avg_pair_revenue": round(avg_pair_revenue, 2) if np.isfinite(avg_pair_revenue) else None, | |
| }) | |
| rows.sort(key=lambda r: (r["lift"], r["pair_basket_count"], r["support_ab"]), reverse=True) | |
| emit_kpi_debug(self.profile_id, "affinity_pairs_counts", { | |
| "total_baskets": total_baskets, "pairs_after_filters": len(rows) | |
| }) | |
| return {"params": self._affinity_params(), "top_pairs": rows[:top_k]} | |
def _affinity_params(self) -> Dict[str, Any]:
    """Return the product-affinity thresholds, coerced to plain int/float.

    Reads ``min_support_baskets``, ``min_lift`` and ``top_k`` from
    ``self.params`` so the snapshot echoes exactly which filters were applied.
    """
    cfg = self.params
    return dict(
        min_support_baskets=int(cfg["min_support_baskets"]),
        min_lift=float(cfg["min_lift"]),
        top_k=int(cfg["top_k"]),
    )
def _temporal_patterns(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
    """Summarise when gross profit is earned: per hour, per weekday, and as a 7x24 grid.

    Args:
        cur_df: Prepared current-period rows. Reads the hour (``COL_HOUR``),
            weekday label (``COL_DOW``), weekday index (``COL_DOWI``),
            revenue (``COL_REVENUE``) and gross-profit (``COL_GP``) columns.

    Returns:
        JSON-ready dict with ``best_hour_by_profit``, ``best_day_by_profit``,
        ``hourly_series``, ``dow_series`` (ordered by the weekday index) and
        ``profit_heatmap_7x24`` (rows = weekday index 0-6, columns = hour 0-23).

    Rows with a missing hour or weekday are skipped in the corresponding
    series and in the heatmap. A missing hour previously reached ``int(NaN)``
    and aborted the whole section.
    """
    if cur_df.empty:
        return {
            "best_hour_by_profit": None,
            "best_day_by_profit": None,
            "hourly_series": [],
            "dow_series": [],
            "profit_heatmap_7x24": []
        }
    col_hour, col_dow, col_dowi = self.COL_HOUR, self.COL_DOW, self.COL_DOWI
    col_gp = self.COL_GP
    sums = {"revenue": (self.COL_REVENUE, "sum"), "gross_profit": (col_gp, "sum")}

    # Per hour of day; rows without an hour are dropped so int() below is safe.
    gh = cur_df.groupby(col_hour, dropna=True).agg(**sums).reset_index()
    best_hour = None
    if not gh.empty:
        top_hour = gh.loc[gh["gross_profit"].idxmax()]
        best_hour = {"hour": int(top_hour[col_hour]), "gross_profit": round(float(top_hour["gross_profit"]), 2)}

    # Per weekday label, presented in the order of the numeric weekday index.
    gd = cur_df.groupby(col_dow, dropna=True).agg(**sums).reset_index()
    weekday_order = cur_df.groupby(col_dow)[col_dowi].max().to_dict()
    gd["__ord"] = gd[col_dow].map(weekday_order)
    gd = gd.sort_values("__ord", kind="stable")
    best_day = None
    if not gd.empty:
        top_day = gd.loc[gd["gross_profit"].idxmax()]
        best_day = {"day": str(top_day[col_dow]), "gross_profit": float(top_day["gross_profit"])}

    # 7x24 gross-profit grid. Rows missing either key cannot be placed and are
    # skipped; cells without sales stay 0.
    placed = cur_df.dropna(subset=[col_dowi, col_hour])
    if placed.empty:
        heatmap = [[0.0] * 24 for _ in range(7)]
    else:
        grid = placed.groupby([col_dowi, col_hour])[col_gp].sum().unstack(fill_value=0)
        grid = grid.reindex(index=range(7), columns=range(24), fill_value=0)
        heatmap = [[float(x) for x in row] for row in grid.values.tolist()]

    return {
        "best_hour_by_profit": best_hour,
        "best_day_by_profit": best_day,
        "hourly_series": [
            {"hour": int(h), "revenue": float(r), "gross_profit": float(g)}
            for h, r, g in zip(gh[col_hour], gh["revenue"], gh["gross_profit"])
        ],
        "dow_series": [
            {"day": str(d), "revenue": float(r), "gross_profit": float(g)}
            for d, r, g in zip(gd[col_dow], gd["revenue"], gd["gross_profit"])
        ],
        "profit_heatmap_7x24": heatmap
    }
def _customer_value(self, cur_df: pd.DataFrame, basket_df: pd.DataFrame) -> Dict[str, Any]:
    """Build RFM-style customer leaderboards for the current period.

    Args:
        cur_df: Prepared current-period line items. Reads the customer
            (``COL_CUSTOMER``), datetime (``COL_DT``), revenue and gross-profit
            columns, and the invoice column when present.
        basket_df: Per-invoice basket table; only its invoice column and
            ``basket_revenue`` are read, to derive each customer's average
            basket value. May be empty.

    Returns:
        Dict with ``params``, ``leaderboards`` (top 20 customers by gross
        profit; high-gross-profit customers unseen for more than 30 days; and
        single-order customers seen within 7 days) and an ``rfm_summary`` of
        medians. An empty skeleton is returned when there is no data or no
        customer column.
    """
    if cur_df.empty or not self._has(cur_df, self.COL_CUSTOMER):
        return {
            "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
            "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
            "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
        }
    cust, inv = self.COL_CUSTOMER, self.COL_INVOICE
    has_invoice = self._has(cur_df, inv)
    by_cust = cur_df.groupby(cust)
    last_date = by_cust[self.COL_DT].max()
    invoiced = cur_df.dropna(subset=[inv]) if has_invoice else None
    # Orders = distinct invoices when invoices exist, otherwise line count.
    orders = invoiced.groupby(cust)[inv].nunique() if has_invoice else by_cust.size()
    revenue = by_cust[self.COL_REVENUE].sum()
    gp = by_cust[self.COL_GP].sum()

    # Average basket value: mean basket revenue over each customer's distinct
    # invoices (vectorised; invoices absent from basket_df are ignored).
    if not basket_df.empty and has_invoice:
        inv_to_rev = basket_df.set_index(inv)["basket_revenue"]
        pairs = invoiced[[cust, inv]].drop_duplicates()
        pairs = pairs.assign(basket_rev=pairs[inv].map(inv_to_rev))
        avg_basket = pairs.dropna(subset=["basket_rev"]).groupby(cust)["basket_rev"].mean()
    else:
        avg_basket = pd.Series(dtype=float)

    # Recency is measured from the start of today (now_harare().normalize()),
    # so purchases made today would come out negative; clamp them at 0.
    base = now_harare().normalize()
    recency_days = ((base - last_date).dt.total_seconds() / (60 * 60 * 24)).clip(lower=0)

    idx = last_date.index
    rfm = pd.DataFrame({
        "customer": idx.astype(str),
        "last_date": last_date.values,
        "orders": orders.reindex(idx).fillna(0).astype(int).values,
        "revenue": revenue.reindex(idx).fillna(0.0).values,
        "gp": gp.reindex(idx).fillna(0.0).values,
        "recency_days": recency_days.values,
        "avg_basket_value": avg_basket.reindex(idx).values,
    })
    vip = rfm.sort_values(["gp", "orders", "revenue"], ascending=[False, False, False]).head(20)
    if len(rfm):
        gp_q3 = rfm["gp"].quantile(0.75)
        at_risk = rfm[(rfm["gp"] >= gp_q3) & (rfm["recency_days"] > 30)].sort_values(["gp", "recency_days"], ascending=[False, False]).head(20)
    else:
        at_risk = rfm.head(0)
    new_customers = rfm[(rfm["orders"] == 1) & (rfm["recency_days"] <= 7)].sort_values("gp", ascending=False).head(20)
    out = {
        "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
        "leaderboards": {
            "top_customers_by_gp": self._rfm_to_list(vip),
            "at_risk": self._rfm_to_list(at_risk),
            "new_customers": self._rfm_to_list(new_customers)
        },
        "rfm_summary": {
            "unique_customers": int(rfm["customer"].nunique()),
            "median_recency_days": float(rfm["recency_days"].median()) if len(rfm) else None,
            "median_orders": float(rfm["orders"].median()) if len(rfm) else None,
            "median_gp": float(rfm["gp"].median()) if len(rfm) else None
        }
    }
    emit_kpi_debug(self.profile_id, "rfm_done", {"customers": int(rfm["customer"].nunique())})
    return json_safe(out)
| # ------------------------- inventory & cash ------------------------- | |
def _inventory_block(self, cur_df: pd.DataFrame, product_agg: pd.DataFrame, current_bounds: Tuple[pd.Timestamp, pd.Timestamp]) -> Dict[str, Any]:
    """Join current-period sales velocity onto the stock feed and classify each product.

    Args:
        cur_df: Current-period rows (not read by this block).
        product_agg: Per-product aggregate with ``COL_PRODUCT`` and ``units``
            columns; None or empty means there is nothing to join.
        current_bounds: ``(start, end)`` of the current period; velocity is
            units per day over this span (minimum one day).

    Returns:
        ``{"status": "no_stock_data", ...}`` when there is no stock feed or no
        product aggregate. Otherwise it returns a snapshot timestamp and one
        record per stock-feed product (stock, reorder point, lead time, days
        of cover, daily velocity, status), plus de-duplicated, sorted alert
        lists.

    Status precedence per product: unknown (no stock figure) > stockout
    (<= 0 on hand) > low (at/below reorder point) > stockout_risk (cover
    shorter than lead time) > dead_stock (stock but no sales) > ok.
    """
    def no_stock_data() -> Dict[str, Any]:
        return {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}

    if self.stock_feed.empty:
        return no_stock_data()
    start_cur, end_cur = current_bounds
    days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
    # Explicit None check: `product_agg or ...` evaluates DataFrame truthiness,
    # which pandas rejects with ValueError for every DataFrame.
    pa = product_agg.copy() if product_agg is not None else pd.DataFrame()
    if pa.empty:
        return no_stock_data()
    pa["units_per_day"] = pa["units"] / days

    sf = self.stock_feed.copy()
    # Stock-feed product name: "product", else "Product", else blank. The old
    # sf.get(..., "") fallback returned a bare str and crashed on .astype().
    name_col = next((c for c in ("product", "Product") if c in sf.columns), None)
    stock_names = sf[name_col] if name_col is not None else pd.Series("", index=sf.index)
    sf["product_key"] = stock_names.astype(str).str.strip()
    pa["product_key"] = pa[self.COL_PRODUCT].astype(str).str.strip()
    # Right join: every stock-feed product is reported, whether sold or not.
    merged = pa.merge(sf, on="product_key", how="right", suffixes=("", "_stock"))
    merged["units_per_day"] = merged["units_per_day"].fillna(0.0)
    for col in ("stock_on_hand", "reorder_point", "lead_time_days"):
        merged[col] = pd.to_numeric(merged[col], errors="coerce") if col in merged.columns else np.nan
    merged["days_of_cover"] = np.where(merged["units_per_day"] > 0, merged["stock_on_hand"] / merged["units_per_day"], np.inf)

    def status_row(r):
        if pd.isna(r.get("stock_on_hand")):
            return "unknown"
        if (r["stock_on_hand"] or 0) <= 0:
            return "stockout"
        if pd.notna(r.get("reorder_point")) and r["stock_on_hand"] <= r["reorder_point"]:
            return "low"
        if np.isfinite(r["days_of_cover"]) and pd.notna(r.get("lead_time_days")) and r["days_of_cover"] < r["lead_time_days"]:
            return "stockout_risk"
        if r["units_per_day"] == 0 and (r["stock_on_hand"] or 0) > 0:
            return "dead_stock"
        return "ok"

    def display_name(r) -> str:
        # Stock-only rows (right join) have no sales-side name; NaN is truthy in
        # `name or key` and used to surface as "nan". Fall back to the join key.
        sold = r.get(self.COL_PRODUCT)
        if sold is None or pd.isna(sold) or not sold:
            return str(r.get("product_key"))
        return str(sold)

    merged["status"] = merged.apply(status_row, axis=1)
    products_out, low_stock, stockout_risk, dead_stock = [], [], [], []
    for _, r in merged.iterrows():
        rec = {
            "product": display_name(r),
            "stock_on_hand": float(r["stock_on_hand"]) if pd.notna(r["stock_on_hand"]) else None,
            "reorder_point": float(r["reorder_point"]) if pd.notna(r["reorder_point"]) else None,
            "lead_time_days": float(r["lead_time_days"]) if pd.notna(r["lead_time_days"]) else None,
            "days_of_cover": float(r["days_of_cover"]) if np.isfinite(r["days_of_cover"]) else None,
            "daily_sales_velocity": float(r["units_per_day"]),
            "status": str(r["status"])
        }
        products_out.append(rec)
        if rec["status"] == "low":
            low_stock.append(rec["product"])
        elif rec["status"] == "stockout_risk":
            stockout_risk.append(rec["product"])
        elif rec["status"] == "dead_stock":
            dead_stock.append(rec["product"])
    return {
        "stock_snapshot_asof": now_harare().isoformat(),
        "products": products_out,
        "alerts": {
            "low_stock": sorted(set(low_stock)),
            "stockout_risk": sorted(set(stockout_risk)),
            "dead_stock": sorted(set(dead_stock))
        }
    }
def _cash_recon_block(self, cur_df: pd.DataFrame) -> Dict[str, Any]:
    """Reconcile declared till cash against recorded cash sales, per branch and day.

    One output row is produced per row of ``self.cash_float_feed``, joined on
    branch and the ``YYYY-MM-DD`` date. Expected cash is
    ``opening_float + cash_sales - drops - petty_cash - closing_float`` and the
    variance is ``declared_cash - expected``. A day is flagged when the
    absolute variance, or the variance as a fraction of cash sales, reaches
    ``cash_variance_threshold_abs`` / ``cash_variance_threshold_pct``.

    Args:
        cur_df: Prepared line items. Rows whose ``Money_Type`` is "cash"
            (case-insensitive) count as cash sales. A missing ``Money_Type``
            or branch column means no cash sales.

    Returns:
        ``{"status": "no_cash_data"}`` when there is no cash feed, otherwise
        ``{"days": [...], "flags": {"high_variance_days": n}}``.
    """
    if self.cash_float_feed.empty:
        return {"status": "no_cash_data"}
    cf = self.cash_float_feed.copy()
    thr_abs = float(self.params["cash_variance_threshold_abs"])
    thr_pct = float(self.params["cash_variance_threshold_pct"])

    # Cash sales per (branch, date) from the line items.
    if cur_df.empty or self.COL_BRANCH not in cur_df.columns:
        cash_sales = pd.DataFrame(columns=["branch", "date", "cash_sales"])
    else:
        df = cur_df.copy()
        df["date"] = df[self.COL_DT].dt.strftime("%Y-%m-%d")
        # df.get("Money_Type", "") returned a bare str when the column was
        # absent and "".astype() raised; use an all-blank Series instead.
        if "Money_Type" in df.columns:
            money_type = df["Money_Type"]
        else:
            money_type = pd.Series("", index=df.index)
        df["is_cash"] = money_type.astype(str).str.lower() == "cash"
        cash_sales = (
            df[df["is_cash"]]
            .groupby([self.COL_BRANCH, "date"])[self.COL_REVENUE].sum()
            .reset_index()
            .rename(columns={self.COL_BRANCH: "branch", self.COL_REVENUE: "cash_sales"})
        )
    cf["date"] = cf["date"].astype(str).str[:10]
    # pandas refuses to merge an int key with a str key; compare branches as
    # strings unless both sides are already numeric.
    if not (pd.api.types.is_numeric_dtype(cf["branch"]) and pd.api.types.is_numeric_dtype(cash_sales["branch"])):
        cf["branch"] = cf["branch"].astype(str)
        cash_sales["branch"] = cash_sales["branch"].astype(str)
    merged = cf.merge(cash_sales, on=["branch", "date"], how="left")
    merged["cash_sales"] = merged["cash_sales"].fillna(0.0)

    def num(value: Any) -> float:
        # None / "" / NaN / non-numeric cells count as 0 instead of leaking NaN
        # into the JSON (or raising in float()).
        coerced = pd.to_numeric(value, errors="coerce")
        return 0.0 if pd.isna(coerced) else float(coerced)

    out_days: List[Dict[str, Any]] = []
    for _, r in merged.iterrows():
        opening = num(r.get("opening_float"))
        closing = num(r.get("closing_float"))
        drops = num(r.get("drops"))
        petty = num(r.get("petty_cash"))
        declared = num(r.get("declared_cash"))
        cash_sales_val = num(r.get("cash_sales"))
        expected = opening + cash_sales_val - drops - petty - closing
        variance = declared - expected
        variance_pct = (variance / cash_sales_val) if cash_sales_val > 0 else 0.0
        flag = (abs(variance) >= thr_abs) or (abs(variance_pct) >= thr_pct)
        out_days.append({
            "branch": str(r["branch"]),
            "date": str(r["date"]),
            "cash_sales": round(cash_sales_val, 2),
            "declared_cash": round(declared, 2),
            "opening_float": round(opening, 2),
            "closing_float": round(closing, 2),
            "drops": round(drops, 2),
            "petty_cash": round(petty, 2),
            "expected_cash": round(expected, 2),
            "variance": round(variance, 2),
            "variance_pct": round(variance_pct, 4),
            "flag": bool(flag),
        })
    high_var_days = sum(1 for day in out_days if day["flag"])
    return {"days": out_days, "flags": {"high_variance_days": int(high_var_days)}}
| # ------------------------- branch analytics ------------------------- | |
def _per_branch_blocks(self, cur_df: pd.DataFrame, previous_df: pd.DataFrame, current_bounds: Tuple[pd.Timestamp,pd.Timestamp]) -> Dict[str, Any]:
    """Compute the KPI bundle for every branch plus cross-branch comparisons.

    Args:
        cur_df: Prepared current-period rows (needs ``COL_BRANCH``).
        previous_df: Prepared prior-period rows, used only for the
            week-over-week revenue / gross-profit trend.
        current_bounds: ``(start, end)`` of the current period; product
            velocity is per day over this span (minimum one day).

    Returns:
        ``{"params", "per_branch", "cross_branch"}``. ``per_branch`` is keyed
        by the branch id as a string. A branch whose computation raises is
        omitted and reported through ``emit_kpi_debug``; the same applies to
        the cross-branch section as a whole.
    """
    if cur_df.empty or not self._has(cur_df, self.COL_BRANCH):
        return {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}

    def same_branch(value: Any, label: str) -> bool:
        # Cash-feed branch ids may be "1" or 1.0 while labels are str(1) == "1".
        if str(value) == label:
            return True
        try:
            return float(value) == float(label)
        except (TypeError, ValueError):
            return False

    # Branch ids are handled as strings everywhere (dict keys, rankings, WoW
    # merge). Filtering a numeric branch column with a str label used to
    # match nothing and silently drop every branch.
    branch_key = cur_df[self.COL_BRANCH].astype(str)
    branches = sorted(map(str, cur_df[self.COL_BRANCH].dropna().unique().tolist()))
    start_cur, end_cur = current_bounds
    days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
    per_branch: Dict[str, Any] = {}
    branch_summary_rows: List[Dict[str, Any]] = []
    for br in branches:
        try:
            d = cur_df[branch_key == br]
            if d.empty:
                continue
            revenue = float(d[self.COL_REVENUE].sum())
            cogs = float(d[self.COL_COGS].sum())
            gp = float(d[self.COL_GP].sum())
            margin_pct = (gp / revenue) if revenue > 0 else None
            if self._has(d, self.COL_INVOICE) and d[self.COL_INVOICE].notna().any():
                tx = int(d[self.COL_INVOICE].nunique())
            else:
                tx = int(len(d))
            items = float(d[self.COL_UNITS].sum())
            basket_df = self._build_basket_table(d)
            basket_kpis = self._basket_kpis(basket_df)
            temporal = self._temporal_patterns(d)
            pagg = self._build_product_aggregates(d)
            if not pagg.empty:
                pagg["units_per_day"] = pagg["units"] / days
                product_lb = self._product_leaderboards(pagg)
            else:
                product_lb = self._empty_product_leaderboards()
            affinity = self._affinity_pairs(d, basket_df)
            customers = self._customer_value(d, basket_df)
            cash_recon = self._cash_recon_block(d)
            if isinstance(cash_recon.get("days"), list):
                # The cash feed covers every branch; keep only this branch's
                # days (the others show zero cash sales here and get flagged).
                own_days = [day for day in cash_recon["days"] if same_branch(day.get("branch"), br)]
                cash_recon = {
                    **cash_recon,
                    "days": own_days,
                    "flags": {**cash_recon.get("flags", {}), "high_variance_days": sum(1 for day in own_days if day.get("flag"))},
                }
            per_branch[br] = {
                "kpis": {
                    "revenue": round(revenue, 2),
                    "cogs": round(cogs, 2),
                    "gross_profit": round(gp, 2),
                    "gp_margin_pct": float(round(margin_pct, 4)) if margin_pct is not None else None,
                    "transactions": tx,
                    "items_sold": round(items, 2),
                    # NOTE: key says "avg" but the value is the median basket value.
                    "avg_basket_value": basket_kpis.get("median_basket_value"),
                    "avg_items_per_basket": basket_kpis.get("avg_items_per_basket"),
                    "avg_gp_per_basket": basket_kpis.get("avg_gross_profit_per_basket"),
                },
                "temporal": temporal,
                "products": product_lb,
                "affinity": affinity,
                "customer_value": customers,
                "cash_recon": cash_recon,
                "data_quality": {
                    "duplicates_dropped": self._prepared_dupes_dropped,
                    "non_sale_rows_excluded": self._non_sale_excluded,
                    "currency_mixed": False
                }
            }
            branch_summary_rows.append({"branch": br, "revenue": revenue, "gp": gp, "gp_margin_pct": margin_pct or 0.0})
        except Exception as e:
            emit_kpi_debug(self.profile_id, "branch_block_error", {"branch": br, "error": str(e)})

    cross: Dict[str, Any] = {}
    if branch_summary_rows:
        try:
            bs = pd.DataFrame(branch_summary_rows)
            cross["rankings"] = {
                "by_revenue": bs.sort_values("revenue", ascending=False)[["branch", "revenue"]].to_dict(orient="records"),
                "by_gp_margin_pct": bs.sort_values("gp_margin_pct", ascending=False)[["branch", "gp_margin_pct"]].to_dict(orient="records"),
            }
            cross["spread"] = {
                "gp_margin_pct_max": float(bs["gp_margin_pct"].max()) if len(bs) else None,
                "gp_margin_pct_min": float(bs["gp_margin_pct"].min()) if len(bs) else None,
                "gap_pct_points": float((bs["gp_margin_pct"].max() - bs["gp_margin_pct"].min())) if len(bs) else None,
            }
            tot_rev = float(bs["revenue"].sum())
            shares, hhi = [], 0.0
            for _, r in bs.iterrows():
                sh = (r["revenue"] / tot_rev) if tot_rev > 0 else 0.0
                shares.append({"branch": r["branch"], "share": float(round(sh, 6))})
                hhi += sh * sh
            cross["concentration"] = {"share_by_branch": shares, "hhi_revenue": float(round(hhi, 6))}
            if not previous_df.empty and self._has(previous_df, self.COL_BRANCH):
                # Group the prior period by the same str branch key so the merge
                # below joins str to str.
                prev_key = previous_df[self.COL_BRANCH].astype(str).rename("branch")
                prev_g = previous_df.groupby(prev_key).agg(
                    revenue=(self.COL_REVENUE, "sum"),
                    gp=(self.COL_GP, "sum")
                ).reset_index()
                m = bs.merge(prev_g, on="branch", suffixes=("_cur", "_prev"), how="left").fillna(0.0)
                wow_rows = []
                for _, r in m.iterrows():
                    wow_rows.append({
                        "branch": r["branch"],
                        "revenue_wow": float(((r["revenue_cur"] - r["revenue_prev"]) / r["revenue_prev"]) * 100) if r["revenue_prev"] > 0 else (100.0 if r["revenue_cur"] > 0 else 0.0),
                        "gp_wow": float(((r["gp_cur"] - r["gp_prev"]) / r["gp_prev"]) * 100) if r["gp_prev"] > 0 else (100.0 if r["gp_cur"] > 0 else 0.0),
                        "avg_basket_wow": None
                    })
                cross["trend_wow"] = wow_rows
        except Exception as e:
            emit_kpi_debug(self.profile_id, "branch_cross_error", {"error": str(e)})
    return {"params": self._branch_params(), "per_branch": per_branch, "cross_branch": cross}
def _branch_params(self) -> Dict[str, Any]:
    """Echo the thresholds used by branch analytics, coerced to int/float."""
    casts = (
        ("top_k", int),
        ("min_support_baskets", int),
        ("min_lift", float),
        ("cash_variance_threshold_abs", float),
        ("cash_variance_threshold_pct", float),
    )
    return {key: cast(self.params[key]) for key, cast in casts}
| # ------------------------- product leaderboards & concentration ------------------------- | |
def _product_leaderboards(self, g: pd.DataFrame) -> Dict[str, Any]:
    """Rank the per-product aggregate ``g`` several ways and return top-k lists.

    ``g`` must carry ``COL_PRODUCT``, ``revenue``, ``units``, ``gross_profit``,
    ``margin_pct``, ``tx_count``, ``avg_selling_price`` and ``avg_unit_cost``;
    ``units_per_day`` is optional. The margin-% ranking only considers products
    meeting ``min_revenue_for_margin_pct`` and ``min_tx_for_margin_pct``, so a
    single tiny sale cannot top it. Each list holds at most ``top_k`` records.
    """
    k = int(self.params["top_k"])
    min_rev = float(self.params["min_revenue_for_margin_pct"])
    min_tx = int(self.params["min_tx_for_margin_pct"])
    eligible = g[(g["revenue"] >= min_rev) & (g["tx_count"] >= min_tx)]

    def opt_round(value, ndigits):
        # Missing values serialise as None rather than NaN.
        return float(round(value, ndigits)) if pd.notna(value) else None

    def record(row) -> Dict[str, Any]:
        velocity = row.get("units_per_day", np.nan)
        return {
            "product": str(row[self.COL_PRODUCT]),
            "revenue": round(float(row["revenue"]), 2),
            "units": float(row["units"]),
            "gross_profit": round(float(row["gross_profit"]), 2),
            "margin_pct": opt_round(row["margin_pct"], 4),
            "tx_count": int(row["tx_count"]),
            "avg_selling_price": opt_round(row["avg_selling_price"], 4),
            "avg_unit_cost": opt_round(row["avg_unit_cost"], 4),
            "units_per_day": opt_round(velocity, 4),
        }

    def ranked(frame: pd.DataFrame, col: str, ascending: bool = False) -> List[Dict[str, Any]]:
        if frame.empty:
            return []
        picked = frame.sort_values(col, ascending=ascending).head(k)
        return [record(row) for _, row in picked.iterrows()]

    return {
        "top_by_revenue": ranked(g, "revenue"),
        "top_by_units": ranked(g, "units"),
        "top_by_margin_value": ranked(g, "gross_profit"),
        "top_by_margin_pct": ranked(eligible, "margin_pct"),
        "bottom_by_revenue": ranked(g, "revenue", ascending=True),
        "loss_makers": ranked(g[g["gross_profit"] < 0], "gross_profit", ascending=True),
        "by_velocity": ranked(g.assign(units_per_day=g.get("units_per_day", np.nan)), "units_per_day"),
        "by_gp_per_unit": ranked(g.assign(gp_per_unit=np.where(g["units"] > 0, g["gross_profit"] / g["units"], np.nan)), "gp_per_unit"),
    }
def _empty_product_leaderboards(self) -> Dict[str, Any]:
    """Return the product-leaderboard skeleton with every ranking empty.

    Each ranking gets its own fresh list, so callers may mutate one safely.
    """
    rankings = (
        "top_by_revenue",
        "top_by_units",
        "top_by_margin_value",
        "top_by_margin_pct",
        "bottom_by_revenue",
        "loss_makers",
        "by_velocity",
        "by_gp_per_unit",
    )
    return {name: [] for name in rankings}
def _concentration_block(self, g: pd.DataFrame) -> Dict[str, Any]:
    """Measure how concentrated sales are across products.

    Args:
        g: Per-product aggregate with at least ``revenue`` and ``units`` columns.

    Returns:
        The following values, each rounded to 6 decimals; all zeros for an
        empty frame:
        - shares (0-1) of revenue and of units taken by the top five products;
        - the revenue share of the top 20% of products (at least one product);
        - the Gini coefficient of product revenue.
    """
    if g.empty:
        return {
            "revenue_share_top5": 0.0,
            "units_share_top5": 0.0,
            "revenue_pareto_top20pct_share": 0.0,
            "gini_revenue": 0.0
        }

    def share_of(part, whole):
        return (part / whole) if whole > 0 else 0.0

    revenue_total = float(g["revenue"].sum())
    units_total = float(g["units"].sum())
    revenue_desc = g.sort_values("revenue", ascending=False)["revenue"].values
    units_desc = g.sort_values("units", ascending=False)["units"].values
    top5_revenue = share_of(revenue_desc[:5].sum(), revenue_total)
    top5_units = share_of(units_desc[:5].sum(), units_total)

    count = len(revenue_desc)
    if count == 0:
        pareto = 0.0
    else:
        head = max(1, int(np.ceil(0.2 * count)))
        pareto = share_of(revenue_desc[:head].sum(), revenue_total)

    # Gini via the cumulative-sum form over revenues sorted ascending.
    gini = 0.0
    if revenue_total > 0 and count > 0:
        ascending = np.sort(revenue_desc)
        running = np.cumsum(ascending)
        gini = 1.0 - 2.0 * np.sum(running) / (count * np.sum(ascending)) + 1.0 / count

    return {
        "revenue_share_top5": float(round(top5_revenue, 6)),
        "units_share_top5": float(round(top5_units, 6)),
        "revenue_pareto_top20pct_share": float(round(pareto, 6)),
        "gini_revenue": float(round(gini, 6))
    }
| # ------------------------- public API ------------------------- | |
def get_business_intelligence_briefing(self) -> Dict[str, Any]:
    """Assemble the deterministic KPI snapshot for the current vs. previous period.

    Each section is computed independently. A failing section is replaced by
    its empty skeleton and its error message is recorded under
    ``meta.section_errors``, so one bad section never loses the briefing.

    Returns:
        A JSON-safe dict (passed through ``json_safe``). When there is no data
        at all, or none in the current period, a short ``{"Status": ...}`` dict
        is returned instead.
    """
    if self.df.empty:
        emit_kpi_debug(self.profile_id, "briefing", {"status": "no_data"})
        return {"Status": "No sales data available to generate a briefing."}
    current_df, previous_df, tfmeta = self._get_comparison_timeframes()
    if current_df.empty:
        emit_kpi_debug(self.profile_id, "briefing", {"status": "no_current_period_data", **tfmeta})
        return {"Status": f"No sales data for the current period ({tfmeta.get('period_label', 'N/A')}).", "meta": tfmeta}

    snapshot: Dict[str, Any] = {}
    section_errors: Dict[str, str] = {}
    empty_products = pd.DataFrame(columns=["revenue", "units"])

    # Headline. The period label does not depend on the headline maths, so it
    # is reported even if _headline() fails.
    snapshot["Summary Period"] = tfmeta.get("period_label", "This Week vs. Last Week")
    try:
        headline = self._headline(current_df, previous_df)
        snapshot["Performance Snapshot (vs. Prior Period)"] = {
            "Total Revenue": f"{headline['total_revenue_fmt']} ({headline['total_revenue_change']})",
            "Gross Profit": f"{headline['gross_profit_fmt']} ({headline['gross_profit_change']})",
            "Transactions": f"{headline['transactions_value']} ({headline['transactions_change']})",
        }
    except Exception as e:
        section_errors["headline"] = str(e)

    # Current-period bounds, parsed once. Products, inventory and branch
    # analytics need them; a failure used to surface later as NameErrors.
    bounds: Optional[Tuple[pd.Timestamp, pd.Timestamp]] = None
    try:
        bounds = (pd.Timestamp(tfmeta["current_start"]), pd.Timestamp(tfmeta["current_end"]))
    except Exception as e:
        section_errors["timeframes"] = str(e)

    def require_bounds() -> Tuple[pd.Timestamp, pd.Timestamp]:
        if bounds is None:
            raise ValueError("current period bounds unavailable")
        return bounds

    # Basket & affinity. basket_df stays None when the basket table cannot be
    # built; affinity then reports no pairs and customer value gets an empty
    # table.
    basket_df: Optional[pd.DataFrame] = None
    try:
        basket_df = self._build_basket_table(current_df)
        snapshot["Basket Analysis"] = self._basket_kpis(basket_df)
    except Exception as e:
        section_errors["basket"] = str(e)
        snapshot["Basket Analysis"] = {"avg_items_per_basket": "N/A", "avg_gross_profit_per_basket": "N/A", "median_basket_value": "N/A", "basket_size_distribution": {}, "low_sample": True}
    try:
        if basket_df is not None:
            snapshot["Product Affinity"] = self._affinity_pairs(current_df, basket_df)
        else:
            snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}
    except Exception as e:
        section_errors["affinity"] = str(e)
        snapshot["Product Affinity"] = {"params": self._affinity_params(), "top_pairs": []}

    # Temporal
    try:
        snapshot["Temporal Patterns"] = self._temporal_patterns(current_df)
    except Exception as e:
        section_errors["temporal"] = str(e)
        snapshot["Temporal Patterns"] = {"best_hour_by_profit": None, "best_day_by_profit": None, "hourly_series": [], "dow_series": [], "profit_heatmap_7x24": []}

    # Product aggregates + leaderboards + concentration. g_products is reused
    # by the inventory section (empty if aggregation failed).
    g_products = pd.DataFrame()
    try:
        start_cur, end_cur = require_bounds()
        days = max(1.0, (end_cur - start_cur).total_seconds() / 86400.0)
        g_products = self._build_product_aggregates(current_df)
        if not g_products.empty:
            g_products["units_per_day"] = g_products["units"] / days
            product_lb = self._product_leaderboards(g_products)
            concentration = self._concentration_block(g_products)
        else:
            product_lb = self._empty_product_leaderboards()
            concentration = self._concentration_block(empty_products)
        snapshot["Product KPIs"] = {"leaderboards": product_lb, "concentration": concentration}
    except Exception as e:
        section_errors["products"] = str(e)
        snapshot["Product KPIs"] = {"leaderboards": self._empty_product_leaderboards(), "concentration": self._concentration_block(pd.DataFrame(columns=["revenue", "units"]))}

    # Customer value (RFM)
    try:
        bdf = basket_df if basket_df is not None else pd.DataFrame()
        snapshot["Customer Value"] = self._customer_value(current_df, bdf)
    except Exception as e:
        section_errors["customer_value"] = str(e)
        snapshot["Customer Value"] = {
            "params": {"rfm_window_days": int(self.params["rfm_window_days"]), "retention_factor": float(self.params["retention_factor"]), "vip_count": 20},
            "leaderboards": {"top_customers_by_gp": [], "at_risk": [], "new_customers": []},
            "rfm_summary": {"unique_customers": 0, "median_recency_days": None, "median_orders": None, "median_gp": None}
        }

    # Inventory (optional)
    try:
        snapshot["Inventory"] = self._inventory_block(current_df, g_products, require_bounds())
    except Exception as e:
        section_errors["inventory"] = str(e)
        snapshot["Inventory"] = {"status": "no_stock_data", "products": [], "alerts": {"low_stock": [], "stockout_risk": [], "dead_stock": []}}

    # Branch analytics
    try:
        snapshot["Branch Analytics"] = self._per_branch_blocks(current_df, previous_df, require_bounds())
    except Exception as e:
        section_errors["branch"] = str(e)
        snapshot["Branch Analytics"] = {"params": self._branch_params(), "per_branch": {}, "cross_branch": {}}

    # Meta
    snapshot["meta"] = {
        "timeframes": tfmeta,
        "kpi_params": {
            "top_k": int(self.params["top_k"]),
            "min_revenue_for_margin_pct": float(self.params["min_revenue_for_margin_pct"]),
            "min_tx_for_margin_pct": int(self.params["min_tx_for_margin_pct"]),
            "rfm_window_days": int(self.params["rfm_window_days"]),
            "retention_factor": float(self.params["retention_factor"]),
            "min_support_baskets": int(self.params["min_support_baskets"]),
            "min_lift": float(self.params["min_lift"]),
            "blocked_products": list(self.params["blocked_products"]),
            "cash_variance_threshold_abs": float(self.params["cash_variance_threshold_abs"]),
            "cash_variance_threshold_pct": float(self.params["cash_variance_threshold_pct"]),
        },
        "row_counts": {
            "input": int(len(self.raw)),
            "prepared": int(len(self.df)),
            "current_period": int(len(current_df)),
            "previous_period": int(len(previous_df)),
        },
        "notes": [
            "Non-sales transaction types excluded (e.g., Transaction_Type_ID != 21).",
            f"Duplicates dropped: {getattr(self, '_prepared_dupes_dropped', 0)}",
        ],
        "section_errors": section_errors,  # surfaced to the client for the debug panel
    }
    emit_kpi_debug(self.profile_id, "briefing_done", snapshot["meta"])
    return json_safe(snapshot)
def synthesize_fallback_response(self, briefing: dict, user_question: str) -> str:
    """
    Narrate the deterministic ``briefing`` in answer to ``user_question``.

    The LLM only narrates: the briefing (serialised through ``json_safe``) is
    embedded in the prompt as the sole data source, with rules against
    inventing numbers. The reply text is passed through ``sanitize_answer``.
    If anything in that path raises, the raw snapshot is returned instead as a
    fenced JSON block followed by a small diagnostics block.
    """
    try:
        rules = (
            "You are Iris, a concise business analyst.\n"
            "IMPORTANT RULES:\n"
            "• DO NOT invent numbers. ONLY use values present in the Business Data JSON.\n"
            "• If a metric is missing, say 'N/A' or 'no data for this period'.\n"
            "• Harare timezone is the reference for dates/times.\n"
            "• Keep it brief: headings + bullets; no tables unless clearly helpful.\n"
            "• If the user asked something specific (e.g., 'top 5 products'), answer directly from the JSON.\n"
            "• Otherwise, provide a short business briefing (performance, products, timing, customers, branches).\n"
            "• Never expose internal keys; paraphrase labels.\n"
        )
        question_part = f"User Question: {json.dumps(user_question)}\n\n"
        data_part = (
            "Business Data (authoritative; JSON):\n"
            f"{json.dumps(json_safe(briefing), ensure_ascii=False)}\n"
        )
        reply = self.llm.invoke(rules + question_part + data_part)
        content = getattr(reply, "content", None) or str(reply)
        return sanitize_answer(content)
    except Exception as exc:
        diagnostics = {
            "note": "Narrative fallback failed; returning raw snapshot.",
            "error": str(exc)[:200],
            "snapshot_keys": list(briefing.keys()) if isinstance(briefing, dict) else "N/A"
        }
        snapshot_json = json.dumps(json_safe(briefing), indent=2)
        diagnostics_json = json.dumps(diagnostics, indent=2)
        return (
            "### Business Snapshot\n\n```\n" + snapshot_json + "\n```\n\n"
            + "```\n" + diagnostics_json + "\n```"
        )
| # ------------------------- helpers (outside class) ------------------------- | |
def rfm_to_list(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert RFM leaderboard rows into JSON-ready dicts.

    Money fields and recency are rounded to 2 decimals; missing recency or
    average basket value becomes None.
    """
    def maybe_round(value):
        return float(round(value, 2)) if pd.notna(value) else None

    def to_record(row) -> Dict[str, Any]:
        return {
            "customer": str(row["customer"]),
            "gp": float(round(row["gp"], 2)),
            "revenue": float(round(row["revenue"], 2)),
            "orders": int(row["orders"]),
            "recency_days": maybe_round(row["recency_days"]),
            "avg_basket_value": maybe_round(row["avg_basket_value"]),
        }

    return [to_record(row) for _, row in df.iterrows()]
| # ----------------------------------------------------------------------------- | |
| # /chat — PandasAI first, then deterministic fallback | |
| # ----------------------------------------------------------------------------- | |
def bot():
    """Answer a chat question for one business profile.

    Expects a JSON body with ``profile_id`` and ``user_question``. It fetches
    the profile's recent transactions and tries PandasAI first (tier 1). If
    PandasAI raises, or returns something that looks like an error, it falls
    back to the ``IrisReportEngine`` briefing narrated by the LLM (tier 2).
    Always responds with JSON ``{"answer": ...}``; successful answers also carry
    ``meta.source``.
    """
    request_id = str(uuid.uuid4())[:8]
    logger.info(f"[{request_id}] === /chat start ===")
    try:
        # silent=True: a missing/invalid JSON body yields None (-> the
        # "missing fields" reply) instead of raising; non-dict JSON is ignored.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        profile_id = str(payload.get("profile_id") or "").strip()
        user_question = str(payload.get("user_question") or "").strip()
        if not profile_id or not user_question:
            return jsonify({"answer": "Missing 'profile_id' or 'user_question'."})

        API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2"
        try:
            # requests form-encodes `data` itself; quoting the value beforehand
            # double-encoded ids containing reserved characters.
            resp = requests.post(
                API_URL,
                data={"profile_id": profile_id},
                timeout=30
            )
            resp.raise_for_status()
            transactions = (resp.json() or {}).get("transactions") or []
        except Exception as e:
            logger.exception(f"[{request_id}] Transaction API error: {e}")
            return jsonify({"answer": "I couldn’t reach the transactions service. Please try again shortly."})
        if not transactions:
            return jsonify({"answer": "No transaction data was found for this profile."})

        # Tier 1: PandasAI
        try:
            logger.info(f"[{request_id}] PandasAI attempt …")
            df = pd.DataFrame(transactions)
            # NOTE(security): "security": "none" together with whitelisting
            # os/sys/builtins lets LLM-generated code, steered by the untrusted
            # user_question, run with full process access. Review before
            # exposing this endpoint publicly.
            pandas_agent = SmartDataframe(df, config={
                "llm": llm,
                "response_parser": FlaskResponse,
                "security": "none",
                "save_charts_path": user_defined_path,
                "save_charts": False,
                "enable_cache": False,
                "conversational": True,
                "enable_logging": False,
                "custom_whitelisted_dependencies": [
                    "os","io","sys","chr","glob","b64decoder","collections",
                    "builtins","datetime","timedelta","date",
                    "pandas","numpy","math","statistics",
                    "matplotlib","plotly","json","re","warnings"
                ],
            })
            combined_prompt = f"{guardrails_preamble()}\n\n{temporal_hints(user_question)}\n\nQuestion: {user_question}"
            answer = pandas_agent.chat(combined_prompt)
            if looks_like_error(answer):
                logger.warning(f"[{request_id}] PandasAI invalid answer; fallback.")
                raise RuntimeError("PandasAI invalid answer")
            if isinstance(answer, pd.DataFrame):
                return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}})
            if isinstance(answer, plt.Figure):
                buf = io.BytesIO()
                try:
                    answer.savefig(buf, format="png")
                finally:
                    # Release the figure; otherwise pyplot keeps it alive per request.
                    plt.close(answer)
                data_uri = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}"
                return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}})
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}})
        except Exception:
            logger.exception(f"[{request_id}] Tier 1 failed; moving to analyst.")

        # Tier 2: deterministic KPI engine, LLM used for narration only.
        engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm)
        briefing = engine.get_business_intelligence_briefing()
        fallback_answer = engine.synthesize_fallback_response(briefing, user_question)
        return jsonify({"answer": sanitize_answer(fallback_answer), "meta": {"source": "analyst_fallback"}})
    except Exception as e:
        logger.exception(f"[{request_id}] Critical unexpected error in /chat: {e}")
        return jsonify({"answer": "Something went wrong on our side. Please try again."})
| # ----------------------------------------------------------------------------- | |
| # Report / Marketing / Notify — unchanged logic with safer logging | |
| # ----------------------------------------------------------------------------- | |
def busines_report():
    """Generate a markdown business report from the posted ``json_data``.

    Expects a JSON object body with a ``json_data`` field; its ``str()`` form
    is embedded in a prompt sent to ``model.generate_content``.

    Returns:
        200 with the generated report text (a JSON string) on success;
        400 when the body is not a JSON object or ``json_data`` is missing;
        500 with ``error``/``details`` when generation fails.
    """
    logger.info("=== /report ===")
    # silent=True: a missing/invalid JSON body yields None instead of raising,
    # so a client mistake becomes a 400 here rather than a generic 500 below.
    request_json = request.get_json(silent=True)
    json_data = request_json.get("json_data") if isinstance(request_json, dict) else None
    if json_data is None:
        # Without data the model would be asked to analyse the string "None".
        return jsonify({"error": "Missing 'json_data'."}), 400
    try:
        prompt = (
            "You are Quantilytix business analyst. Analyze the following data and generate a "
            "comprehensive and insightful business report, including appropriate key performance "
            "indicators and recommendations. Use markdown formatting and tables where necessary. "
            "Only return the report and nothing else.\nDATA:\n" + str(json_data)
        )
        response = model.generate_content(prompt)
        return jsonify(str(response.text))
    except Exception as e:
        logger.exception("Error in /report")
        return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500
def marketing():
    """Generate a short marketing strategy from the posted ``json_data``.

    Expects a JSON object body with a ``json_data`` field; its ``str()`` form
    is embedded in a prompt sent to ``model.generate_content``.

    Returns:
        200 with the generated strategy text (a JSON string) on success;
        400 when the body is not a JSON object or ``json_data`` is missing;
        500 with ``error``/``details`` when generation fails.
    """
    logger.info("=== /marketing ===")
    # silent=True: a missing/invalid JSON body yields None instead of raising,
    # so a client mistake becomes a 400 here rather than a generic 500 below.
    request_json = request.get_json(silent=True)
    json_data = request_json.get("json_data") if isinstance(request_json, dict) else None
    if json_data is None:
        # Without data the model would be asked to analyse the string "None".
        return jsonify({"error": "Missing 'json_data'."}), 400
    try:
        prompt = (
            "You are a Quantilytix Marketing Specialist. Analyze the following data and generate "
            "a creative, concise marketing strategy. Only return the strategy.\n" + str(json_data)
        )
        response = model.generate_content(prompt)
        return jsonify(str(response.text))
    except Exception as e:
        logger.exception("Error in /marketing")
        return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500
def notifications():
    """Generate brief notification-style analysis and tips from ``json_data``.

    Expects a JSON object body with a ``json_data`` field; its ``str()`` form
    is embedded in a prompt sent to ``model.generate_content``.

    Returns:
        200 with the generated text (a JSON string) on success;
        400 when the body is not a JSON object or ``json_data`` is missing;
        500 with ``error``/``details`` when generation fails.
    """
    logger.info("=== /notify ===")
    # silent=True: a missing/invalid JSON body yields None instead of raising,
    # so a client mistake becomes a 400 here rather than a generic 500 below.
    request_json = request.get_json(silent=True)
    json_data = request_json.get("json_data") if isinstance(request_json, dict) else None
    if json_data is None:
        # Without data the model would be asked to analyse the string "None".
        return jsonify({"error": "Missing 'json_data'."}), 400
    try:
        prompt = (
            "You are a Quantilytix business analyst. Write a very brief analysis and marketing tips "
            "using this business data. Output must be suitable for a notification dashboard.\n" + str(json_data)
        )
        response = model.generate_content(prompt)
        return jsonify(str(response.text))
    except Exception as e:
        logger.exception("Error in /notify")
        return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # ElevenLabs Voice Briefing Endpoints — history summary separated from KPI | |
| # ----------------------------------------------------------------------------- | |
| def _synthesize_history_summary(call_history: List[dict]) -> str: | |
| """Summarize past call transcripts only — never touches KPI data.""" | |
| if not call_history: | |
| return "• New caller — no prior call history." | |
| history_json = json.dumps(json_safe(call_history), indent=2) | |
| analyst_prompt = ( | |
| "You are an executive assistant preparing a pre-call briefing for a voice AI business analyst named Iris.\n" | |
| "ONLY analyze the user's past call history and identify recurring themes or concerns.\n\n" | |
| "USER'S PAST CALL HISTORY (Transcripts):\n" | |
| f"{history_json}\n\n" | |
| "- Output a few bullet points only. No fluff." | |
| ) | |
| try: | |
| response = model.generate_content(analyst_prompt) | |
| summary = (response.text or "").strip() | |
| logger.info(f"Generated conversation history summary ({len(summary)} chars).") | |
| return summary | |
| except Exception as e: | |
| logger.error(f"Failed to generate history summary: {e}") | |
| return "• Could not summarize prior calls." | |
def log_call_usage():
    """Store the transcript of a completed call in Firebase.

    Expects a JSON object body with ``profile_id`` and ``transcript``, plus an
    optional ``durationSeconds``. The record is written to
    ``transcripts/<profile_id>/<call_id>``, the same path that
    ``get_call_briefing`` reads transcripts from.

    Returns:
        200 ``{"status": "success"}`` when stored.
        400 when the body is not a JSON object, a required field is missing,
        or ``profile_id`` contains characters Firebase forbids in keys.
        500 when the Firebase write fails.
    """
    logger.info("=== /api/log-call-usage ===")
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    # Normalize the same way get_call_briefing does (str + strip). Otherwise
    # a transcript stored under e.g. "123 " would never be read back by "123".
    profile_id = str(payload.get("profile_id") or "").strip()
    transcript = payload.get("transcript")
    duration = payload.get("durationSeconds")
    if not profile_id or not transcript:
        return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400
    # Firebase keys cannot contain . $ # [ ] / or ASCII control characters.
    # Reject them up front (400) instead of failing inside the write (500)
    # or nesting the record under an unintended path.
    if any(ch in ".$#[]/" or ord(ch) < 32 or ord(ch) == 127 for ch in profile_id):
        return jsonify({"error": "Invalid 'profile_id'."}), 400
    try:
        # The timestamp prefix keeps keys roughly chronological. The random
        # suffix stops two calls finishing in the same second from
        # overwriting each other, because set() replaces the whole node.
        call_id = f"call_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        transcript_ref = db_ref.child(f"transcripts/{profile_id}/{call_id}")
        transcript_data = {
            "transcript": transcript,
            "profileId": profile_id,
            "durationSeconds": duration,
            "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        }
        transcript_ref.set(json_safe(transcript_data))
        logger.info(f"Stored transcript for profile '{profile_id}'.")
        return jsonify({"status": "success"}), 200
    except Exception as e:
        logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}")
        return jsonify({"error": "A server error occurred while storing the transcript."}), 500
def get_call_briefing():
    """Build the pre-call briefing for a profile.

    Expects a JSON object body with ``profile_id``.

    Returns:
        200 with:
          - memory_summary: bullets summarizing prior call transcripts
          - kpi_snapshot: deterministic KPI dict for the live profile (no LLM
            in the numbers), or a ``{"Status": ...}`` message when
            transaction data is unavailable
        400 when ``profile_id`` is missing.
        500 on an unexpected failure.
    """
    logger.info("=== /api/call-briefing ===")
    # silent=True and the dict check keep malformed bodies on the 400 path
    # instead of raising before any error handling is in place.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    profile_id = str(payload.get("profile_id") or "").strip()
    if not profile_id:
        return jsonify({"error": "Missing 'profile_id'."}), 400
    try:
        # 1) Summarize call history. This is best-effort: if Firebase fails,
        #    the summary is simply built from no history.
        call_history = []
        try:
            transcripts = db_ref.child(f"transcripts/{profile_id}").get()
            if transcripts:
                call_history = list(transcripts.values())
            logger.info(f"Found {len(call_history)} past transcripts for '{profile_id}'.")
        except Exception as e:
            logger.warning(f"Could not fetch transcript history for '{profile_id}': {e}")
        memory_summary = _synthesize_history_summary(call_history)

        # 2) KPI Snapshot from live transactions
        kpi_snapshot: Dict[str, Any] = {"Status": "Could not retrieve business data."}
        API_URL = "https://irisplustech.com/public/api/business/profile/user/get-recent-transactions-v2"
        try:
            # requests form-encodes a `data` dict itself. Pre-quoting the
            # value would double-encode any reserved character in the id.
            resp = requests.post(
                API_URL,
                data={"profile_id": profile_id},
                timeout=20
            )
            resp.raise_for_status()
            transactions = (resp.json() or {}).get("transactions") or []
            if transactions:
                engine = IrisReportEngine(profile_id=profile_id, transactions_data=transactions, llm_instance=llm)
                kpi_snapshot = engine.get_business_intelligence_briefing()
            else:
                kpi_snapshot = {"Status": "No transaction data found for this profile."}
        except Exception as e:
            logger.warning(f"Txn fetch failed for briefing '{profile_id}': {e}")

        return jsonify({
            "memory_summary": memory_summary,
            "kpi_snapshot": json_safe(kpi_snapshot)
        }), 200
    except Exception as e:
        logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}")
        return jsonify({"error": "Failed to generate call briefing."}), 500
| # ----------------------------------------------------------------------------- | |
| # Entrypoint | |
| # ----------------------------------------------------------------------------- | |
if __name__ == "__main__":
    # The Werkzeug interactive debugger can execute arbitrary code, and the
    # server binds to every interface (0.0.0.0). Debug mode is therefore
    # opt-in via FLASK_DEBUG=1/true/yes/on and is no longer hard-coded on.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)