# app.py — Brave Retail Insights (Admin-only holistic analytics; Harare-tz deterministic KPIs)
# - Base URL hardcoded to delta-api.pricelyst.co.zw
# - Admin credentials (email, password) supplied by CLIENT per request (cached per email)
# - Deterministic time windows (Harare); explicit start/end on API calls
# - KPI engine never uses LLM for numbers (LLM is narration-only fallback)
# - JSON-safe snapshot; deep DEBUG logs (optional mirror to Firebase)
# - Drop-in Firebase + AI wiring identical in spirit to prior server
from __future__ import annotations

import os, io, re, json, time, uuid, base64, logging
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from dotenv import load_dotenv

# LLMs
from langchain_google_genai import ChatGoogleGenerativeAI
import google.generativeai as genai

# PandasAI (tier-1 attempt only)
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser

# Firebase
import firebase_admin
from firebase_admin import credentials, db

# -----------------------------------------------------------------------------
# Init
# -----------------------------------------------------------------------------
load_dotenv()
app = Flask(__name__)
CORS(app)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("brave-retail-app")

# -----------------------------------------------------------------------------
# Firebase Initialization (drop-in)
# -----------------------------------------------------------------------------
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    db_ref = db.reference()
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    # FIX: Logger.fatal is a deprecated alias for critical; use critical directly.
    logger.critical(f"FATAL: Firebase init failed: {e}")
    raise

# When "1", every emit_kpi_debug() call is mirrored under kpi_debug/ in Firebase.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"


# -----------------------------------------------------------------------------
# PandasAI ResponseParser (unchanged)
# -----------------------------------------------------------------------------
class FlaskResponse(ResponseParser):
    """Renders PandasAI results into HTML / data-URI strings suitable for JSON responses."""

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """DataFrame result -> HTML table ('' on failure)."""
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        """Figure object or saved-chart path -> base64 PNG data URI."""
        val = result.get("value")
        if hasattr(val, "savefig"):
            buf = io.BytesIO()
            val.savefig(buf, format="png")
            buf.seek(0)
            return f"data:image/png;base64,{base64.b64encode(buf.read()).decode('utf-8')}"
        # FIX: os.path.join(val) with a single argument was a no-op; use val directly.
        if isinstance(val, str) and os.path.isfile(val):
            with open(val, "rb") as f:
                return f"data:image/png;base64,{base64.b64encode(f.read()).decode('utf-8')}"
        return str(val)

    def format_other(self, result):
        return str(result.get("value", ""))


# -----------------------------------------------------------------------------
# LLM init
# -----------------------------------------------------------------------------
logger.info("Initializing models…")
gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")
llm = ChatGoogleGenerativeAI(api_key=gemini_api_key, model="gemini-2.0-flash", temperature=0.1)
genai.configure(api_key=gemini_api_key)
generation_config = {"temperature": 0.2, "top_p": 0.95, "max_output_tokens": 5000}
model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite-001", generation_config=generation_config)
logger.info("AI models initialized.")
# Per-process unique chart export directory for PandasAI.
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info(f"Chart export path set to: {user_defined_path}")

# -----------------------------------------------------------------------------
# Admin API client (client-supplied credentials; holistic admin scope)
# -----------------------------------------------------------------------------
SC_BASE_URL = os.getenv("SC_BASE_URL", "https://delta-api.pricelyst.co.zw").rstrip("/")


class SCAuth:
    """Caches a requests.Session per admin email; supports bearer or cookie sessions."""
    _cache: Dict[str, Dict[str, Any]] = {}

    @classmethod
    def invalidate(cls, email: str) -> None:
        """Drop (and close) the cached session for *email*, if any. Never raises."""
        try:
            entry = cls._cache.pop(email, None)
            if entry and isinstance(entry.get("session"), requests.Session):
                entry["session"].close()
        except Exception:
            pass

    @classmethod
    def _extract_token(cls, js: dict) -> Optional[str]:
        """Best-effort token search across common login-response shapes."""
        if not isinstance(js, dict):
            return None
        candidates = [
            js.get("token"),
            js.get("access_token"),
            (js.get("data") or {}).get("token"),
            (js.get("data") or {}).get("access_token"),
            (js.get("authorization") or {}).get("token"),
            (js.get("auth") or {}).get("token"),
        ]
        for t in candidates:
            if isinstance(t, str) and t.strip():
                return t.strip()
        return None

    @classmethod
    def login(cls, email: str, password: str) -> Dict[str, Any]:
        """POST admin credentials; cache and return a session entry (bearer or cookie auth).

        Raises RuntimeError when neither a token nor a cookie session is obtained.
        """
        s = requests.Session()
        s.headers.update({"Accept": "application/json"})
        url = f"{SC_BASE_URL}/api/auth/admin/login"
        try:
            resp = s.post(url, json={"email": email, "password": password}, timeout=30)
            body_text, body_json = "", {}
            try:
                body_json = resp.json() or {}
            except Exception:
                body_text = (resp.text or "")[:800]
            token = cls._extract_token(body_json)
            if token:
                s.headers.update({"Authorization": f"Bearer {token}"})
                entry = {"session": s, "auth": "bearer", "token": token}
                cls._cache[email] = entry
                logger.debug("Admin login (bearer) OK")
                return entry
            # Some deployments authenticate purely via session cookies.
            if resp.cookies and (resp.status_code // 100) == 2:
                entry = {"session": s, "auth": "cookie"}
                cls._cache[email] = entry
                logger.debug("Admin login (cookie) OK")
                return entry
            snippet = body_text or (str(body_json)[:800])
            raise RuntimeError(f"Login did not return a token or cookie session. HTTP {resp.status_code}. Body≈ {snippet}")
        except Exception:
            # FIX: close the session on any failed login so its sockets are not leaked
            # (on success the session lives on in the cache and is closed by invalidate()).
            s.close()
            raise


def sc_request(method: str, path: str, email: str, password: str, *,
               params: Optional[dict] = None, json_body: Optional[dict] = None, timeout: int = 30):
    """Authenticated request with 401 auto-refresh (once). Logs a compact sample on success.

    Returns the parsed JSON payload when possible, otherwise the raw response text.
    Raises RuntimeError on any non-2xx status (with a body snippet for debugging).
    """
    if not path.startswith("/"):
        path = "/" + path
    url = f"{SC_BASE_URL}{path}"

    def _do(s: requests.Session):
        return s.request(method.upper(), url, params=params, json=json_body, timeout=timeout)

    entry = SCAuth._cache.get(email)
    if not entry:
        entry = SCAuth.login(email, password)
    s = entry["session"]
    resp = _do(s)
    if resp.status_code == 401:
        # Stale token/cookie: re-login exactly once, then retry the request.
        SCAuth.invalidate(email)
        entry = SCAuth.login(email, password)
        s = entry["session"]
        resp = _do(s)
    try:
        resp.raise_for_status()
    except Exception as e:
        snippet = (getattr(resp, "text", "") or "")[:800]
        raise RuntimeError(f"SC request error {method.upper()} {path}: HTTP {resp.status_code} – {snippet}") from e
    payload: Any
    try:
        payload = resp.json()
    except Exception:
        payload = resp.text

    # ---- Compact sample logging for every endpoint ----
    sample = None
    if isinstance(payload, dict):
        d = payload.get("data", payload)
        if isinstance(d, dict):
            # try common array keys
            for key in ("sales_over_time", "orders", "transactions", "items", "list", "rows", "data"):
                v = d.get(key)
                if isinstance(v, list) and v:
                    sample = {key: v[:2]}  # first 2 rows
                    break
            if sample is None:
                # fallback: first 10 keys
                sample = {k: ("[list]" if isinstance(v, list) else v) for k, v in list(d.items())[:10]}
        elif isinstance(d, list):
            sample = d[:2]
    elif isinstance(payload, list):
        sample = payload[:2]
    else:
        sample = str(payload)[:300]
    logger.debug("SAMPLE %s %s -> %s", method.upper(), path, json.dumps(sample, default=str))
    return payload


# -----------------------------------------------------------------------------
# Timezone & temporal helpers
# -----------------------------------------------------------------------------
TZ = os.getenv("APP_TZ", "Africa/Harare")
_TZ = TZ  # backward-compatible alias


def now_harare() -> pd.Timestamp:
    """Current time as a tz-aware pandas Timestamp in the app timezone (Harare by default)."""
    return pd.Timestamp.now(tz=TZ)


def week_bounds_from(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of the local week containing *ts*.

    FIX: the weekday offset is now taken from the TZ-converted timestamp. Previously
    ts.weekday() was evaluated in the caller's timezone, so a timestamp near midnight
    in another zone (e.g. UTC Sunday 23:30 == Harare Monday 01:30) picked the wrong Monday.
    """
    local = ts.tz_convert(TZ)
    monday = local.normalize() - pd.Timedelta(days=local.weekday())
    sunday = monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return monday, sunday


def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (first day 00:00:00, last day 23:59:59) of the month containing *ts*."""
    first_this = ts.normalize().replace(day=1)
    if first_this.month == 12:
        first_next = first_this.replace(year=first_this.year + 1, month=1)
    else:
        first_next = first_this.replace(month=first_this.month + 1)
    last_this = first_next - pd.Timedelta(seconds=1)
    return first_this, last_this


def period_to_bounds(period: str) -> Tuple[pd.Timestamp, pd.Timestamp, str]:
    """Map a period keyword ('today'/'week'/'month'/'year', with 'this_' aliases) to
    deterministic (start, end, label) bounds in Harare time. Unknown values fall back
    to the current week."""
    p = (period or "week").strip().lower()
    now = now_harare()
    if p == "today":
        start = now.normalize()
        end = start + pd.Timedelta(hours=23, minutes=59, seconds=59)
        lbl = "Today"
    elif p in ("week", "this_week"):
        start, end = week_bounds_from(now)
        lbl = "This Week"
    elif p in ("month", "this_month"):
        start, end = this_month_bounds(now)
        lbl = "This Month"
    elif p in ("year", "this_year"):
        start = now.normalize().replace(month=1, day=1, hour=0, minute=0, second=0)
        end = now.normalize().replace(month=12, day=31, hour=23, minute=59, second=59)
        lbl = "This Year"
    else:
        start, end = week_bounds_from(now)
        lbl = "This Week"
    return start, end, lbl


def json_safe(obj: Any) -> Any:
    """Recursively convert numpy/pandas values into plain JSON-serializable Python types.

    Generalized: now also flattens np.ndarray and set/frozenset into lists, and
    json-safes the values inside a pd.Series (to_dict() alone could still leak
    numpy scalars into the output)."""
    if isinstance(obj, (np.integer,)):
        return int(obj)
    if isinstance(obj, (np.floating,)):
        return float(obj)
    if isinstance(obj, (np.bool_,)):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, (pd.Timestamp,)):
        return obj.isoformat()
    if isinstance(obj, (pd.Series,)):
        return {k: json_safe(v) for k, v in obj.to_dict().items()}
    if isinstance(obj, (pd.Index,)):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, (dict,)):
        return {k: json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [json_safe(x) for x in obj]
    return obj


def emit_kpi_debug(profile_key: str, stage: str, payload: Dict[str, Any]) -> None:
    """Log a KPI pipeline stage; optionally mirror the payload into Firebase.

    Best-effort only: any failure is logged as a warning and never propagated."""
    try:
        obj = {"profile": profile_key, "stage": stage, "payload": payload}
        logger.debug("KPI_DEBUG %s", json.dumps(json_safe(obj)))
        if LOG_KPI_TO_FIREBASE:
            ts = int(time.time())
            db_ref.child(f"kpi_debug/{profile_key}/{stage}_{ts}").set(json_safe(payload))
    except Exception as e:
        logger.warning(f"Failed to emit KPI debug logs: {e}")


# -----------------------------------------------------------------------------
# Error detection & sanitization
# -----------------------------------------------------------------------------
# Substrings (lowercase) that mark an LLM/PandasAI answer as a leaked Python error.
ERROR_PATTERNS = ["traceback","exception","keyerror","nameerror","syntaxerror","modulenotfounderror","importerror","execution failed","attributeerror","valueerror:"]


def _stringify(obj) -> str:
    """Coerce an arbitrary answer object to text; DataFrames/Figures map to ''."""
    try:
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            return ""
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode("utf-8", errors="ignore")
        return str(obj)
    except Exception:
        return ""


def _extract_text_like(ans):
    """Pull the most text-like field out of dict / attribute-bearing answers."""
    if isinstance(ans, dict):
        if "value" in ans:
            return _stringify(ans["value"])
        for k in ("message", "text", "content"):
            if k in ans:
                return _stringify(ans[k])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(getattr(ans, "value"))
        except Exception:
            pass
    return _stringify(ans)


def looks_like_error(ans) -> bool:
    """Heuristic: True when an answer is empty or smells like a leaked Python error."""
    if isinstance(ans, (pd.DataFrame, plt.Figure)):
        return False
    s = _extract_text_like(ans).strip().lower()
    if not s:
        return True
    if any(p in s for p in ERROR_PATTERNS):
        return True
    # Classic traceback frame shape: "File ... line ... error".
    if (" file " in s and " line " in s and "error" in s):
        return True
    return False


def sanitize_answer(ans) -> str:
    """Strip code fences and truncate at the first traceback before returning text."""
    s = _extract_text_like(ans)
    s = re.sub(r"```+\w*", "", s or "")
    tb = "Traceback (most recent call last):"
    if tb in s:
        s = s.split(tb, 1)[0]
    return (s or "").strip()


# -----------------------------------------------------------------------------
# Robust normalizers
# -----------------------------------------------------------------------------
def _to_list(x: Any) -> List[Any]:
    """Coerce any payload shape (None, list, dict, JSON string, scalar) to a list."""
    if x is None:
        return []
    if isinstance(x, list):
        return x
    if isinstance(x, dict):
        return [x]
    if isinstance(x, str):
        try:
            parsed = json.loads(x)
            if isinstance(parsed, list):
                return parsed
            if isinstance(parsed, dict):
                return [parsed]
        except Exception:
            return [x]
    return [x]


def _to_float(x: Any) -> Optional[float]:
    """Parse a number that may carry thousands separators; None when unparseable/blank."""
    try:
        if x is None or (isinstance(x, str) and not x.strip()):
            return None
        return float(str(x).replace(",", "").strip())
    except Exception:
        return None


def _to_int(x: Any) -> Optional[int]:
    """Integer variant of _to_float (truncates); None when unparseable."""
    try:
        value = _to_float(x)
        return int(value) if value is not None else None
    except Exception:
        return None


def _coerce_date(s: Any) -> Optional[pd.Timestamp]:
    """Parse *s* into a tz-aware Timestamp in the app timezone; None on failure."""
    if s is None:
        return None
    try:
        dt = pd.to_datetime(s, errors="coerce")
        if pd.isna(dt):
            return None
        try:
            # Naive input: pin it to the app timezone.
            return dt.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT")
        except Exception:
            # Already tz-aware: convert instead.
            return dt.tz_convert(TZ)
    except Exception:
        return None


# -----------------------------------------------------------------------------
# Admin raw transactions extractor (row-level for PandasAI) + sample logging
# -----------------------------------------------------------------------------
def _paginate(sc_get, email, password, path, params=None, page_param="page", per_page=200, max_pages=50):
    """Generic paginator for endpoints with page/per_page/meta"""
    params = dict(params or {})
    params.setdefault(page_param, 1)
    params.setdefault("per_page", per_page)
    page = 1
    for _ in range(max_pages):
        params[page_param] = page
        raw = sc_get("GET", path, email, password, params=params)
        yield raw
        try:
            meta = (raw or {}).get("meta") or {}
            last_page = int(meta.get("last_page") or 0)
            cur = int(meta.get("current_page") or page)
            if last_page and cur >= last_page:
                break
            if not last_page and not raw:
                break
        except Exception:
            break
        page += 1


def _normalize_line(order, item, tz=TZ) -> dict:
    """Flatten one (order, line-item) pair into a flat dict of row-level fields.

    Field names are probed across several aliases since the upstream API shape
    varies per endpoint (see fetch_transactions_df candidates)."""
    # First matching key wins; returns default when none present.
    pick = lambda o, *ks, default=None: next((o[k] for k in ks if isinstance(o, dict) and k in o), default)
    as_float = lambda x: _to_float(x) or 0.0
    as_int = lambda x: _to_int(x) or 0

    order_id = pick(order, "id", "order_id", "uuid", "reference")
    created_at = pick(order, "created_at", "date", "ordered_at", "timestamp")
    customer = pick(order, "customer_name", "customer", "buyer_name", "customer_reference")
    payment = pick(order, "payment_method", "payment", "money_type")
    branch = pick(order, "shop_name", "shop", "branch", "store")
    status = pick(order, "status")
    currency = pick(order, "currency")

    prod_id = pick(item, "product_id", "item_id", "sku_id", "id")
    prod_name = pick(item, "product_name", "name", "title", "sku")
    qty = as_int(pick(item, "quantity", "qty", "units"))
    unit_price = as_float(pick(item, "unit_price", "price", "unitPrice"))
    line_total = as_float(pick(item, "line_total", "total", "amount", "revenue"))
    cost_price = _to_float(pick(item, "unit_cost", "cost_price", "cost"))  # optional

    when = _coerce_date(created_at)
    # Missing/zero line totals fall back to qty * unit price.
    revenue = line_total if line_total else (qty * unit_price)
    gp = None
    if cost_price is not None:
        gp = float(revenue - qty * (cost_price or 0.0))

    return {
        "order_id": order_id,
        "datetime": when,
        "date": when.tz_convert(tz).date().isoformat() if when is not None else None,
        "customer": customer,
        "payment_method": payment,
        "branch": branch,
        "status": status,
        "currency": currency,
        "product_id": prod_id,
        "product": prod_name,
        "quantity": qty,
        "unit_price": unit_price,
        "line_total": revenue,
        "unit_cost": float(cost_price) if cost_price is not None else None,
        "gross_profit": float(gp) if gp is not None else None,
    }


def fetch_transactions_df(email: str, password: str, t_start: pd.Timestamp, t_end: pd.Timestamp) -> pd.DataFrame:
    """
    Pull row-level order lines. Tries multiple likely endpoints, logs a sample for each,
    flattens nested items, returns a clean DataFrame suitable for PandasAI.
    """
    CANDIDATES: Tuple[Tuple[str, str, str], ...] = (
        ("/api/analytics/orders", "orders", "items"),
        ("/api/orders", "data", "items"),  # many APIs wrap orders under "data"
        ("/api/analytics/transactions", "transactions", "items"),
        ("/api/sales/transactions", "transactions", "lines"),
    )
    params = {
        "start_date": t_start.strftime("%Y-%m-%d"),
        "end_date": t_end.strftime("%Y-%m-%d"),
        "include": "items",
        "per_page": 200,
    }
    rows: List[dict] = []
    for path, orders_key, items_key in CANDIDATES:
        try:
            # Non-paginated attempt
            raw = sc_request("GET", path, email, password, params=params)
            # Log a sharper sample for this endpoint (top-level)
            logger.debug("TXN_PROBE_RAW %s -> keys=%s", path,
                         list(raw.keys())[:10] if isinstance(raw, dict) else type(raw))
            payload = raw.get("data") if isinstance(raw, dict) and isinstance(raw.get("data"), (dict, list)) else raw
            orders = payload.get(orders_key) if isinstance(payload, dict) else payload
            if orders:
                orders_list = _to_list(orders)
                if orders_list:
                    # sample one order + items
                    first_order = orders_list[0] if isinstance(orders_list[0], dict) else {}
                    first_items = _to_list((first_order or {}).get(items_key))
                    logger.debug("TXN_SAMPLE %s -> order_keys=%s; first_item_keys=%s", path,
                                 list(first_order.keys())[:15] if isinstance(first_order, dict) else type(first_order),
                                 (list(first_items[0].keys())[:15] if first_items and isinstance(first_items[0], dict) else "N/A"))
                for o in orders_list:
                    for it in _to_list((o or {}).get(items_key)):
                        if isinstance(o, dict) and isinstance(it, dict):
                            rows.append(_normalize_line(o, it))
                if rows:
                    break
            # Try paginated shape
            collected = 0
            for page_raw in _paginate(sc_request, email, password, path, params=params):
                logger.debug("TXN_PAGE %s meta=%s", path,
                             (page_raw or {}).get("meta") if isinstance(page_raw, dict) else "N/A")
                page_data = page_raw.get("data") if isinstance(page_raw, dict) and isinstance(page_raw.get("data"), (dict, list)) else page_raw
                page_orders = page_data.get(orders_key) if isinstance(page_data, dict) else page_data
                for o in _to_list(page_orders):
                    for it in _to_list((o or {}).get(items_key)):
                        if isinstance(o, dict) and isinstance(it, dict):
                            rows.append(_normalize_line(o, it))
                            collected += 1
                if collected and collected >= 5000:  # safety cap
                    break
            if rows:
                # Log a compact sample of flattened rows
                logger.debug("TXN_FLAT_SAMPLE %s -> %s", path, json.dumps(rows[:2], default=str))
                break
        except Exception as e:
            logger.debug(f"fetch_transactions_df: {path} probe failed: {e}")

    if not rows:
        logger.warning("No row-level endpoint found; returning an empty transactions frame (schema only).")
        schema = {
            "datetime": pd.Series(dtype="datetime64[ns]"),
            "date": pd.Series(dtype="object"),
            "order_id": pd.Series(dtype="object"),
            "status": pd.Series(dtype="object"),
            "customer": pd.Series(dtype="object"),
            "branch": pd.Series(dtype="object"),
            "payment_method": pd.Series(dtype="object"),
            "currency": pd.Series(dtype="object"),
            "product_id": pd.Series(dtype="object"),
            "product": pd.Series(dtype="object"),
            "quantity": pd.Series(dtype="float"),
            "unit_price": pd.Series(dtype="float"),
            "line_total": pd.Series(dtype="float"),
            "unit_cost": pd.Series(dtype="float"),
            "gross_profit": pd.Series(dtype="float"),
        }
        return pd.DataFrame(schema)

    df = pd.DataFrame(rows)
    df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
    try:
        # Keep tz-naive for some plotting libs but deterministic in Harare
        df["datetime"] = df["datetime"].dt.tz_convert(TZ).dt.tz_localize(None)
    except Exception:
        pass
    for c in ("quantity", "unit_price", "line_total", "unit_cost", "gross_profit"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    cols = [
        "datetime", "date", "order_id", "status", "customer", "branch", "payment_method",
        "currency", "product_id", "product", "quantity", "unit_price", "line_total",
        "unit_cost", "gross_profit",
    ]
    df = df[[c for c in cols if c in df.columns]]
    logger.debug("TXN_DF_COLUMNS %s", df.columns.tolist())
    logger.debug("TXN_DF_HEAD %s", json.dumps(df.head(3).to_dict(orient="records"), default=str))
    return df


# -----------------------------------------------------------------------------
# Admin KPI Engine
(holistic view) — logs sample after each endpoint # ----------------------------------------------------------------------------- class AdminAnalyticsEngine: """Single-tenant holistic admin analytics. No shop/brand filters; admin sees entire dataset.""" def __init__(self, tenant_key: str, email: str, password: str, period: str = "week"): self.tenant_key = (tenant_key or "admin").strip() self.email = (email or "").strip() self.password = (password or "").strip() self.period = (period or "week").lower().strip() self.t_start, self.t_end, self.period_label = period_to_bounds(self.period) @staticmethod def _unwrap_data(payload: dict) -> dict: if isinstance(payload, dict): return payload.get("data") if isinstance(payload.get("data"), dict) else payload return {} def _dashboard(self) -> dict: raw = sc_request("GET", "/api/analytics/dashboard", self.email, self.password, params={"period": self.period}) data = self._unwrap_data(raw) emit_kpi_debug(self.tenant_key, "dashboard", data or raw or {}) # Log a friendly sample view: logger.debug("SAMPLE /api/analytics/dashboard -> %s", json.dumps({k: data.get(k) for k in list(data.keys())[:10]}, default=str)) return data or {} def _sales_series(self) -> pd.DataFrame: params = { "start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d"), "group_by": "day", } raw = sc_request("GET", "/api/analytics/sales", self.email, self.password, params=params) data = {} if isinstance(raw, dict): data = (raw.get("data") or raw) if isinstance(raw.get("data"), (dict, list)) else raw else: try: j = json.loads(raw) data = j.get("data", j) if isinstance(j, dict) else {} except Exception: data = {} # log samples from top-level keys we expect try: so = data.get("sales_over_time") pm = data.get("sales_by_payment_method") cat = data.get("sales_by_category") logger.debug("SAMPLE /api/analytics/sales -> sales_over_time[:2]=%s; sales_by_payment_method[:2]=%s; sales_by_category[:2]=%s", json.dumps((so or [])[:2]), 
json.dumps((pm or [])[:2]), json.dumps((cat or [])[:2])) except Exception: pass series = [] for r in _to_list(data.get("sales_over_time")): if not isinstance(r, dict): continue date_str = r.get("date") or r.get("day") or r.get("period") dt = _coerce_date(date_str) if dt is None: continue total_sales = _to_float(r.get("total_sales") or r.get("total") or r.get("revenue")) total_orders = _to_int(r.get("total_orders") or r.get("orders") or r.get("count")) aov = _to_float(r.get("average_order_value") or r.get("aov")) if aov is None and total_sales is not None and (total_orders or 0) > 0: aov = float(total_sales) / int(total_orders) series.append({ "_date": dt, "total_sales": float(total_sales) if total_sales is not None else 0.0, "total_orders": int(total_orders) if total_orders is not None else 0, "aov": float(aov) if aov is not None else None, }) df = pd.DataFrame(series) if df.empty: return pd.DataFrame(columns=["_date", "total_sales", "total_orders", "aov"]) df = df.sort_values("_date").reset_index(drop=True) emit_kpi_debug(self.tenant_key, "sales_series_raw", (raw if isinstance(raw, dict) else {"raw": raw})) logger.debug("SAMPLE sales_series_df.head -> %s", json.dumps(df.head(3).to_dict(orient="records"), default=str)) return df def transactions_df(self) -> pd.DataFrame: df = fetch_transactions_df(self.email, self.password, self.t_start, self.t_end) emit_kpi_debug(self.tenant_key, "transactions_df_meta", { "rows": int(len(df)), "cols": list(df.columns), "period": {"start": self.t_start.isoformat(), "end": self.t_end.isoformat()} }) # already logged columns + head in fetch_transactions_df() return df def _products(self) -> dict: raw = sc_request( "GET", "/api/analytics/products", self.email, self.password, params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} ) data = self._unwrap_data(raw) emit_kpi_debug(self.tenant_key, "products", data or raw or {}) # log sample leaderboards if present keys = 
["top_by_revenue","top_by_units","top_by_margin_value","top_by_margin_pct","bottom_by_revenue","loss_makers"] sample = {k: (data.get(k) or [])[:2] for k in keys if isinstance(data.get(k), list)} logger.debug("SAMPLE /api/analytics/products -> %s", json.dumps(sample)) return data or {} def _customers(self) -> dict: raw = sc_request( "GET", "/api/analytics/customers", self.email, self.password, params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} ) data = self._unwrap_data(raw) emit_kpi_debug(self.tenant_key, "customers", data or raw or {}) # sample common shapes sample = { "top_customers_by_gp": (data.get("top_customers_by_gp") or [])[:2], "at_risk": (data.get("at_risk") or [])[:2], "new_customers": (data.get("new_customers") or [])[:2], "summary": data.get("summary"), } logger.debug("SAMPLE /api/analytics/customers -> %s", json.dumps(sample)) return data or {} def _inventory(self) -> dict: raw = sc_request("GET", "/api/analytics/inventory", self.email, self.password) data = self._unwrap_data(raw) emit_kpi_debug(self.tenant_key, "inventory", data or raw or {}) try: items = data.get("products") or data.get("items") or data.get("snapshot") or [] logger.debug("SAMPLE /api/analytics/inventory -> %s", json.dumps((items or [])[:2], default=str)) except Exception: pass return data or {} def _comparisons(self) -> dict: raw = sc_request( "GET", "/api/analytics/comparisons", self.email, self.password, params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} ) data = self._unwrap_data(raw) emit_kpi_debug(self.tenant_key, "comparisons", data or raw or {}) try: logger.debug("SAMPLE /api/analytics/comparisons -> keys=%s", list(data.keys())[:15]) except Exception: pass return data or {} # -------------------- deterministic snapshot -------------------- def build_snapshot(self) -> Dict[str, Any]: dash = self._dashboard() sales_df = self._sales_series() prods = self._products() custs = 
self._customers() inv = self._inventory() comps = self._comparisons() def _get_num(d: dict, *keys, default=0.0): for k in keys: v = d.get(k) if isinstance(v, (int, float, str)): try: return float(v) except Exception: continue return default total_revenue = _get_num(dash, "total_revenue", "revenue", default=0.0) gross_profit = _get_num(dash, "gross_profit", "gp", default=0.0) transactions = int(_get_num(dash, "transactions", "orders", default=0.0)) if (total_revenue == 0.0 or transactions == 0) and isinstance(sales_df, pd.DataFrame) and not sales_df.empty: total_revenue = float(sales_df["total_sales"].sum()) transactions = int(sales_df["total_orders"].sum()) product_lb = { "top_by_revenue": prods.get("top_by_revenue") or prods.get("topRevenue") or [], "top_by_units": prods.get("top_by_units") or prods.get("topUnits") or [], "top_by_margin_value": prods.get("top_by_margin_value") or prods.get("topByGP") or [], "top_by_margin_pct": prods.get("top_by_margin_pct") or [], "bottom_by_revenue": prods.get("bottom_by_revenue") or prods.get("bottomRevenue") or [], "loss_makers": prods.get("loss_makers") or [], } customer_value = { "leaderboards": { "top_customers_by_gp": custs.get("top_customers_by_gp") or custs.get("topByGP") or [], "at_risk": custs.get("at_risk", []), "new_customers": custs.get("new_customers", []), }, "rfm_summary": custs.get("summary", {}), "params": {"window": self.period_label}, } temporal = self._temporal_patterns_from_sales(sales_df) inventory_block = { "status": "ok" if inv else "no_stock_data", "alerts": inv.get("alerts") if isinstance(inv, dict) else {}, "snapshot": inv, } snapshot = { "Summary Period": f"{self.period_label} ({self.t_start.date()} to {self.t_end.date()})", "Performance Snapshot": { "Total Revenue": round(total_revenue, 2), "Gross Profit": round(gross_profit, 2), "Transactions": transactions, "Change": { "revenue": dash.get("revenue_change") or dash.get("total_revenue_change"), "gross_profit": dash.get("gross_profit_change") or 
dash.get("gp_change"), "transactions": dash.get("transactions_change") or dash.get("orders_change"), }, }, "Temporal Patterns": temporal, "Product KPIs": {"leaderboards": product_lb}, "Customer Value": customer_value, "Inventory": inventory_block, "Comparisons": comps if isinstance(comps, dict) else {"data": comps}, "meta": { "timeframes": { "current_start": self.t_start.isoformat(), "current_end": self.t_end.isoformat(), "period_label": self.period_label, }, "row_counts": { "sales_points": int(len(sales_df)) if isinstance(sales_df, pd.DataFrame) else 0 }, }, } emit_kpi_debug(self.tenant_key, "snapshot_done", snapshot["meta"]) return json_safe(snapshot) def _temporal_patterns_from_sales(self, df: pd.DataFrame) -> Dict[str, Any]: if df is None or df.empty: return {"series": [], "best_day_by_sales": None} d = df.copy() d["dow"] = d["_date"].dt.day_name() d["date"] = d["_date"].dt.strftime("%Y-%m-%d") g = d.groupby("dow", dropna=False).agg( total_sales=("total_sales", "sum"), total_orders=("total_orders", "sum"), ).reset_index() best_row = None if g.empty else g.loc[g["total_sales"].idxmax()] best_day = None if g.empty else { "day": str(best_row["dow"]), "total_sales": float(best_row["total_sales"]), "total_orders": int(best_row["total_orders"]), } series = d[["date", "total_sales", "total_orders", "aov"]].to_dict(orient="records") return {"series": series, "best_day_by_sales": best_day} def narrate(self, snapshot: dict, user_question: str) -> str: try: prompt = ( "You are a concise business analyst for Brave Retail Insights.\n" "RULES: Do NOT invent numbers; only use values in the JSON. Harare timezone. 
Keep it brief.\n" f"User Question: {json.dumps(user_question)}\n\n" f"Business Data JSON:\n{json.dumps(json_safe(snapshot), ensure_ascii=False)}\n" ) resp = llm.invoke(prompt) text = getattr(resp, "content", None) or str(resp) return sanitize_answer(text) except Exception: return "### Business Snapshot\n\n```\n" + json.dumps(json_safe(snapshot), indent=2) + "\n```" # ----------------------------------------------------------------------------- # /chat — PandasAI first on sales series, else deterministic snapshot + narration # ----------------------------------------------------------------------------- @app.route("/chat", methods=["POST"]) @cross_origin() def chat(): rid = str(uuid.uuid4())[:8] logger.info(f"[{rid}] === /chat start ===") try: payload = request.get_json() or {} tenant_key = str(payload.get("tenant_key") or "admin") user_question = (payload.get("user_question") or "").strip() period = (payload.get("period") or "week").strip().lower() email = payload.get("email") password = payload.get("password") if not user_question: return jsonify({"answer": "Missing 'user_question'."}) if not email or not password: return jsonify({"error": "Missing 'email' or 'password'."}), 400 engine = AdminAnalyticsEngine(tenant_key, email, password, period) # Build transactions_df now and place it in meta logs (useful for PandasAI later) tdf = engine.transactions_df() # For simple Q&A we still start with sales_df (fast + stable) sales_df = engine._sales_series() if sales_df.empty and tdf.empty: snapshot = engine.build_snapshot() answer = engine.narrate(snapshot, user_question) return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}}) try: logger.info(f"[{rid}] PandasAI attempt …") # If the question references products/items explicitly, switch to transactions_df use_df = tdf if re.search(r"\b(product|sku|item|category|top\s*5|top\s*ten|by\s*revenue|by\s*units)\b", user_question, re.I) and not tdf.empty else sales_df pandas_agent = 
SmartDataframe(use_df, config={ "llm": llm, "response_parser": FlaskResponse, "security": "none", "save_charts_path": user_defined_path, "save_charts": False, "enable_cache": False, "conversational": True, "enable_logging": False, }) combined_prompt = ( "Rules:\n" "1) Use pd.Timestamp.now(tz='Africa/Harare') for any now().\n" "2) Do NOT assume future dates; only use provided DataFrame columns.\n" "3) For monthly, derive via dt.to_period('M').\n" f"Question: {user_question}" ) answer = pandas_agent.chat(combined_prompt) if looks_like_error(answer): logger.warning(f"[{rid}] PandasAI invalid answer; fallback.") raise RuntimeError("PandasAI invalid answer") if isinstance(answer, pd.DataFrame): return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}}) if isinstance(answer, plt.Figure): buf = io.BytesIO() answer.savefig(buf, format="png") data_uri = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}" return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}}) return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}}) except Exception: snapshot = engine.build_snapshot() answer = engine.narrate(snapshot, user_question) return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}}) except Exception as e: logger.exception(f"[{rid}] Critical unexpected error in /chat: {e}") return jsonify({"answer": "Something went wrong on our side. 
Please try again."}) # ----------------------------------------------------------------------------- # /report, /marketing, /notify — feed snapshot (admin holistic) # ----------------------------------------------------------------------------- @app.route("/report", methods=["POST"]) @cross_origin() def report(): logger.info("=== /report ===") try: payload = request.get_json() or {} tenant_key = str(payload.get("tenant_key") or "admin") period = (payload.get("period") or "week").strip().lower() email = payload.get("email"); password = payload.get("password") if not email or not password: return jsonify({"error": "Missing 'email' or 'password'."}), 400 engine = AdminAnalyticsEngine(tenant_key, email, password, period) snapshot = engine.build_snapshot() prompt = ( "You are a Brave Retail Insights business analyst. Analyze the following data and generate a " "succinct, insight-rich admin report with KPIs and recommendations. Use markdown only.\n" + json.dumps(json_safe(snapshot)) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /report") return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500 @app.route("/marketing", methods=["POST"]) @cross_origin() def marketing(): logger.info("=== /marketing ===") try: payload = request.get_json() or {} tenant_key = str(payload.get("tenant_key") or "admin") period = (payload.get("period") or "week").strip().lower() email = payload.get("email"); password = payload.get("password") if not email or not password: return jsonify({"error": "Missing 'email' or 'password'."}), 400 engine = AdminAnalyticsEngine(tenant_key, email, password, period) snapshot = engine.build_snapshot() prompt = ( "You are a Brave Retail Insights Marketing Specialist. Analyze the JSON and produce a concise, " "practical strategy (audiences, promos, timing). 
Only return the strategy.\n" + json.dumps(json_safe(snapshot)) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /marketing") return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500 @app.route("/notify", methods=["POST"]) @cross_origin() def notify(): logger.info("=== /notify ===") try: payload = request.get_json() or {} tenant_key = str(payload.get("tenant_key") or "admin") period = (payload.get("period") or "week").strip().lower() email = payload.get("email"); password = payload.get("password") if not email or not password: return jsonify({"error": "Missing 'email' or 'password'."}), 400 engine = AdminAnalyticsEngine(tenant_key, email, password, period) snapshot = engine.build_snapshot() prompt = ( "You are a Brave Retail Insights business analyst. Write up to 6 short bullets with actionable tips " "for an admin notification panel using this JSON.\n" + json.dumps(json_safe(snapshot)) ) response = model.generate_content(prompt) return jsonify(str(response.text)) except Exception as e: logger.exception("Error in /notify") return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500 # ----------------------------------------------------------------------------- # Voice briefing endpoints (history in Firebase; KPIs from admin snapshot) # ----------------------------------------------------------------------------- def _synthesize_history_summary(call_history: List[dict]) -> str: if not call_history: return "• New caller — no prior call history." history_json = json.dumps(json_safe(call_history), indent=2) analyst_prompt = ( "You are an executive assistant preparing a pre-call briefing for Brave Retail Insights. " "Only analyze the user's past call history and summarize recurring themes.\n\n" f"{history_json}\n\n- Output a few bullets only." 
) try: response = model.generate_content(analyst_prompt) return (response.text or "").strip() or "• (empty)" except Exception: return "• Could not summarize prior calls." @app.route("/api/log-call-usage", methods=["POST"]) @cross_origin() def log_call_usage(): payload = request.get_json() or {} profile_id = payload.get("profile_id") transcript = payload.get("transcript") duration = payload.get("durationSeconds") if not profile_id or not transcript: return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400 try: call_id = f"call_{int(time.time())}" ref = db_ref.child(f"transcripts/{profile_id}/{call_id}") ref.set(json_safe({ "transcript": transcript, "profileId": profile_id, "durationSeconds": duration, "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) })) return jsonify({"status": "success"}), 200 except Exception as e: logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}") return jsonify({"error": "Server error while storing the transcript."}), 500 @app.route("/api/call-briefing", methods=["POST"]) @cross_origin() def get_call_briefing(): payload = request.get_json() or {} profile_id = str((payload.get("profile_id") or "").strip()) period = (payload.get("period") or "week").strip().lower() email = payload.get("email") password = payload.get("password") if not profile_id: return jsonify({"error": "Missing 'profile_id'."}), 400 if not email or not password: return jsonify({"error": "Missing 'email' or 'password'."}), 400 try: call_history = [] try: transcripts = db_ref.child(f"transcripts/{profile_id}").get() if transcripts: call_history = list(transcripts.values()) except Exception as e: logger.warning(f"Transcript fetch failed for '{profile_id}': {e}") memory_summary = _synthesize_history_summary(call_history) engine = AdminAnalyticsEngine(profile_id or "admin", email, password, period) kpi_snapshot = engine.build_snapshot() return jsonify({"memory_summary": memory_summary, "kpi_snapshot": 
json_safe(kpi_snapshot)}), 200 except Exception as e: logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}") return jsonify({"error": "Failed to generate call briefing."}), 500 # ----------------------------------------------------------------------------- # Entrypoint # ----------------------------------------------------------------------------- if __name__ == "__main__": # Do NOT use debug=True in production. app.run(debug=True, host="0.0.0.0", port=7860)