# Brave-chat-api / main.py
# (Hugging Face file-page header: uploaded by "rairo", commit "Update main.py", revision 90ac18d, verified)
# app.py — Brave Retail Insights (Admin-only holistic analytics; Harare-tz deterministic KPIs)
# - Base URL hardcoded to delta-api.pricelyst.co.zw
# - Admin credentials (email, password) supplied by CLIENT per request (cached per email)
# - Deterministic time windows (Harare); explicit start/end on API calls
# - KPI engine never uses LLM for numbers (LLM is narration-only fallback)
# - JSON-safe snapshot; deep DEBUG logs (optional mirror to Firebase)
# - Drop-in Firebase + AI wiring identical in spirit to prior server
from __future__ import annotations
import os, io, re, json, time, uuid, base64, logging
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from dotenv import load_dotenv
# LLMs
from langchain_google_genai import ChatGoogleGenerativeAI
import google.generativeai as genai
# PandasAI (tier-1 attempt only)
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser
# Firebase
import firebase_admin
from firebase_admin import credentials, db
# -----------------------------------------------------------------------------
# Init
# -----------------------------------------------------------------------------
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()
# Flask app with permissive CORS (all origins) for browser clients.
app = Flask(__name__)
CORS(app)
# DEBUG-level root logging; the app leans heavily on logger.debug() samples below.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("brave-retail-app")
# -----------------------------------------------------------------------------
# Firebase Initialization (drop-in)
# -----------------------------------------------------------------------------
# Initialize Firebase Admin from two environment variables:
#   FIREBASE    — service-account credentials as a JSON string
#   Firebase_DB — the Realtime Database URL
# Startup is fail-fast: any problem here logs FATAL and re-raises, aborting the process.
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var is not set")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    if not firebase_db_url:
        raise ValueError("Firebase_DB env var is not set")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {"databaseURL": firebase_db_url})
    # Root database reference; used by emit_kpi_debug() for the optional KPI mirror.
    db_ref = db.reference()
    logger.info("Firebase Admin SDK initialized.")
except Exception as e:
    logger.fatal(f"FATAL: Firebase init failed: {e}")
    raise
# Opt-in flag: when "1", KPI debug payloads are also written under kpi_debug/ in Firebase.
LOG_KPI_TO_FIREBASE = os.getenv("LOG_KPI_TO_FIREBASE", "0") == "1"
# -----------------------------------------------------------------------------
# PandasAI ResponseParser (unchanged)
# -----------------------------------------------------------------------------
class FlaskResponse(ResponseParser):
    """PandasAI response parser that renders results for an HTTP JSON API.

    DataFrames become HTML tables, plots become base64 ``data:`` URIs,
    and anything else is stringified.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        # Render the DataFrame as an HTML table; empty string on any failure.
        try:
            return result["value"].to_html()
        except Exception:
            return ""

    def format_plot(self, result):
        value = result.get("value")
        # Case 1: a live Matplotlib figure — serialize it to an in-memory PNG.
        if hasattr(value, "savefig"):
            png = io.BytesIO()
            value.savefig(png, format="png")
            png.seek(0)
            encoded = base64.b64encode(png.read()).decode("utf-8")
            return f"data:image/png;base64,{encoded}"
        # Case 2: a path to an image file already saved on disk.
        if isinstance(value, str) and os.path.isfile(os.path.join(value)):
            with open(os.path.join(value), "rb") as fh:
                encoded = base64.b64encode(fh.read()).decode("utf-8")
            return f"data:image/png;base64,{encoded}"
        # Fallback: stringify whatever we received.
        return str(value)

    def format_other(self, result):
        return str(result.get("value", ""))
# -----------------------------------------------------------------------------
# LLM init
# -----------------------------------------------------------------------------
logger.info("Initializing models…")
# Gemini API key comes from the (oddly named) "Gemini" env var; required at startup.
gemini_api_key = os.getenv("Gemini")
if not gemini_api_key:
    raise ValueError("Gemini API key is required (env var Gemini).")
# LangChain chat model: used by PandasAI and by AdminAnalyticsEngine.narrate().
llm = ChatGoogleGenerativeAI(api_key=gemini_api_key, model="gemini-2.0-flash", temperature=0.1)
genai.configure(api_key=gemini_api_key)
# Raw google-generativeai model: used by the /report and /marketing endpoints.
generation_config = {"temperature": 0.2, "top_p": 0.95, "max_output_tokens": 5000}
model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite-001", generation_config=generation_config)
logger.info("AI models initialized.")
# Unique per-process directory handed to PandasAI for chart exports
# (charts are disabled via save_charts=False in /chat, so this is effectively unused).
user_defined_path = os.path.join("/exports/charts", str(uuid.uuid4()))
logger.info(f"Chart export path set to: {user_defined_path}")
# -----------------------------------------------------------------------------
# Admin API client (client-supplied credentials; holistic admin scope)
# -----------------------------------------------------------------------------
# Base URL of the admin API; env override supported, trailing slash stripped.
SC_BASE_URL = os.getenv("SC_BASE_URL", "https://delta-api.pricelyst.co.zw").rstrip("/")


class SCAuth:
    """Caches a requests.Session per admin email; supports bearer or cookie sessions."""

    # email -> {"session": requests.Session, "auth": "bearer"|"cookie", "token": str (bearer only)}
    _cache: Dict[str, Dict[str, Any]] = {}

    @classmethod
    def invalidate(cls, email: str) -> None:
        """Drop and close the cached session for *email*; all errors are swallowed."""
        try:
            entry = cls._cache.pop(email, None)
            if entry and isinstance(entry.get("session"), requests.Session):
                entry["session"].close()
        except Exception:
            pass

    @classmethod
    def _extract_token(cls, js: dict) -> Optional[str]:
        """Return the first non-empty auth token found in a login response body.

        Checks the common locations (top level, under "data", "authorization",
        "auth"); returns None when no usable string token is present.
        """
        if not isinstance(js, dict):
            return None
        candidates = [
            js.get("token"),
            js.get("access_token"),
            (js.get("data") or {}).get("token"),
            (js.get("data") or {}).get("access_token"),
            (js.get("authorization") or {}).get("token"),
            (js.get("auth") or {}).get("token"),
        ]
        for t in candidates:
            if isinstance(t, str) and t.strip():
                return t.strip()
        return None

    @classmethod
    def login(cls, email: str, password: str) -> Dict[str, Any]:
        """Authenticate against the admin login endpoint and cache the session.

        Prefers a bearer token from the JSON body; falls back to a cookie
        session when the response is 2xx and set cookies.

        Raises:
            RuntimeError: when neither a token nor a cookie session is obtained,
                with the HTTP status and a body snippet for debugging.
        """
        s = requests.Session()
        s.headers.update({"Accept": "application/json"})
        url = f"{SC_BASE_URL}/api/auth/admin/login"
        resp = s.post(url, json={"email": email, "password": password}, timeout=30)
        body_text, body_json = "", {}
        try:
            body_json = resp.json() or {}
        except Exception:
            body_text = (resp.text or "")[:800]
        token = cls._extract_token(body_json)
        # FIX: only trust a token from a successful (2xx) response. Previously a
        # token echoed inside an error body would be cached as a valid session,
        # poisoning the cache until the next 401 forced a refresh.
        if token and (resp.status_code // 100) == 2:
            s.headers.update({"Authorization": f"Bearer {token}"})
            entry = {"session": s, "auth": "bearer", "token": token}
            cls._cache[email] = entry
            logger.debug("Admin login (bearer) OK")
            return entry
        if resp.cookies and (resp.status_code // 100) == 2:
            entry = {"session": s, "auth": "cookie"}
            cls._cache[email] = entry
            logger.debug("Admin login (cookie) OK")
            return entry
        snippet = body_text or (str(body_json)[:800])
        raise RuntimeError(f"Login did not return a token or cookie session. HTTP {resp.status_code}. Body≈ {snippet}")
def sc_request(method: str, path: str, email: str, password: str, *,
               params: dict = None, json_body: dict = None, timeout: int = 30):
    """Authenticated request with 401 auto-refresh (once). Logs a compact sample on success."""
    if not path.startswith("/"):
        path = "/" + path
    url = f"{SC_BASE_URL}{path}"

    def _send(session: requests.Session):
        return session.request(method.upper(), url, params=params, json=json_body, timeout=timeout)

    # Reuse the cached session for this admin, logging in on first use.
    entry = SCAuth._cache.get(email) or SCAuth.login(email, password)
    resp = _send(entry["session"])
    if resp.status_code == 401:
        # Session expired: drop it, re-login once, and retry the request.
        SCAuth.invalidate(email)
        entry = SCAuth.login(email, password)
        resp = _send(entry["session"])
    try:
        resp.raise_for_status()
    except Exception as e:
        snippet = (getattr(resp, "text", "") or "")[:800]
        raise RuntimeError(f"SC request error {method.upper()} {path}: HTTP {resp.status_code}{snippet}") from e

    # Prefer the parsed JSON body; fall back to the raw text.
    try:
        payload = resp.json()
    except Exception:
        payload = resp.text

    # ---- Compact sample logging for every endpoint ----
    sample = None
    if isinstance(payload, dict):
        d = payload.get("data", payload)
        if isinstance(d, dict):
            # try common array keys
            for key in ("sales_over_time", "orders", "transactions", "items", "list", "rows", "data"):
                v = d.get(key)
                if isinstance(v, list) and v:
                    sample = {key: v[:2]}  # first 2 rows
                    break
            if sample is None:
                # fallback: first 10 keys
                sample = {k: ("[list]" if isinstance(v, list) else v) for k, v in list(d.items())[:10]}
        elif isinstance(d, list):
            sample = d[:2]
    elif isinstance(payload, list):
        sample = payload[:2]
    else:
        sample = str(payload)[:300]
    logger.debug("SAMPLE %s %s -> %s", method.upper(), path, json.dumps(sample, default=str))
    return payload
# -----------------------------------------------------------------------------
# Timezone & temporal helpers
# -----------------------------------------------------------------------------
# IANA timezone used for every deterministic KPI window (default: Africa/Harare).
TZ = os.getenv("APP_TZ", "Africa/Harare")
_TZ = TZ  # backward-compatible alias


def now_harare() -> pd.Timestamp:
    """Current wall-clock time as a tz-aware Timestamp in the app timezone."""
    return pd.Timestamp.now(tz=TZ)


def week_bounds_from(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (Monday 00:00:00, Sunday 23:59:59) of the week containing *ts*."""
    week_start = ts.tz_convert(TZ).normalize() - pd.Timedelta(days=ts.weekday())
    week_end = week_start + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return week_start, week_end


def this_month_bounds(ts: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (first-of-month 00:00:00, last-of-month 23:59:59) around *ts*."""
    month_start = ts.normalize().replace(day=1)
    # Step to the first day of the next month (handling December -> January).
    if month_start.month == 12:
        next_month = month_start.replace(year=month_start.year + 1, month=1)
    else:
        next_month = month_start.replace(month=month_start.month + 1)
    return month_start, next_month - pd.Timedelta(seconds=1)


def period_to_bounds(period: str) -> Tuple[pd.Timestamp, pd.Timestamp, str]:
    """Map a period keyword to explicit (start, end, label) bounds in Harare time.

    Recognizes "today", "week"/"this_week", "month"/"this_month" and
    "year"/"this_year"; anything else falls back to the current week.
    """
    key = (period or "week").strip().lower()
    now = now_harare()
    if key == "today":
        start = now.normalize()
        return start, start + pd.Timedelta(hours=23, minutes=59, seconds=59), "Today"
    if key in ("month", "this_month"):
        start, end = this_month_bounds(now)
        return start, end, "This Month"
    if key in ("year", "this_year"):
        start = now.normalize().replace(month=1, day=1, hour=0, minute=0, second=0)
        end = now.normalize().replace(month=12, day=31, hour=23, minute=59, second=59)
        return start, end, "This Year"
    # "week", "this_week", and any unrecognized value.
    start, end = week_bounds_from(now)
    return start, end, "This Week"
def json_safe(obj: Any) -> Any:
    """Recursively convert numpy/pandas values into JSON-serializable Python types."""
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Series):
        return obj.to_dict()
    if isinstance(obj, pd.Index):
        return [json_safe(item) for item in obj.tolist()]
    if isinstance(obj, dict):
        return {key: json_safe(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [json_safe(item) for item in obj]
    # Plain Python scalars (str, int, float, bool, None) pass through untouched.
    return obj
def emit_kpi_debug(profile_key: str, stage: str, payload: Dict[str, Any]) -> None:
    """Best-effort KPI debug emission: always to the app log, optionally mirrored to Firebase.

    Never raises — any failure is downgraded to a warning.
    """
    try:
        logger.debug(
            "KPI_DEBUG %s",
            json.dumps(json_safe({"profile": profile_key, "stage": stage, "payload": payload})),
        )
        if LOG_KPI_TO_FIREBASE:
            ts = int(time.time())
            db_ref.child(f"kpi_debug/{profile_key}/{stage}_{ts}").set(json_safe(payload))
    except Exception as e:
        logger.warning(f"Failed to emit KPI debug logs: {e}")
# -----------------------------------------------------------------------------
# Error detection & sanitization
# -----------------------------------------------------------------------------
# Lowercase substrings that mark an LLM/PandasAI answer as an error dump.
ERROR_PATTERNS = [
    "traceback", "exception", "keyerror", "nameerror", "syntaxerror",
    "modulenotfounderror", "importerror", "execution failed",
    "attributeerror", "valueerror:",
]


def _stringify(obj) -> str:
    """Best-effort text rendering; DataFrames/Figures and failures yield ''."""
    try:
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode("utf-8", errors="ignore")
        if isinstance(obj, (pd.DataFrame, plt.Figure)):
            return ""
        return str(obj)
    except Exception:
        return ""
def _extract_text_like(ans):
    """Pull the most text-like field out of an arbitrary answer object."""
    if isinstance(ans, dict):
        # Prefer the PandasAI-style "value" field, then common message keys.
        if "value" in ans:
            return _stringify(ans["value"])
        for key in ("message", "text", "content"):
            if key in ans:
                return _stringify(ans[key])
        return _stringify(ans)
    if hasattr(ans, "value"):
        try:
            return _stringify(getattr(ans, "value"))
        except Exception:
            pass
    return _stringify(ans)
def looks_like_error(ans) -> bool:
    """Heuristic: does *ans* look like an error/traceback rather than a real answer?"""
    # DataFrames and figures are always legitimate results.
    if isinstance(ans, (pd.DataFrame, plt.Figure)):
        return False
    text = _extract_text_like(ans).strip().lower()
    if not text:
        return True  # empty answers count as failures
    if any(pattern in text for pattern in ERROR_PATTERNS):
        return True
    # Classic "File ..., line ..." traceback shape mentioning an error.
    return " file " in text and " line " in text and "error" in text
def sanitize_answer(ans) -> str:
    """Strip code fences and trailing tracebacks from an answer, returning clean text."""
    text = _extract_text_like(ans)
    text = re.sub(r"```+\w*", "", text or "")
    marker = "Traceback (most recent call last):"
    if marker in text:
        # Keep only what came before the traceback started.
        text = text.split(marker, 1)[0]
    return (text or "").strip()
# -----------------------------------------------------------------------------
# Robust normalizers
# -----------------------------------------------------------------------------
def _to_list(x: Any) -> List[Any]:
if x is None: return []
if isinstance(x, list): return x
if isinstance(x, dict): return [x]
if isinstance(x, str):
try:
j = json.loads(x)
if isinstance(j, list): return j
if isinstance(j, dict): return [j]
except Exception:
return [x]
return [x]
def _to_float(x: Any) -> Optional[float]:
try:
if x is None or (isinstance(x, str) and not x.strip()):
return None
return float(str(x).replace(",", "").strip())
except Exception:
return None
def _to_int(x: Any) -> Optional[int]:
try:
f = _to_float(x)
return int(f) if f is not None else None
except Exception:
return None
def _coerce_date(s: Any) -> Optional[pd.Timestamp]:
    """Parse *s* into a tz-aware Timestamp in the app timezone; None when unparseable."""
    if s is None:
        return None
    try:
        parsed = pd.to_datetime(s, errors="coerce")
        if pd.isna(parsed):
            return None
        try:
            # Naive timestamps are assumed to already be local (Harare) time.
            return parsed.tz_localize(TZ, nonexistent="shift_forward", ambiguous="NaT")
        except Exception:
            # Already tz-aware: convert instead of localize.
            return parsed.tz_convert(TZ)
    except Exception:
        return None
# -----------------------------------------------------------------------------
# Admin raw transactions extractor (row-level for PandasAI) + sample logging
# -----------------------------------------------------------------------------
def _paginate(sc_get, email, password, path, params=None, page_param="page", per_page=200, max_pages=50):
"""Generic paginator for endpoints with page/per_page/meta"""
params = dict(params or {})
params.setdefault(page_param, 1)
params.setdefault("per_page", per_page)
page = 1
for _ in range(max_pages):
params[page_param] = page
raw = sc_get("GET", path, email, password, params=params)
yield raw
try:
meta = (raw or {}).get("meta") or {}
last_page = int(meta.get("last_page") or 0)
cur = int(meta.get("current_page") or page)
if last_page and cur >= last_page:
break
if not last_page and not raw:
break
except Exception:
break
page += 1
def _normalize_line(order, item, tz=TZ) -> dict:
    """Flatten one (order, line-item) pair into a single normalized dict row.

    Field names vary across API versions, so each value is taken from the
    first matching key in a list of known aliases.
    """
    def first_of(obj, *keys, default=None):
        # First present key's value (dict inputs only), else the default.
        return next((obj[k] for k in keys if isinstance(obj, dict) and k in obj), default)

    def as_float(x):
        return _to_float(x) or 0.0

    def as_int(x):
        return _to_int(x) or 0

    order_id = first_of(order, "id", "order_id", "uuid", "reference")
    created_at = first_of(order, "created_at", "date", "ordered_at", "timestamp")
    customer = first_of(order, "customer_name", "customer", "buyer_name", "customer_reference")
    payment = first_of(order, "payment_method", "payment", "money_type")
    branch = first_of(order, "shop_name", "shop", "branch", "store")
    status = first_of(order, "status")
    currency = first_of(order, "currency")

    prod_id = first_of(item, "product_id", "item_id", "sku_id", "id")
    prod_name = first_of(item, "product_name", "name", "title", "sku")
    qty = as_int(first_of(item, "quantity", "qty", "units"))
    unit_price = as_float(first_of(item, "unit_price", "price", "unitPrice"))
    line_total = as_float(first_of(item, "line_total", "total", "amount", "revenue"))
    cost_price = _to_float(first_of(item, "unit_cost", "cost_price", "cost"))  # optional

    dt = _coerce_date(created_at)
    # Prefer the explicit line total; fall back to qty * unit price when it is 0/missing.
    revenue = line_total if line_total else (qty * unit_price)
    gp = None
    if cost_price is not None:
        gp = float(revenue - qty * (cost_price or 0.0))

    return {
        "order_id": order_id,
        "datetime": dt,
        "date": dt.tz_convert(tz).date().isoformat() if dt is not None else None,
        "customer": customer,
        "payment_method": payment,
        "branch": branch,
        "status": status,
        "currency": currency,
        "product_id": prod_id,
        "product": prod_name,
        "quantity": qty,
        "unit_price": unit_price,
        "line_total": revenue,
        "unit_cost": float(cost_price) if cost_price is not None else None,
        "gross_profit": float(gp) if gp is not None else None,
    }
def fetch_transactions_df(email: str, password: str, t_start: pd.Timestamp, t_end: pd.Timestamp) -> pd.DataFrame:
    """
    Pull row-level order lines. Tries multiple likely endpoints, logs a sample for each,
    flattens nested items, returns a clean DataFrame suitable for PandasAI.
    """
    # (endpoint path, key holding the order array, key holding each order's line items)
    CANDIDATES: Tuple[Tuple[str, str, str], ...] = (
        ("/api/analytics/orders", "orders", "items"),
        ("/api/orders", "data", "items"),  # many APIs wrap orders under "data"
        ("/api/analytics/transactions", "transactions", "items"),
        ("/api/sales/transactions", "transactions", "lines"),
    )
    params = {
        "start_date": t_start.strftime("%Y-%m-%d"),
        "end_date": t_end.strftime("%Y-%m-%d"),
        "include": "items",
        "per_page": 200,
    }
    rows: List[dict] = []
    # Probe each candidate endpoint in order; the first one that yields rows wins.
    for path, orders_key, items_key in CANDIDATES:
        try:
            # Non-paginated attempt
            raw = sc_request("GET", path, email, password, params=params)
            # Log a sharper sample for this endpoint (top-level)
            logger.debug("TXN_PROBE_RAW %s -> keys=%s", path, list(raw.keys())[:10] if isinstance(raw, dict) else type(raw))
            payload = raw.get("data") if isinstance(raw, dict) and isinstance(raw.get("data"), (dict, list)) else raw
            orders = payload.get(orders_key) if isinstance(payload, dict) else payload
            if orders:
                orders_list = _to_list(orders)
                if orders_list:
                    # sample one order + items
                    o0 = orders_list[0] if isinstance(orders_list[0], dict) else {}
                    i0 = _to_list((o0 or {}).get(items_key))
                    logger.debug("TXN_SAMPLE %s -> order_keys=%s; first_item_keys=%s",
                                 path,
                                 list(o0.keys())[:15] if isinstance(o0, dict) else type(o0),
                                 (list(i0[0].keys())[:15] if i0 and isinstance(i0[0], dict) else "N/A"))
                # Flatten: one output row per (order, line-item) pair.
                for o in orders_list:
                    for it in _to_list((o or {}).get(items_key)):
                        if isinstance(o, dict) and isinstance(it, dict):
                            rows.append(_normalize_line(o, it))
                if rows:
                    break  # this endpoint worked; stop probing the others
            # Try paginated shape
            collected = 0
            for page_raw in _paginate(sc_request, email, password, path, params=params):
                logger.debug("TXN_PAGE %s meta=%s", path, (page_raw or {}).get("meta") if isinstance(page_raw, dict) else "N/A")
                page_data = page_raw.get("data") if isinstance(page_raw, dict) and isinstance(page_raw.get("data"), (dict, list)) else page_raw
                page_orders = page_data.get(orders_key) if isinstance(page_data, dict) else page_data
                for o in _to_list(page_orders):
                    for it in _to_list((o or {}).get(items_key)):
                        if isinstance(o, dict) and isinstance(it, dict):
                            rows.append(_normalize_line(o, it))
                            collected += 1
                if collected and collected >= 5000:  # safety cap
                    break
            if rows:
                # Log a compact sample of flattened rows
                logger.debug("TXN_FLAT_SAMPLE %s -> %s", path, json.dumps(rows[:2], default=str))
                break
        except Exception as e:
            # A failed probe is expected for most candidates; try the next path.
            logger.debug(f"fetch_transactions_df: {path} probe failed: {e}")
    if not rows:
        logger.warning("No row-level endpoint found; returning an empty transactions frame (schema only).")
        # Empty frame that still advertises the expected schema to downstream consumers.
        schema = {
            "datetime": pd.Series(dtype="datetime64[ns]"),
            "date": pd.Series(dtype="object"),
            "order_id": pd.Series(dtype="object"),
            "status": pd.Series(dtype="object"),
            "customer": pd.Series(dtype="object"),
            "branch": pd.Series(dtype="object"),
            "payment_method": pd.Series(dtype="object"),
            "currency": pd.Series(dtype="object"),
            "product_id": pd.Series(dtype="object"),
            "product": pd.Series(dtype="object"),
            "quantity": pd.Series(dtype="float"),
            "unit_price": pd.Series(dtype="float"),
            "line_total": pd.Series(dtype="float"),
            "unit_cost": pd.Series(dtype="float"),
            "gross_profit": pd.Series(dtype="float"),
        }
        return pd.DataFrame(schema)
    df = pd.DataFrame(rows)
    df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
    try:
        # Keep tz-naive for some plotting libs but deterministic in Harare
        df["datetime"] = df["datetime"].dt.tz_convert(TZ).dt.tz_localize(None)
    except Exception:
        pass
    # Coerce numeric columns; unparseable values become NaN.
    for c in ("quantity", "unit_price", "line_total", "unit_cost", "gross_profit"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    # Stable column order (keeping only columns actually present).
    cols = [
        "datetime", "date", "order_id", "status", "customer", "branch",
        "payment_method", "currency", "product_id", "product",
        "quantity", "unit_price", "line_total", "unit_cost", "gross_profit",
    ]
    df = df[[c for c in cols if c in df.columns]]
    logger.debug("TXN_DF_COLUMNS %s", df.columns.tolist())
    logger.debug("TXN_DF_HEAD %s", json.dumps(df.head(3).to_dict(orient="records"), default=str))
    return df
# -----------------------------------------------------------------------------
# Admin KPI Engine (holistic view) — logs sample after each endpoint
# -----------------------------------------------------------------------------
class AdminAnalyticsEngine:
    """Single-tenant holistic admin analytics. No shop/brand filters; admin sees entire dataset."""

    def __init__(self, tenant_key: str, email: str, password: str, period: str = "week"):
        # tenant_key is a label for debug logs only; the credentials drive API access.
        self.tenant_key = (tenant_key or "admin").strip()
        self.email = (email or "").strip()
        self.password = (password or "").strip()
        self.period = (period or "week").lower().strip()
        # Deterministic Harare-tz window used for every downstream API call.
        self.t_start, self.t_end, self.period_label = period_to_bounds(self.period)

    @staticmethod
    def _unwrap_data(payload: dict) -> dict:
        """Return payload["data"] when it is a dict, otherwise the payload itself (or {})."""
        if isinstance(payload, dict):
            return payload.get("data") if isinstance(payload.get("data"), dict) else payload
        return {}

    def _dashboard(self) -> dict:
        """Fetch the headline dashboard KPIs for the configured period."""
        raw = sc_request("GET", "/api/analytics/dashboard", self.email, self.password, params={"period": self.period})
        data = self._unwrap_data(raw)
        emit_kpi_debug(self.tenant_key, "dashboard", data or raw or {})
        # Log a friendly sample view:
        logger.debug("SAMPLE /api/analytics/dashboard -> %s", json.dumps({k: data.get(k) for k in list(data.keys())[:10]}, default=str))
        return data or {}

    def _sales_series(self) -> pd.DataFrame:
        """Fetch the daily sales time series and normalize it to a tidy DataFrame.

        Returns columns: _date (tz-aware), total_sales, total_orders, aov.
        """
        params = {
            "start_date": self.t_start.strftime("%Y-%m-%d"),
            "end_date": self.t_end.strftime("%Y-%m-%d"),
            "group_by": "day",
        }
        raw = sc_request("GET", "/api/analytics/sales", self.email, self.password, params=params)
        data = {}
        if isinstance(raw, dict):
            data = (raw.get("data") or raw) if isinstance(raw.get("data"), (dict, list)) else raw
        else:
            # Some deployments return the JSON body as a plain string; parse it.
            try:
                j = json.loads(raw)
                data = j.get("data", j) if isinstance(j, dict) else {}
            except Exception:
                data = {}
        # log samples from top-level keys we expect
        try:
            so = data.get("sales_over_time")
            pm = data.get("sales_by_payment_method")
            cat = data.get("sales_by_category")
            logger.debug("SAMPLE /api/analytics/sales -> sales_over_time[:2]=%s; sales_by_payment_method[:2]=%s; sales_by_category[:2]=%s",
                         json.dumps((so or [])[:2]), json.dumps((pm or [])[:2]), json.dumps((cat or [])[:2]))
        except Exception:
            pass
        # NOTE(review): if "data" unwraps to a list, data.get() below would raise
        # outside any try — presumably the API always returns a dict here; confirm.
        series = []
        for r in _to_list(data.get("sales_over_time")):
            if not isinstance(r, dict):
                continue
            date_str = r.get("date") or r.get("day") or r.get("period")
            dt = _coerce_date(date_str)
            if dt is None:
                continue
            total_sales = _to_float(r.get("total_sales") or r.get("total") or r.get("revenue"))
            total_orders = _to_int(r.get("total_orders") or r.get("orders") or r.get("count"))
            aov = _to_float(r.get("average_order_value") or r.get("aov"))
            # Derive average order value when the API does not provide it.
            if aov is None and total_sales is not None and (total_orders or 0) > 0:
                aov = float(total_sales) / int(total_orders)
            series.append({
                "_date": dt,
                "total_sales": float(total_sales) if total_sales is not None else 0.0,
                "total_orders": int(total_orders) if total_orders is not None else 0,
                "aov": float(aov) if aov is not None else None,
            })
        df = pd.DataFrame(series)
        if df.empty:
            return pd.DataFrame(columns=["_date", "total_sales", "total_orders", "aov"])
        df = df.sort_values("_date").reset_index(drop=True)
        emit_kpi_debug(self.tenant_key, "sales_series_raw", (raw if isinstance(raw, dict) else {"raw": raw}))
        logger.debug("SAMPLE sales_series_df.head -> %s", json.dumps(df.head(3).to_dict(orient="records"), default=str))
        return df

    def transactions_df(self) -> pd.DataFrame:
        """Row-level (order, line-item) DataFrame for the configured period."""
        df = fetch_transactions_df(self.email, self.password, self.t_start, self.t_end)
        emit_kpi_debug(self.tenant_key, "transactions_df_meta", {
            "rows": int(len(df)),
            "cols": list(df.columns),
            "period": {"start": self.t_start.isoformat(), "end": self.t_end.isoformat()}
        })
        # already logged columns + head in fetch_transactions_df()
        return df

    def _products(self) -> dict:
        """Fetch product leaderboards (revenue / units / margin) for the period."""
        raw = sc_request(
            "GET", "/api/analytics/products", self.email, self.password,
            params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")}
        )
        data = self._unwrap_data(raw)
        emit_kpi_debug(self.tenant_key, "products", data or raw or {})
        # log sample leaderboards if present
        keys = ["top_by_revenue","top_by_units","top_by_margin_value","top_by_margin_pct","bottom_by_revenue","loss_makers"]
        sample = {k: (data.get(k) or [])[:2] for k in keys if isinstance(data.get(k), list)}
        logger.debug("SAMPLE /api/analytics/products -> %s", json.dumps(sample))
        return data or {}

    def _customers(self) -> dict:
        """Fetch customer-value analytics (top/at-risk/new customers, RFM summary)."""
        raw = sc_request(
            "GET", "/api/analytics/customers", self.email, self.password,
            params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")}
        )
        data = self._unwrap_data(raw)
        emit_kpi_debug(self.tenant_key, "customers", data or raw or {})
        # sample common shapes
        sample = {
            "top_customers_by_gp": (data.get("top_customers_by_gp") or [])[:2],
            "at_risk": (data.get("at_risk") or [])[:2],
            "new_customers": (data.get("new_customers") or [])[:2],
            "summary": data.get("summary"),
        }
        logger.debug("SAMPLE /api/analytics/customers -> %s", json.dumps(sample))
        return data or {}

    def _inventory(self) -> dict:
        """Fetch the current inventory snapshot (no date params — point-in-time data)."""
        raw = sc_request("GET", "/api/analytics/inventory", self.email, self.password)
        data = self._unwrap_data(raw)
        emit_kpi_debug(self.tenant_key, "inventory", data or raw or {})
        try:
            items = data.get("products") or data.get("items") or data.get("snapshot") or []
            logger.debug("SAMPLE /api/analytics/inventory -> %s", json.dumps((items or [])[:2], default=str))
        except Exception:
            pass
        return data or {}

    def _comparisons(self) -> dict:
        """Fetch period-over-period comparison data for the configured window."""
        raw = sc_request(
            "GET", "/api/analytics/comparisons", self.email, self.password,
            params={"start_date": self.t_start.strftime("%Y-%m-%d"), "end_date": self.t_end.strftime("%Y-%m-%d")}
        )
        data = self._unwrap_data(raw)
        emit_kpi_debug(self.tenant_key, "comparisons", data or raw or {})
        try:
            logger.debug("SAMPLE /api/analytics/comparisons -> keys=%s", list(data.keys())[:15])
        except Exception:
            pass
        return data or {}

    # -------------------- deterministic snapshot --------------------
    def build_snapshot(self) -> Dict[str, Any]:
        """Assemble the full KPI snapshot from every analytics endpoint.

        All numbers are computed deterministically here (never by the LLM);
        the returned dict is JSON-safe and ready to narrate or serve as-is.
        """
        dash = self._dashboard()
        sales_df = self._sales_series()
        prods = self._products()
        custs = self._customers()
        inv = self._inventory()
        comps = self._comparisons()

        def _get_num(d: dict, *keys, default=0.0):
            # First parseable numeric value among *keys*, else the default.
            for k in keys:
                v = d.get(k)
                if isinstance(v, (int, float, str)):
                    try:
                        return float(v)
                    except Exception:
                        continue
            return default

        total_revenue = _get_num(dash, "total_revenue", "revenue", default=0.0)
        gross_profit = _get_num(dash, "gross_profit", "gp", default=0.0)
        transactions = int(_get_num(dash, "transactions", "orders", default=0.0))
        # Fall back to summing the daily series when the dashboard lacks totals.
        if (total_revenue == 0.0 or transactions == 0) and isinstance(sales_df, pd.DataFrame) and not sales_df.empty:
            total_revenue = float(sales_df["total_sales"].sum())
            transactions = int(sales_df["total_orders"].sum())
        # Leaderboards tolerate both snake_case and camelCase key variants.
        product_lb = {
            "top_by_revenue": prods.get("top_by_revenue") or prods.get("topRevenue") or [],
            "top_by_units": prods.get("top_by_units") or prods.get("topUnits") or [],
            "top_by_margin_value": prods.get("top_by_margin_value") or prods.get("topByGP") or [],
            "top_by_margin_pct": prods.get("top_by_margin_pct") or [],
            "bottom_by_revenue": prods.get("bottom_by_revenue") or prods.get("bottomRevenue") or [],
            "loss_makers": prods.get("loss_makers") or [],
        }
        customer_value = {
            "leaderboards": {
                "top_customers_by_gp": custs.get("top_customers_by_gp") or custs.get("topByGP") or [],
                "at_risk": custs.get("at_risk", []),
                "new_customers": custs.get("new_customers", []),
            },
            "rfm_summary": custs.get("summary", {}),
            "params": {"window": self.period_label},
        }
        temporal = self._temporal_patterns_from_sales(sales_df)
        inventory_block = {
            "status": "ok" if inv else "no_stock_data",
            "alerts": inv.get("alerts") if isinstance(inv, dict) else {},
            "snapshot": inv,
        }
        snapshot = {
            "Summary Period": f"{self.period_label} ({self.t_start.date()} to {self.t_end.date()})",
            "Performance Snapshot": {
                "Total Revenue": round(total_revenue, 2),
                "Gross Profit": round(gross_profit, 2),
                "Transactions": transactions,
                "Change": {
                    "revenue": dash.get("revenue_change") or dash.get("total_revenue_change"),
                    "gross_profit": dash.get("gross_profit_change") or dash.get("gp_change"),
                    "transactions": dash.get("transactions_change") or dash.get("orders_change"),
                },
            },
            "Temporal Patterns": temporal,
            "Product KPIs": {"leaderboards": product_lb},
            "Customer Value": customer_value,
            "Inventory": inventory_block,
            "Comparisons": comps if isinstance(comps, dict) else {"data": comps},
            "meta": {
                "timeframes": {
                    "current_start": self.t_start.isoformat(),
                    "current_end": self.t_end.isoformat(),
                    "period_label": self.period_label,
                },
                "row_counts": {
                    "sales_points": int(len(sales_df)) if isinstance(sales_df, pd.DataFrame) else 0
                },
            },
        }
        emit_kpi_debug(self.tenant_key, "snapshot_done", snapshot["meta"])
        return json_safe(snapshot)

    def _temporal_patterns_from_sales(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Day-of-week aggregation plus the raw daily series from the sales DataFrame."""
        if df is None or df.empty:
            return {"series": [], "best_day_by_sales": None}
        d = df.copy()
        d["dow"] = d["_date"].dt.day_name()
        d["date"] = d["_date"].dt.strftime("%Y-%m-%d")
        g = d.groupby("dow", dropna=False).agg(
            total_sales=("total_sales", "sum"),
            total_orders=("total_orders", "sum"),
        ).reset_index()
        # Best day = the day-of-week with the highest summed sales.
        best_row = None if g.empty else g.loc[g["total_sales"].idxmax()]
        best_day = None if g.empty else {
            "day": str(best_row["dow"]),
            "total_sales": float(best_row["total_sales"]),
            "total_orders": int(best_row["total_orders"]),
        }
        series = d[["date", "total_sales", "total_orders", "aov"]].to_dict(orient="records")
        return {"series": series, "best_day_by_sales": best_day}

    def narrate(self, snapshot: dict, user_question: str) -> str:
        """LLM narration over the snapshot; numbers come only from the JSON.

        Falls back to a raw JSON dump (inside a markdown code fence) when the
        LLM call fails for any reason.
        """
        try:
            prompt = (
                "You are a concise business analyst for Brave Retail Insights.\n"
                "RULES: Do NOT invent numbers; only use values in the JSON. Harare timezone. Keep it brief.\n"
                f"User Question: {json.dumps(user_question)}\n\n"
                f"Business Data JSON:\n{json.dumps(json_safe(snapshot), ensure_ascii=False)}\n"
            )
            resp = llm.invoke(prompt)
            text = getattr(resp, "content", None) or str(resp)
            return sanitize_answer(text)
        except Exception:
            return "### Business Snapshot\n\n```\n" + json.dumps(json_safe(snapshot), indent=2) + "\n```"
# -----------------------------------------------------------------------------
# /chat — PandasAI first on sales series, else deterministic snapshot + narration
# -----------------------------------------------------------------------------
@app.route("/chat", methods=["POST"])
@cross_origin()
def chat():
    """Answer a free-form analytics question.

    Strategy: try PandasAI over a row-level or daily-sales DataFrame first;
    on any failure fall back to the deterministic snapshot narrated by the LLM.
    Expects JSON: {tenant_key?, user_question, period?, email, password}.
    """
    rid = str(uuid.uuid4())[:8]  # short request id for log correlation
    logger.info(f"[{rid}] === /chat start ===")
    try:
        payload = request.get_json() or {}
        tenant_key = str(payload.get("tenant_key") or "admin")
        user_question = (payload.get("user_question") or "").strip()
        period = (payload.get("period") or "week").strip().lower()
        email = payload.get("email")
        password = payload.get("password")
        if not user_question:
            return jsonify({"answer": "Missing 'user_question'."})
        if not email or not password:
            return jsonify({"error": "Missing 'email' or 'password'."}), 400
        engine = AdminAnalyticsEngine(tenant_key, email, password, period)
        # Build transactions_df now and place it in meta logs (useful for PandasAI later)
        tdf = engine.transactions_df()
        # For simple Q&A we still start with sales_df (fast + stable)
        sales_df = engine._sales_series()
        if sales_df.empty and tdf.empty:
            # No data at all: narrate the deterministic snapshot instead.
            snapshot = engine.build_snapshot()
            answer = engine.narrate(snapshot, user_question)
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}})
        try:
            logger.info(f"[{rid}] PandasAI attempt …")
            # If the question references products/items explicitly, switch to transactions_df
            use_df = tdf if re.search(r"\b(product|sku|item|category|top\s*5|top\s*ten|by\s*revenue|by\s*units)\b", user_question, re.I) and not tdf.empty else sales_df
            pandas_agent = SmartDataframe(use_df, config={
                "llm": llm,
                "response_parser": FlaskResponse,
                "security": "none",
                "save_charts_path": user_defined_path,
                "save_charts": False,
                "enable_cache": False,
                "conversational": True,
                "enable_logging": False,
            })
            combined_prompt = (
                "Rules:\n"
                "1) Use pd.Timestamp.now(tz='Africa/Harare') for any now().\n"
                "2) Do NOT assume future dates; only use provided DataFrame columns.\n"
                "3) For monthly, derive via dt.to_period('M').\n"
                f"Question: {user_question}"
            )
            answer = pandas_agent.chat(combined_prompt)
            # Reject answers that look like tracebacks/empty text so we fall back.
            if looks_like_error(answer):
                logger.warning(f"[{rid}] PandasAI invalid answer; fallback.")
                raise RuntimeError("PandasAI invalid answer")
            if isinstance(answer, pd.DataFrame):
                return jsonify({"answer": answer.to_html(), "meta": {"source": "pandasai"}})
            if isinstance(answer, plt.Figure):
                # Serialize the figure to an inline PNG data URI.
                buf = io.BytesIO()
                answer.savefig(buf, format="png")
                data_uri = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode('utf-8')}"
                return jsonify({"answer": data_uri, "meta": {"source": "pandasai"}})
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "pandasai"}})
        except Exception:
            # Any PandasAI failure degrades gracefully to the snapshot narration.
            snapshot = engine.build_snapshot()
            answer = engine.narrate(snapshot, user_question)
            return jsonify({"answer": sanitize_answer(answer), "meta": {"source": "analyst_fallback"}})
    except Exception as e:
        logger.exception(f"[{rid}] Critical unexpected error in /chat: {e}")
        return jsonify({"answer": "Something went wrong on our side. Please try again."})
# -----------------------------------------------------------------------------
# /report, /marketing, /notify — feed snapshot (admin holistic)
# -----------------------------------------------------------------------------
@app.route("/report", methods=["POST"])
@cross_origin()
def report():
logger.info("=== /report ===")
try:
payload = request.get_json() or {}
tenant_key = str(payload.get("tenant_key") or "admin")
period = (payload.get("period") or "week").strip().lower()
email = payload.get("email"); password = payload.get("password")
if not email or not password:
return jsonify({"error": "Missing 'email' or 'password'."}), 400
engine = AdminAnalyticsEngine(tenant_key, email, password, period)
snapshot = engine.build_snapshot()
prompt = (
"You are a Brave Retail Insights business analyst. Analyze the following data and generate a "
"succinct, insight-rich admin report with KPIs and recommendations. Use markdown only.\n"
+ json.dumps(json_safe(snapshot))
)
response = model.generate_content(prompt)
return jsonify(str(response.text))
except Exception as e:
logger.exception("Error in /report")
return jsonify({"error": "Failed to generate report.", "details": str(e)}), 500
@app.route("/marketing", methods=["POST"])
@cross_origin()
def marketing():
logger.info("=== /marketing ===")
try:
payload = request.get_json() or {}
tenant_key = str(payload.get("tenant_key") or "admin")
period = (payload.get("period") or "week").strip().lower()
email = payload.get("email"); password = payload.get("password")
if not email or not password:
return jsonify({"error": "Missing 'email' or 'password'."}), 400
engine = AdminAnalyticsEngine(tenant_key, email, password, period)
snapshot = engine.build_snapshot()
prompt = (
"You are a Brave Retail Insights Marketing Specialist. Analyze the JSON and produce a concise, "
"practical strategy (audiences, promos, timing). Only return the strategy.\n"
+ json.dumps(json_safe(snapshot))
)
response = model.generate_content(prompt)
return jsonify(str(response.text))
except Exception as e:
logger.exception("Error in /marketing")
return jsonify({"error": "Failed to generate marketing strategy.", "details": str(e)}), 500
@app.route("/notify", methods=["POST"])
@cross_origin()
def notify():
logger.info("=== /notify ===")
try:
payload = request.get_json() or {}
tenant_key = str(payload.get("tenant_key") or "admin")
period = (payload.get("period") or "week").strip().lower()
email = payload.get("email"); password = payload.get("password")
if not email or not password:
return jsonify({"error": "Missing 'email' or 'password'."}), 400
engine = AdminAnalyticsEngine(tenant_key, email, password, period)
snapshot = engine.build_snapshot()
prompt = (
"You are a Brave Retail Insights business analyst. Write up to 6 short bullets with actionable tips "
"for an admin notification panel using this JSON.\n"
+ json.dumps(json_safe(snapshot))
)
response = model.generate_content(prompt)
return jsonify(str(response.text))
except Exception as e:
logger.exception("Error in /notify")
return jsonify({"error": "Failed to generate notification content.", "details": str(e)}), 500
# -----------------------------------------------------------------------------
# Voice briefing endpoints (history in Firebase; KPIs from admin snapshot)
# -----------------------------------------------------------------------------
def _synthesize_history_summary(call_history: List[dict]) -> str:
if not call_history:
return "• New caller — no prior call history."
history_json = json.dumps(json_safe(call_history), indent=2)
analyst_prompt = (
"You are an executive assistant preparing a pre-call briefing for Brave Retail Insights. "
"Only analyze the user's past call history and summarize recurring themes.\n\n"
f"{history_json}\n\n- Output a few bullets only."
)
try:
response = model.generate_content(analyst_prompt)
return (response.text or "").strip() or "• (empty)"
except Exception:
return "• Could not summarize prior calls."
@app.route("/api/log-call-usage", methods=["POST"])
@cross_origin()
def log_call_usage():
payload = request.get_json() or {}
profile_id = payload.get("profile_id")
transcript = payload.get("transcript")
duration = payload.get("durationSeconds")
if not profile_id or not transcript:
return jsonify({"error": "Missing 'profile_id' or 'transcript'."}), 400
try:
call_id = f"call_{int(time.time())}"
ref = db_ref.child(f"transcripts/{profile_id}/{call_id}")
ref.set(json_safe({
"transcript": transcript,
"profileId": profile_id,
"durationSeconds": duration,
"createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
}))
return jsonify({"status": "success"}), 200
except Exception as e:
logger.exception(f"Firebase error storing transcript for '{profile_id}': {e}")
return jsonify({"error": "Server error while storing the transcript."}), 500
@app.route("/api/call-briefing", methods=["POST"])
@cross_origin()
def get_call_briefing():
payload = request.get_json() or {}
profile_id = str((payload.get("profile_id") or "").strip())
period = (payload.get("period") or "week").strip().lower()
email = payload.get("email")
password = payload.get("password")
if not profile_id:
return jsonify({"error": "Missing 'profile_id'."}), 400
if not email or not password:
return jsonify({"error": "Missing 'email' or 'password'."}), 400
try:
call_history = []
try:
transcripts = db_ref.child(f"transcripts/{profile_id}").get()
if transcripts: call_history = list(transcripts.values())
except Exception as e:
logger.warning(f"Transcript fetch failed for '{profile_id}': {e}")
memory_summary = _synthesize_history_summary(call_history)
engine = AdminAnalyticsEngine(profile_id or "admin", email, password, period)
kpi_snapshot = engine.build_snapshot()
return jsonify({"memory_summary": memory_summary, "kpi_snapshot": json_safe(kpi_snapshot)}), 200
except Exception as e:
logger.exception(f"Critical error in call-briefing for '{profile_id}': {e}")
return jsonify({"error": "Failed to generate call briefing."}), 500
# -----------------------------------------------------------------------------
# Entrypoint
# -----------------------------------------------------------------------------
if __name__ == "__main__":
# Do NOT use debug=True in production.
app.run(debug=True, host="0.0.0.0", port=7860)