Spaces:
Sleeping
Sleeping
| # app.py — Brave Retail Insights (Admin-only holistic analytics; Harare-tz deterministic KPIs) | |
| # - Base URL hardcoded to delta-api.pricelyst.co.zw | |
| # - Admin credentials (email, password) supplied by CLIENT per request (cached per email) | |
| # - Deterministic time windows (Harare); explicit start/end on API calls | |
| # - KPI engine never uses LLM for numbers (LLM is narration-only fallback) | |
| # - JSON-safe snapshot; deep DEBUG logs (optional mirror to Firebase) | |
| # - Drop-in Firebase + AI wiring identical in spirit to prior server | |
| from __future__ import annotations | |
| import os, io, re, json, time, uuid, base64, logging | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import requests | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS, cross_origin | |
| from dotenv import load_dotenv | |
| # LLMs | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| import google.generativeai as genai | |
| # PandasAI (tier-1 attempt only) | |
| from pandasai import SmartDataframe | |
| from pandasai.responses.response_parser import ResponseParser | |
| # Firebase | |
| import firebase_admin | |
| from firebase_admin import credentials, db | |
| # ----------------------------------------------------------------------------- | |
| # Init | |
| # ----------------------------------------------------------------------------- | |
# App bootstrap: read .env, create the Flask app with permissive CORS,
# and turn on process-wide DEBUG logging (this server logs samples heavily).
load_dotenv()
app = Flask(__name__)
CORS(app)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
# Module-level logger used by every helper below.
logger = logging.getLogger("brave-retail-app")
| # ----------------------------------------------------------------------------- | |
| # Firebase Initialization (drop-in) | |
| # ----------------------------------------------------------------------------- | |
# Firebase Admin SDK bootstrap.
# - Service-account JSON is passed whole in the FIREBASE env var.
# - Realtime Database URL comes from the Firebase_DB env var.
# Any failure here is fatal: the exception is logged and re-raised so the
# process does not start without Firebase.
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    # Root DB reference; used by emit_kpi_debug() for optional KPI mirroring.
    db_ref = db.reference()
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    logger.fatal(f"FATAL: Firebase init failed: {e}")
    raise
# When set to "1", emit_kpi_debug() also mirrors KPI payloads into Firebase.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"
| # ----------------------------------------------------------------------------- | |
| # PandasAI ResponseParser (unchanged) | |
| # ----------------------------------------------------------------------------- | |
class FlaskResponse(ResponseParser):
    """PandasAI response parser that renders results for HTTP transport.

    DataFrames become HTML tables, plots become base64 PNG data-URIs, and
    everything else is stringified.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Render result["value"] (a DataFrame) as an HTML table; "" on failure."""
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        """Return the plot as a base64 PNG data-URI.

        Accepts either a live figure object (anything with .savefig) or a
        filesystem path to an already-saved image; anything else falls back
        to str().
        """
        val = result.get("value")
        if hasattr(val, "savefig"):
            buf = io.BytesIO()
            val.savefig(buf, format="png")
            buf.seek(0)
            return f"data:image/png;base64,{base64.b64encode(buf.read()).decode('utf-8')}"
        # Fix: removed the pointless single-argument os.path.join(val) calls
        # (os.path.join with one argument is the identity).
        if isinstance(val, str) and os.path.isfile(val):
            with open(val, "rb") as f:
                return f"data:image/png;base64,{base64.b64encode(f.read()).decode('utf-8')}"
        return str(val)

    def format_other(self, result):
        """Stringify any non-dataframe, non-plot result ("" when value missing)."""
        return str(result.get("value", ""))
| # ----------------------------------------------------------------------------- | |
| # LLM init | |
| # ----------------------------------------------------------------------------- | |
logger.info("Initializing models…")
# NOTE: the Gemini API key is read from the non-standard env var name "Gemini".
gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")
# LangChain chat model — used by AdminAnalyticsEngine.narrate() (narration only,
# never for computing KPI numbers).
llm = ChatGoogleGenerativeAI(api_key=gemini_api_key, model="gemini-2.0-flash", temperature=0.1)
genai.configure(api_key=gemini_api_key)
generation_config = {"temperature": 0.2, "top_p": 0.95, "max_output_tokens": 5000}
# Raw google-generativeai model handle (lighter model variant).
model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite-001", generation_config=generation_config)
logger.info("AI models initialized.")
# Unique per-process directory for PandasAI chart exports.
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info(f"Chart export path set to: {user_defined_path}")
| # ----------------------------------------------------------------------------- | |
| # Admin API client (client-supplied credentials; holistic admin scope) | |
| # ----------------------------------------------------------------------------- | |
# Base API URL; trailing slash stripped so path joins stay predictable.
SC_BASE_URL = os.getenv("SC_BASE_URL", "https://delta-api.pricelyst.co.zw").rstrip("/")


class SCAuth:
    """Caches a requests.Session per admin email; supports bearer or cookie sessions."""

    # email -> {"session": requests.Session, "auth": "bearer"|"cookie", "token": ...}
    _cache: Dict[str, Dict[str, Any]] = {}

    # Fix: all three methods were written with a `cls` first parameter but had
    # no @classmethod decorator, so call sites like SCAuth.login(email, password)
    # bound `email` to `cls` (and sc_request's retry path raised TypeError).

    @classmethod
    def invalidate(cls, email: str) -> None:
        """Drop and close any cached session for `email` (best-effort, never raises)."""
        try:
            entry = cls._cache.pop(email, None)
            if entry and isinstance(entry.get("session"), requests.Session):
                entry["session"].close()
        except Exception:
            pass

    @classmethod
    def _extract_token(cls, js: dict) -> Optional[str]:
        """Probe common response shapes for an auth token; None when absent/blank."""
        if not isinstance(js, dict):
            return None
        candidates = [
            js.get("token"),
            js.get("access_token"),
            (js.get("data") or {}).get("token"),
            (js.get("data") or {}).get("access_token"),
            (js.get("authorization") or {}).get("token"),
            (js.get("auth") or {}).get("token"),
        ]
        for t in candidates:
            if isinstance(t, str) and t.strip():
                return t.strip()
        return None

    @classmethod
    def login(cls, email: str, password: str) -> Dict[str, Any]:
        """Log in as admin and cache the session.

        Prefers a bearer token from the JSON body; falls back to a cookie
        session when the response is 2xx and set cookies. Raises RuntimeError
        when neither is available.
        """
        s = requests.Session()
        s.headers.update({"Accept": "application/json"})
        url = f"{SC_BASE_URL}/api/auth/admin/login"
        resp = s.post(url, json={"email": email, "password": password}, timeout=30)
        body_text, body_json = "", {}
        try:
            body_json = resp.json() or {}
        except Exception:
            body_text = (resp.text or "")[:800]
        token = cls._extract_token(body_json)
        if token:
            s.headers.update({"Authorization": f"Bearer {token}"})
            entry = {"session": s, "auth": "bearer", "token": token}
            cls._cache[email] = entry
            logger.debug("Admin login (bearer) OK")
            return entry
        if resp.cookies and (resp.status_code // 100) == 2:
            entry = {"session": s, "auth": "cookie"}
            cls._cache[email] = entry
            logger.debug("Admin login (cookie) OK")
            return entry
        snippet = body_text or (str(body_json)[:800])
        raise RuntimeError(f"Login did not return a token or cookie session. HTTP {resp.status_code}. Body≈ {snippet}")
def sc_request(method: str, path: str, email: str, password: str, *,
               params: dict = None, json_body: dict = None, timeout: int = 30):
    """Authenticated request with 401 auto-refresh (once). Logs a compact sample on success."""
    if not path.startswith("/"):
        path = "/" + path
    url = f"{SC_BASE_URL}{path}"
    verb = method.upper()

    def _send(session: requests.Session):
        return session.request(verb, url, params=params, json=json_body, timeout=timeout)

    # Reuse the cached session when present; log in otherwise.
    entry = SCAuth._cache.get(email) or SCAuth.login(email, password)
    resp = _send(entry["session"])
    if resp.status_code == 401:
        # Session expired — drop the cache and retry exactly once.
        SCAuth.invalidate(email)
        entry = SCAuth.login(email, password)
        resp = _send(entry["session"])
    try:
        resp.raise_for_status()
    except Exception as e:
        snippet = (getattr(resp, "text", "") or "")[:800]
        raise RuntimeError(f"SC request error {verb} {path}: HTTP {resp.status_code} – {snippet}") from e

    try:
        payload = resp.json()
    except Exception:
        payload = resp.text

    # ---- Compact sample logging for every endpoint ----
    def _compact(p):
        if isinstance(p, dict):
            inner = p.get("data", p)
            if isinstance(inner, dict):
                # try common array keys
                for key in ("sales_over_time", "orders", "transactions", "items", "list", "rows", "data"):
                    candidate = inner.get(key)
                    if isinstance(candidate, list) and candidate:
                        return {key: candidate[:2]}  # first 2 rows
                # fallback: first 10 keys
                return {k: ("[list]" if isinstance(v, list) else v) for k, v in list(inner.items())[:10]}
            if isinstance(inner, list):
                return inner[:2]
            return None
        if isinstance(p, list):
            return p[:2]
        return str(p)[:300]

    logger.debug("SAMPLE %s %s -> %s", verb, path, json.dumps(_compact(payload), default=str))
    return payload
| # ----------------------------------------------------------------------------- | |
| # Timezone & temporal helpers | |
| # ----------------------------------------------------------------------------- | |
# Deterministic application timezone for all KPI windows.
TZ = os.getenv("APP_TZ", "Africa/Harare")
_TZ = TZ  # backward-compatible alias


def now_harare() -> pd.Timestamp:
    """Current wall-clock time as a tz-aware Timestamp in the app timezone."""
    return pd.Timestamp.now(tz=TZ)


def week_bounds_from(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of `ts`'s week in the app TZ.

    Fix: the weekday is now taken from the TZ-converted timestamp. Previously
    `ts.weekday()` used the caller's original timezone while `normalize()`
    used the converted one, yielding the wrong Monday for inputs whose local
    date differed from the Harare date.
    """
    local = ts.tz_convert(TZ)
    monday = local.normalize() - pd.Timedelta(days=local.weekday())
    sunday = monday + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return monday, sunday
def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (first instant, last second) of `ts`'s calendar month."""
    month_start = ts.normalize().replace(day=1)
    rolls_year = month_start.month == 12
    # Start of the following month (January of next year when rolling over).
    next_start = month_start.replace(
        year=month_start.year + 1 if rolls_year else month_start.year,
        month=1 if rolls_year else month_start.month + 1,
    )
    return month_start, next_start - pd.Timedelta(seconds=1)
def period_to_bounds(period: str) -> Tuple[pd.Timestamp, pd.Timestamp, str]:
    """Resolve a period keyword to (start, end, label) in the app timezone.

    Recognized: "today", "week"/"this_week", "month"/"this_month",
    "year"/"this_year". Anything else falls back to the current week.
    """
    key = (period or "week").strip().lower()
    now = now_harare()
    if key == "today":
        day0 = now.normalize()
        return day0, day0 + pd.Timedelta(hours=23, minutes=59, seconds=59), "Today"
    if key in ("month", "this_month"):
        first, last = this_month_bounds(now)
        return first, last, "This Month"
    if key in ("year", "this_year"):
        day0 = now.normalize()
        return (
            day0.replace(month=1, day=1, hour=0, minute=0, second=0),
            day0.replace(month=12, day=31, hour=23, minute=59, second=59),
            "This Year",
        )
    # "week", "this_week", and anything unrecognized.
    start, end = week_bounds_from(now)
    return start, end, "This Week"
def json_safe(obj: Any) -> Any:
    """Recursively convert numpy/pandas values into plain JSON-serializable types.

    Generalized: np.ndarray is now converted to a list, and pd.Series values
    are recursed (previously `to_dict()` could still carry numpy scalars that
    break json.dumps).
    """
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Series):
        return {k: json_safe(v) for k, v in obj.to_dict().items()}
    if isinstance(obj, pd.Index):
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, np.ndarray):
        # tolist() yields Python scalars (or nested lists); recurse for safety.
        return [json_safe(x) for x in obj.tolist()]
    if isinstance(obj, dict):
        return {k: json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [json_safe(x) for x in obj]
    return obj
def emit_kpi_debug(profile_key: str, stage: str, payload: Dict[str, Any]) -> None:
    """Best-effort KPI debug emission: always logs; optionally mirrors to Firebase.

    Never raises — any failure is downgraded to a warning so analytics flows
    are not interrupted by logging problems.
    """
    try:
        record = {"profile": profile_key, "stage": stage, "payload": payload}
        logger.debug("KPI_DEBUG %s", json.dumps(json_safe(record)))
        if LOG_KPI_TO_FIREBASE:
            stamp = int(time.time())
            db_ref.child(f"kpi_debug/{profile_key}/{stage}_{stamp}").set(json_safe(payload))
    except Exception as e:
        logger.warning(f"Failed to emit KPI debug logs: {e}")
| # ----------------------------------------------------------------------------- | |
| # Error detection & sanitization | |
| # ----------------------------------------------------------------------------- | |
# Lowercase substrings that indicate a raw Python error leaked into an answer.
ERROR_PATTERNS = [
    "traceback", "exception", "keyerror", "nameerror", "syntaxerror",
    "modulenotfounderror", "importerror", "execution failed",
    "attributeerror", "valueerror:",
]


def _stringify(obj) -> str:
    """Lossy text coercion: DataFrames/Figures -> "", bytes decoded, failures -> ""."""
    try:
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            return ""
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode("utf-8", errors="ignore")
        return str(obj)
    except Exception:
        return ""
def _extract_text_like(ans):
    """Pull the most text-like field out of an answer (dict, attr-bearing, or plain)."""
    if isinstance(ans, dict):
        # "value" wins, then the common message-ish keys.
        for key in ("value", "message", "text", "content"):
            if key in ans:
                return _stringify(ans[key])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(ans.value)
        except Exception:
            pass
    return _stringify(ans)
def looks_like_error(ans) -> bool:
    """Heuristic: does this answer look like a leaked Python error/traceback?

    DataFrames and Figures are never errors; empty text is treated as an error.
    """
    if isinstance(ans, (pd.DataFrame, plt.Figure)):
        return False
    text = _extract_text_like(ans).strip().lower()
    if not text:
        return True
    if any(pattern in text for pattern in ERROR_PATTERNS):
        return True
    # Traceback-shaped text: "File ... line ..." plus the word "error".
    return " file " in text and " line " in text and "error" in text
def sanitize_answer(ans) -> str:
    """Strip markdown code fences and truncate at the first traceback marker."""
    text = _extract_text_like(ans)
    text = re.sub(r"```+\w*", "", text or "")
    marker = "Traceback (most recent call last):"
    if marker in text:
        text = text.split(marker, 1)[0]
    return (text or "").strip()
| # ----------------------------------------------------------------------------- | |
| # Robust normalizers | |
| # ----------------------------------------------------------------------------- | |
| def _to_list(x: Any) -> List[Any]: | |
| if x is None: return [] | |
| if isinstance(x, list): return x | |
| if isinstance(x, dict): return [x] | |
| if isinstance(x, str): | |
| try: | |
| j = json.loads(x) | |
| if isinstance(j, list): return j | |
| if isinstance(j, dict): return [j] | |
| except Exception: | |
| return [x] | |
| return [x] | |
| def _to_float(x: Any) -> Optional[float]: | |
| try: | |
| if x is None or (isinstance(x, str) and not x.strip()): | |
| return None | |
| return float(str(x).replace(",", "").strip()) | |
| except Exception: | |
| return None | |
| def _to_int(x: Any) -> Optional[int]: | |
| try: | |
| f = _to_float(x) | |
| return int(f) if f is not None else None | |
| except Exception: | |
| return None | |
def _coerce_date(s: Any) -> Optional[pd.Timestamp]:
    """Best-effort parse of `s` into a tz-aware Timestamp in the app timezone.

    Naive parses are localized (DST-safe options); tz-aware parses are
    converted. Returns None when unparseable.
    """
    if s is None:
        return None
    try:
        parsed = pd.to_datetime(s, errors="coerce")
        if pd.isna(parsed):
            return None
        try:
            # Naive timestamps: attach the app timezone.
            return parsed.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT")
        except Exception:
            # Already tz-aware: convert instead.
            return parsed.tz_convert(TZ)
    except Exception:
        return None
| # ----------------------------------------------------------------------------- | |
| # Admin raw transactions extractor (row-level for PandasAI) + sample logging | |
| # ----------------------------------------------------------------------------- | |
| def _paginate(sc_get, email, password, path, params=None, page_param="page", per_page=200, max_pages=50): | |
| """Generic paginator for endpoints with page/per_page/meta""" | |
| params = dict(params or {}) | |
| params.setdefault(page_param, 1) | |
| params.setdefault("per_page", per_page) | |
| page = 1 | |
| for _ in range(max_pages): | |
| params[page_param] = page | |
| raw = sc_get("GET", path, email, password, params=params) | |
| yield raw | |
| try: | |
| meta = (raw or {}).get("meta") or {} | |
| last_page = int(meta.get("last_page") or 0) | |
| cur = int(meta.get("current_page") or page) | |
| if last_page and cur >= last_page: | |
| break | |
| if not last_page and not raw: | |
| break | |
| except Exception: | |
| break | |
| page += 1 | |
def _normalize_line(order, item, tz=TZ) -> dict:
    """Flatten one (order, line-item) pair into a single flat row dict.

    Field names are probed across several likely API spellings. Revenue
    prefers an explicit line total and falls back to quantity * unit_price;
    gross profit is computed only when a unit cost is present.
    """
    def pick(obj, *names, default=None):
        # First matching key wins; non-dict containers yield the default.
        return next((obj[n] for n in names if isinstance(obj, dict) and n in obj), default)

    order_id = pick(order, "id", "order_id", "uuid", "reference")
    created_at = pick(order, "created_at", "date", "ordered_at", "timestamp")
    customer = pick(order, "customer_name", "customer", "buyer_name", "customer_reference")
    payment = pick(order, "payment_method", "payment", "money_type")
    branch = pick(order, "shop_name", "shop", "branch", "store")
    status = pick(order, "status")
    currency = pick(order, "currency")

    prod_id = pick(item, "product_id", "item_id", "sku_id", "id")
    prod_name = pick(item, "product_name", "name", "title", "sku")
    qty = _to_int(pick(item, "quantity", "qty", "units")) or 0
    unit_price = _to_float(pick(item, "unit_price", "price", "unitPrice")) or 0.0
    line_total = _to_float(pick(item, "line_total", "total", "amount", "revenue")) or 0.0
    cost_price = _to_float(pick(item, "unit_cost", "cost_price", "cost"))  # optional

    dt = _coerce_date(created_at)
    # Zero/missing totals fall back to the derived qty * price.
    revenue = line_total if line_total else (qty * unit_price)
    gp = float(revenue - qty * (cost_price or 0.0)) if cost_price is not None else None

    return {
        "order_id": order_id,
        "datetime": dt,
        "date": dt.tz_convert(tz).date().isoformat() if dt is not None else None,
        "customer": customer,
        "payment_method": payment,
        "branch": branch,
        "status": status,
        "currency": currency,
        "product_id": prod_id,
        "product": prod_name,
        "quantity": qty,
        "unit_price": unit_price,
        "line_total": revenue,
        "unit_cost": float(cost_price) if cost_price is not None else None,
        "gross_profit": gp,
    }
def fetch_transactions_df(email: str, password: str, t_start: pd.Timestamp, t_end: pd.Timestamp) -> pd.DataFrame:
    """
    Pull row-level order lines. Tries multiple likely endpoints, logs a sample for each,
    flattens nested items, returns a clean DataFrame suitable for PandasAI.

    For each candidate (path, orders_key, items_key) triple, a plain request is
    attempted first and a paginated request second; the first endpoint that
    yields any flattened rows wins. When nothing works, an empty DataFrame with
    the expected schema is returned so downstream code can still operate.
    """
    # Candidate endpoints with the key that holds the order list and the key
    # that holds each order's line items.
    CANDIDATES: Tuple[Tuple[str, str, str], ...] = (
        ("/api/analytics/orders", "orders", "items"),
        ("/api/orders", "data", "items"),  # many APIs wrap orders under "data"
        ("/api/analytics/transactions", "transactions", "items"),
        ("/api/sales/transactions", "transactions", "lines"),
    )
    params = {
        "start_date": t_start.strftime("%Y-%m-%d"),
        "end_date": t_end.strftime("%Y-%m-%d"),
        "include": "items",
        "per_page": 200,
    }
    rows: List[dict] = []
    for path, orders_key, items_key in CANDIDATES:
        try:
            # Non-paginated attempt
            raw = sc_request("GET", path, email, password, params=params)
            # Log a sharper sample for this endpoint (top-level)
            logger.debug("TXN_PROBE_RAW %s -> keys=%s", path, list(raw.keys())[:10] if isinstance(raw, dict) else type(raw))
            payload = raw.get("data") if isinstance(raw, dict) and isinstance(raw.get("data"), (dict, list)) else raw
            orders = payload.get(orders_key) if isinstance(payload, dict) else payload
            if orders:
                orders_list = _to_list(orders)
                if orders_list:
                    # sample one order + items
                    o0 = orders_list[0] if isinstance(orders_list[0], dict) else {}
                    i0 = _to_list((o0 or {}).get(items_key))
                    logger.debug("TXN_SAMPLE %s -> order_keys=%s; first_item_keys=%s",
                                 path,
                                 list(o0.keys())[:15] if isinstance(o0, dict) else type(o0),
                                 (list(i0[0].keys())[:15] if i0 and isinstance(i0[0], dict) else "N/A"))
                    # Flatten each (order, item) pair into one row.
                    for o in orders_list:
                        for it in _to_list((o or {}).get(items_key)):
                            if isinstance(o, dict) and isinstance(it, dict):
                                rows.append(_normalize_line(o, it))
                    if rows:
                        break
            # Try paginated shape
            collected = 0
            for page_raw in _paginate(sc_request, email, password, path, params=params):
                logger.debug("TXN_PAGE %s meta=%s", path, (page_raw or {}).get("meta") if isinstance(page_raw, dict) else "N/A")
                page_data = page_raw.get("data") if isinstance(page_raw, dict) and isinstance(page_raw.get("data"), (dict, list)) else page_raw
                page_orders = page_data.get(orders_key) if isinstance(page_data, dict) else page_data
                for o in _to_list(page_orders):
                    for it in _to_list((o or {}).get(items_key)):
                        if isinstance(o, dict) and isinstance(it, dict):
                            rows.append(_normalize_line(o, it))
                            collected += 1
                if collected and collected >= 5000:  # safety cap
                    break
            if rows:
                # Log a compact sample of flattened rows
                logger.debug("TXN_FLAT_SAMPLE %s -> %s", path, json.dumps(rows[:2], default=str))
                break
        except Exception as e:
            # A failed probe just moves on to the next candidate endpoint.
            logger.debug(f"fetch_transactions_df: {path} probe failed: {e}")
    if not rows:
        logger.warning("No row-level endpoint found; returning an empty transactions frame (schema only).")
        # Empty frame with the full expected schema (keeps downstream happy).
        schema = {
            "datetime": pd.Series(dtype="datetime64[ns]"),
            "date": pd.Series(dtype="object"),
            "order_id": pd.Series(dtype="object"),
            "status": pd.Series(dtype="object"),
            "customer": pd.Series(dtype="object"),
            "branch": pd.Series(dtype="object"),
            "payment_method": pd.Series(dtype="object"),
            "currency": pd.Series(dtype="object"),
            "product_id": pd.Series(dtype="object"),
            "product": pd.Series(dtype="object"),
            "quantity": pd.Series(dtype="float"),
            "unit_price": pd.Series(dtype="float"),
            "line_total": pd.Series(dtype="float"),
            "unit_cost": pd.Series(dtype="float"),
            "gross_profit": pd.Series(dtype="float"),
        }
        return pd.DataFrame(schema)
    df = pd.DataFrame(rows)
    df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
    try:
        # Keep tz-naive for some plotting libs but deterministic in Harare
        df["datetime"] = df["datetime"].dt.tz_convert(TZ).dt.tz_localize(None)
    except Exception:
        pass
    # Coerce all money/quantity columns to numeric (bad values become NaN).
    for c in ("quantity", "unit_price", "line_total", "unit_cost", "gross_profit"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    # Stable column order (only columns actually present).
    cols = [
        "datetime", "date", "order_id", "status", "customer", "branch",
        "payment_method", "currency", "product_id", "product",
        "quantity", "unit_price", "line_total", "unit_cost", "gross_profit",
    ]
    df = df[[c for c in cols if c in df.columns]]
    logger.debug("TXN_DF_COLUMNS %s", df.columns.tolist())
    logger.debug("TXN_DF_HEAD %s", json.dumps(df.head(3).to_dict(orient="records"), default=str))
    return df
| # ----------------------------------------------------------------------------- | |
| # Admin KPI Engine (holistic view) — logs sample after each endpoint | |
| # ----------------------------------------------------------------------------- | |
| class AdminAnalyticsEngine: | |
| """Single-tenant holistic admin analytics. No shop/brand filters; admin sees entire dataset.""" | |
| def __init__(self, tenant_key: str, email: str, password: str, period: str = "week"): | |
| self.tenant_key = (tenant_key or "admin").strip() | |
| self.email = (email or "").strip() | |
| self.password = (password or "").strip() | |
| self.period = (period or "week").lower().strip() | |
| self.t_start, self.t_end, self.period_label = period_to_bounds(self.period) | |
| def _unwrap_data(payload: dict) -> dict: | |
| if isinstance(payload, dict): | |
| return payload.get("data") if isinstance(payload.get("data"), dict) else payload | |
| return {} | |
| def _dashboard(self) -> dict: | |
| raw = sc_request("GET", "/api/analytics/dashboard", self.email, self.password, params={"period": self.period}) | |
| data = self._unwrap_data(raw) | |
| emit_kpi_debug(self.tenant_key, "dashboard", data or raw or {}) | |
| # Log a friendly sample view: | |
| logger.debug("SAMPLE /api/analytics/dashboard -> %s", json.dumps({k: data.get(k) for k in list(data.keys())[:10]}, default=str)) | |
| return data or {} | |
| def _sales_series(self) -> pd.DataFrame: | |
| params = { | |
| "start_date": self.t_start.strftime("%Y-%m-%d"), | |
| "end_date": self.t_end.strftime("%Y-%m-%d"), | |
| "group_by": "day", | |
| } | |
| raw = sc_request("GET", "/api/analytics/sales", self.email, self.password, params=params) | |
| data = {} | |
| if isinstance(raw, dict): | |
| data = (raw.get("data") or raw) if isinstance(raw.get("data"), (dict, list)) else raw | |
| else: | |
| try: | |
| j = json.loads(raw) | |
| data = j.get("data", j) if isinstance(j, dict) else {} | |
| except Exception: | |
| data = {} | |
| # log samples from top-level keys we expect | |
| try: | |
| so = data.get("sales_over_time") | |
| pm = data.get("sales_by_payment_method") | |
| cat = data.get("sales_by_category") | |
| logger.debug("SAMPLE /api/analytics/sales -> sales_over_time[:2]=%s; sales_by_payment_method[:2]=%s; sales_by_category[:2]=%s", | |
| json.dumps((so or [])[:2]), json.dumps((pm or [])[:2]), json.dumps((cat or [])[:2])) | |
| except Exception: | |
| pass | |
| series = [] | |
| for r in _to_list(data.get("sales_over_time")): | |
| if not isinstance(r, dict): | |
| continue | |
| date_str = r.get("date") or r.get("day") or r.get("period") | |
| dt = _coerce_date(date_str) | |
| if dt is None: | |
| continue | |
| total_sales = _to_float(r.get("total_sales") or r.get("total") or r.get("revenue")) | |
| total_orders = _to_int(r.get("total_orders") or r.get("orders") or r.get("count")) | |
| aov = _to_float(r.get("average_order_value") or r.get("aov")) | |
| if aov is None and total_sales is not None and (total_orders or 0) > 0: | |
| aov = float(total_sales) / int(total_orders) | |
| series.append({ | |
| "_date": dt, | |
| "total_sales": float(total_sales) if total_sales is not None else 0.0, | |
| "total_orders": int(total_orders) if total_orders is not None else 0, | |
| "aov": float(aov) if aov is not None else None, | |
| }) | |
| df = pd.DataFrame(series) | |
| if df.empty: | |
| return pd.DataFrame(columns=["_date", "total_sales", "total_orders", "aov"]) | |
| df = df.sort_values("_date").reset_index(drop=True) | |
| emit_kpi_debug(self.tenant_key, "sales_series_raw", (raw if isinstance(raw, dict) else {"raw": raw})) | |
| logger.debug("SAMPLE sales_series_df.head -> %s", json.dumps(df.head(3).to_dict(orient="records"), default=str)) | |
| return df | |
| def transactions_df(self) -> pd.DataFrame: | |
| df = fetch_transactions_df(self.email, self.password, self.t_start, self.t_end) | |
| emit_kpi_debug(self.tenant_key, "transactions_df_meta", { | |
| "rows": int(len(df)), | |
| "cols": list(df.columns), | |
| "period": {"start": self.t_start.isoformat(), "end": self.t_end.isoformat()} | |
| }) | |
| # already logged columns + head in fetch_transactions_df() | |
| return df | |
| def _products(self) -> dict: | |
| raw = sc_request( | |
| "GET", "/api/analytics/products", self.email, self.password, | |
| params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} | |
| ) | |
| data = self._unwrap_data(raw) | |
| emit_kpi_debug(self.tenant_key, "products", data or raw or {}) | |
| # log sample leaderboards if present | |
| keys = ["top_by_revenue","top_by_units","top_by_margin_value","top_by_margin_pct","bottom_by_revenue","loss_makers"] | |
| sample = {k: (data.get(k) or [])[:2] for k in keys if isinstance(data.get(k), list)} | |
| logger.debug("SAMPLE /api/analytics/products -> %s", json.dumps(sample)) | |
| return data or {} | |
| def _customers(self) -> dict: | |
| raw = sc_request( | |
| "GET", "/api/analytics/customers", self.email, self.password, | |
| params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} | |
| ) | |
| data = self._unwrap_data(raw) | |
| emit_kpi_debug(self.tenant_key, "customers", data or raw or {}) | |
| # sample common shapes | |
| sample = { | |
| "top_customers_by_gp": (data.get("top_customers_by_gp") or [])[:2], | |
| "at_risk": (data.get("at_risk") or [])[:2], | |
| "new_customers": (data.get("new_customers") or [])[:2], | |
| "summary": data.get("summary"), | |
| } | |
| logger.debug("SAMPLE /api/analytics/customers -> %s", json.dumps(sample)) | |
| return data or {} | |
| def _inventory(self) -> dict: | |
| raw = sc_request("GET", "/api/analytics/inventory", self.email, self.password) | |
| data = self._unwrap_data(raw) | |
| emit_kpi_debug(self.tenant_key, "inventory", data or raw or {}) | |
| try: | |
| items = data.get("products") or data.get("items") or data.get("snapshot") or [] | |
| logger.debug("SAMPLE /api/analytics/inventory -> %s", json.dumps((items or [])[:2], default=str)) | |
| except Exception: | |
| pass | |
| return data or {} | |
| def _comparisons(self) -> dict: | |
| raw = sc_request( | |
| "GET", "/api/analytics/comparisons", self.email, self.password, | |
| params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")} | |
| ) | |
| data = self._unwrap_data(raw) | |
| emit_kpi_debug(self.tenant_key, "comparisons", data or raw or {}) | |
| try: | |
| logger.debug("SAMPLE /api/analytics/comparisons -> keys=%s", list(data.keys())[:15]) | |
| except Exception: | |
| pass | |
| return data or {} | |
| # -------------------- deterministic snapshot -------------------- | |
| def build_snapshot(self) -> Dict[str, Any]: | |
| dash = self._dashboard() | |
| sales_df = self._sales_series() | |
| prods = self._products() | |
| custs = self._customers() | |
| inv = self._inventory() | |
| comps = self._comparisons() | |
| def _get_num(d: dict, *keys, default=0.0): | |
| for k in keys: | |
| v = d.get(k) | |
| if isinstance(v, (int, float, str)): | |
| try: | |
| return float(v) | |
| except Exception: | |
| continue | |
| return default | |
| total_revenue = _get_num(dash, "total_revenue", "revenue", default=0.0) | |
| gross_profit = _get_num(dash, "gross_profit", "gp", default=0.0) | |
| transactions = int(_get_num(dash, "transactions", "orders", default=0.0)) | |
| if (total_revenue == 0.0 or transactions == 0) and isinstance(sales_df, pd.DataFrame) and not sales_df.empty: | |
| total_revenue = float(sales_df["total_sales"].sum()) | |
| transactions = int(sales_df["total_orders"].sum()) | |
| product_lb = { | |
| "top_by_revenue": prods.get("top_by_revenue") or prods.get("topRevenue") or [], | |
| "top_by_units": prods.get("top_by_units") or prods.get("topUnits") or [], | |
| "top_by_margin_value": prods.get("top_by_margin_value") or prods.get("topByGP") or [], | |
| "top_by_margin_pct": prods.get("top_by_margin_pct") or [], | |
| "bottom_by_revenue": prods.get("bottom_by_revenue") or prods.get("bottomRevenue") or [], | |
| "loss_makers": prods.get("loss_makers") or [], | |
| } | |
| customer_value = { | |
| "leaderboards": { | |
| "top_customers_by_gp": custs.get("top_customers_by_gp") or custs.get("topByGP") or [], | |
| "at_risk": custs.get("at_risk", []), | |
| "new_customers": custs.get("new_customers", []), | |
| }, | |
| "rfm_summary": custs.get("summary", {}), | |
| "params": {"window": self.period_label}, | |
| } | |
| temporal = self._temporal_patterns_from_sales(sales_df) | |
| inventory_block = { | |
| "status": "ok" if inv else "no_stock_data", | |
| "alerts": inv.get("alerts") if isinstance(inv, dict) else {}, | |
| "snapshot": inv, | |
| } | |
| snapshot = { | |
| "Summary Period": f"{self.period_label} ({self.t_start.date()} to {self.t_end.date()})", | |
| "Performance Snapshot": { | |
| "Total Revenue": round(total_revenue, 2), | |
| "Gross Profit": round(gross_profit, 2), | |
| "Transactions": transactions, | |
| "Change": { | |
| "revenue": dash.get("revenue_change") or dash.get("total_revenue_change"), | |
| "gross_profit": dash.get("gross_profit_change") or dash.get("gp_change"), | |
| "transactions": dash.get("transactions_change") or dash.get("orders_change"), | |
| }, | |
| }, | |
| "Temporal Patterns": temporal, | |
| "Product KPIs": {"leaderboards": product_lb}, | |
| "Customer Value": customer_value, | |
| "Inventory": inventory_block, | |
| "Comparisons": comps if isinstance(comps, dict) else {"data": comps}, | |
| "meta": { | |
| "timeframes": { | |
| "current_start": self.t_start.isoformat(), | |
| "current_end": self.t_end.isoformat(), | |
| "period_label": self.period_label, | |
| }, | |
| "row_counts": { | |
| "sales_points": int(len(sales_df)) if isinstance(sales_df, pd.DataFrame) else 0 | |
| }, | |
| }, | |
| } | |
| emit_kpi_debug(self.tenant_key, "snapshot_done", snapshot["meta"]) | |
| return json_safe(snapshot) | |
| def _temporal_patterns_from_sales(self, df: pd.DataFrame) -> Dict[str, Any]: | |
| if df is None or df.empty: | |
| return {"series": [], "best_day_by_sales": None} | |
| d = df.copy() | |
| d["dow"] = d["_date"].dt.day_name() | |
| d["date"] = d["_date"].dt.strftime("%Y-%m-%d") | |
| g = d.groupby("dow", dropna=False).agg( | |
| total_sales=("total_sales", "sum"), | |
| total_orders=("total_orders", "sum"), | |
| ).reset_index() | |
| best_row = None if g.empty else g.loc[g["total_sales"].idxmax()] | |
| best_day = None if g.empty else { | |
| "day": str(best_row["dow"]), | |
| "total_sales": float(best_row["total_sales"]), | |
| "total_orders": int(best_row["total_orders"]), | |
| } | |
| series = d[["date", "total_sales", "total_orders", "aov"]].to_dict(orient="records") | |
| return {"series": series, "best_day_by_sales": best_day} | |
def narrate(self, snapshot: dict, user_question: str) -> str:
    """Turn a KPI snapshot into a short LLM-written answer to *user_question*.

    The prompt forbids inventing numbers, so figures come only from the
    snapshot JSON. On any LLM/serialization failure the snapshot itself is
    returned as a fenced JSON block so the caller still gets the data.
    """
    try:
        prompt_parts = [
            "You are a concise business analyst for Brave Retail Insights.\n",
            "RULES: Do NOT invent numbers; only use values in the JSON. Harare timezone. Keep it brief.\n",
            f"User Question: {json.dumps(user_question)}\n\n",
            f"Business Data JSON:\n{json.dumps(json_safe(snapshot), ensure_ascii=False)}\n",
        ]
        reply = llm.invoke("".join(prompt_parts))
        raw = getattr(reply, "content", None) or str(reply)
        return sanitize_answer(raw)
    except Exception:
        # Narration is best-effort; fall back to the raw (JSON-safe) numbers.
        return "### Business Snapshot\n\n```\n" + json.dumps(json_safe(snapshot), indent=2) + "\n```"
| # ----------------------------------------------------------------------------- | |
| # /chat — PandasAI first on sales series, else deterministic snapshot + narration | |
| # ----------------------------------------------------------------------------- | |
def chat():
    """POST /chat — answer an ad-hoc admin question.

    Tier 1 runs PandasAI over the most relevant DataFrame (transactions when
    the question mentions products/items, otherwise the daily sales series).
    Any tier-1 failure falls back to the deterministic snapshot narrated by
    the LLM, so numbers never come from free-form generation.

    Expects JSON: user_question (required), email+password (required),
    tenant_key and period (optional). Always replies with JSON.
    """
    rid = str(uuid.uuid4())[:8]
    logger.info(f"[{rid}] === /chat start ===")
    try:
        # silent=True: malformed/missing JSON yields {} instead of Werkzeug's
        # HTML 400 abort, preserving the JSON error contract below.
        payload = request.get_json(silent=True) or {}
        tenant_key = str(payload.get("tenant_key") or "admin")
        user_question = (payload.get("user_question") or "").strip()
        period = (payload.get("period") or "week").strip().lower()
        email = payload.get("email")
        password = payload.get("password")
        if not user_question:
            return jsonify({"answer": "Missing 'user_question'."})
        if not email or not password:
            return jsonify({"error": "Missing 'email' or 'password'."}), 400
        engine = AdminAnalyticsEngine(tenant_key, email, password, period)
        # Build transactions_df now and place it in meta logs (useful for PandasAI later)
        tdf = engine.transactions_df()
        # For simple Q&A we still start with sales_df (fast + stable)
        sales_df = engine._sales_series()
        if sales_df.empty and tdf.empty:
            snapshot = engine.build_snapshot()
            answer = engine.narrate(snapshot, user_question)
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}})
        try:
            logger.info(f"[{rid}] PandasAI attempt …")
            # If the question references products/items explicitly, switch to transactions_df
            wants_products = bool(re.search(
                r"\b(product|sku|item|category|top\s*5|top\s*ten|by\s*revenue|by\s*units)\b",
                user_question, re.I))
            use_df = tdf if wants_products and not tdf.empty else sales_df
            pandas_agent = SmartDataframe(use_df, config={
                "llm": llm,
                "response_parser": FlaskResponse,
                "security": "none",
                "save_charts_path": user_defined_path,
                "save_charts": False,
                "enable_cache": False,
                "conversational": True,
                "enable_logging": False,
            })
            combined_prompt = (
                "Rules:\n"
                "1) Use pd.Timestamp.now(tz='Africa/Harare') for any now().\n"
                "2) Do NOT assume future dates; only use provided DataFrame columns.\n"
                "3) For monthly, derive via dt.to_period('M').\n"
                f"Question: {user_question}"
            )
            answer = pandas_agent.chat(combined_prompt)
            if looks_like_error(answer):
                logger.warning(f"[{rid}] PandasAI invalid answer; fallback.")
                raise RuntimeError("PandasAI invalid answer")
            if isinstance(answer, pd.DataFrame):
                return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}})
            if isinstance(answer, plt.Figure):
                buf = io.BytesIO()
                answer.savefig(buf, format="png")
                # FIX: release the figure — matplotlib keeps figures alive in
                # its global registry, which leaked memory once per chart reply.
                plt.close(answer)
                data_uri = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}"
                return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}})
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}})
        except Exception as tier1_err:
            # FIX: log WHY tier 1 failed instead of swallowing it silently —
            # otherwise PandasAI regressions are invisible in the logs.
            logger.warning(f"[{rid}] PandasAI failed ({tier1_err}); using snapshot fallback.")
            snapshot = engine.build_snapshot()
            answer = engine.narrate(snapshot, user_question)
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}})
    except Exception as e:
        logger.exception(f"[{rid}] Critical unexpected error in /chat: {e}")
        return jsonify({"answer": "Something went wrong on our side. Please try again."})
| # ----------------------------------------------------------------------------- | |
| # /report, /marketing, /notify — feed snapshot (admin holistic) | |
| # ----------------------------------------------------------------------------- | |
def report():
    """POST /report — Gemini-written markdown admin report from the KPI snapshot."""
    logger.info("=== /report ===")
    try:
        body = request.get_json() or {}
        email = body.get("email")
        password = body.get("password")
        if not email or not password:
            return jsonify({"error": "Missing 'email' or 'password'."}), 400
        tenant = str(body.get("tenant_key") or "admin")
        window = (body.get("period") or "week").strip().lower()
        snapshot = AdminAnalyticsEngine(tenant, email, password, window).build_snapshot()
        prompt = (
            "You are a Brave Retail Insights business analyst. Analyze the following data and generate a "
            "succinct, insight-rich admin report with KPIs and recommendations. Use markdown only.\n"
            + json.dumps(json_safe(snapshot))
        )
        reply = model.generate_content(prompt)
        return jsonify(str(reply.text))
    except Exception as e:
        logger.exception("Error in /report")
        return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500
def marketing():
    """POST /marketing — Gemini-written marketing strategy from the KPI snapshot."""
    logger.info("=== /marketing ===")
    try:
        body = request.get_json() or {}
        email = body.get("email")
        password = body.get("password")
        if not email or not password:
            return jsonify({"error": "Missing 'email' or 'password'."}), 400
        tenant = str(body.get("tenant_key") or "admin")
        window = (body.get("period") or "week").strip().lower()
        snapshot = AdminAnalyticsEngine(tenant, email, password, window).build_snapshot()
        prompt = (
            "You are a Brave Retail Insights Marketing Specialist. Analyze the JSON and produce a concise, "
            "practical strategy (audiences, promos, timing). Only return the strategy.\n"
            + json.dumps(json_safe(snapshot))
        )
        reply = model.generate_content(prompt)
        return jsonify(str(reply.text))
    except Exception as e:
        logger.exception("Error in /marketing")
        return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500
def notify():
    """POST /notify — short actionable bullets for the admin notification panel."""
    logger.info("=== /notify ===")
    try:
        body = request.get_json() or {}
        email = body.get("email")
        password = body.get("password")
        if not email or not password:
            return jsonify({"error": "Missing 'email' or 'password'."}), 400
        tenant = str(body.get("tenant_key") or "admin")
        window = (body.get("period") or "week").strip().lower()
        snapshot = AdminAnalyticsEngine(tenant, email, password, window).build_snapshot()
        prompt = (
            "You are a Brave Retail Insights business analyst. Write up to 6 short bullets with actionable tips "
            "for an admin notification panel using this JSON.\n"
            + json.dumps(json_safe(snapshot))
        )
        reply = model.generate_content(prompt)
        return jsonify(str(reply.text))
    except Exception as e:
        logger.exception("Error in /notify")
        return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500
| # ----------------------------------------------------------------------------- | |
| # Voice briefing endpoints (history in Firebase; KPIs from admin snapshot) | |
| # ----------------------------------------------------------------------------- | |
| def _synthesize_history_summary(call_history: List[dict]) -> str: | |
| if not call_history: | |
| return "• New caller — no prior call history." | |
| history_json = json.dumps(json_safe(call_history), indent=2) | |
| analyst_prompt = ( | |
| "You are an executive assistant preparing a pre-call briefing for Brave Retail Insights. " | |
| "Only analyze the user's past call history and summarize recurring themes.\n\n" | |
| f"{history_json}\n\n- Output a few bullets only." | |
| ) | |
| try: | |
| response = model.generate_content(analyst_prompt) | |
| return (response.text or "").strip() or "• (empty)" | |
| except Exception: | |
| return "• Could not summarize prior calls." | |
def log_call_usage():
    """POST — persist a voice-call transcript under the caller's profile.

    Body: profile_id (required), transcript (required), durationSeconds
    (optional). Stores at transcripts/<profile_id>/<call_id> in Firebase.
    """
    payload = request.get_json() or {}
    profile_id = payload.get("profile_id")
    transcript = payload.get("transcript")
    duration = payload.get("durationSeconds")
    if not profile_id or not transcript:
        return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400
    try:
        # FIX: a second-resolution timestamp alone collides when two calls land
        # in the same second — the later ref.set() silently overwrote the
        # earlier transcript. A short uuid suffix makes the key unique; the
        # briefing reader iterates .values(), so the key format is free to vary.
        call_id = f"call_{int(time.time())}_{uuid.uuid4().hex[:6]}"
        ref = db_ref.child(f"transcripts/{profile_id}/{call_id}")
        ref.set(json_safe({
            "transcript": transcript,
            "profileId": profile_id,
            "durationSeconds": duration,
            "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        }))
        return jsonify({"status": "success"}), 200
    except Exception as e:
        logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}")
        return jsonify({"error": "Server error while storing the transcript."}), 500
def get_call_briefing():
    """POST — return a memory summary of past calls plus a fresh KPI snapshot.

    Requires profile_id plus admin email/password; period defaults to 'week'.
    Transcript-history fetch failures are non-fatal (empty history is used).
    """
    body = request.get_json() or {}
    profile_id = str((body.get("profile_id") or "").strip())
    window = (body.get("period") or "week").strip().lower()
    email = body.get("email")
    password = body.get("password")
    if not profile_id:
        return jsonify({"error": "Missing 'profile_id'."}), 400
    if not email or not password:
        return jsonify({"error": "Missing 'email' or 'password'."}), 400
    try:
        history: List[dict] = []
        try:
            stored = db_ref.child(f"transcripts/{profile_id}").get()
            if stored:
                history = list(stored.values())
        except Exception as fetch_err:
            logger.warning(f"Transcript fetch failed for '{profile_id}': {fetch_err}")
        memory_summary = _synthesize_history_summary(history)
        engine = AdminAnalyticsEngine(profile_id or "admin", email, password, window)
        snapshot = engine.build_snapshot()
        return jsonify({"memory_summary": memory_summary, "kpi_snapshot": json_safe(snapshot)}), 200
    except Exception as e:
        logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}")
        return jsonify({"error": "Failed to generate call briefing."}), 500
| # ----------------------------------------------------------------------------- | |
| # Entrypoint | |
| # ----------------------------------------------------------------------------- | |
if __name__ == "__main__":
    # FIX: the code ran with debug=True despite the warning comment — the
    # Werkzeug debugger allows remote code execution when exposed on 0.0.0.0.
    # Debug is now opt-in via FLASK_DEBUG=1 (local development only), and the
    # port is overridable via PORT for hosted environments.
    debug_mode = os.environ.get("FLASK_DEBUG", "0").lower() in ("1", "true", "yes")
    app.run(debug=debug_mode, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))