"""BiteSight Analytics API. Static-CSV-backed analytics service that exposes: - Deterministic KPI endpoints under /api/* (used by the dashboard). - An AI BizConsultant /chat endpoint backed by a pandas LangChain agent. The CSV at DATA_FILE is loaded once at startup, normalized (datetime parsing, day-of-week, hour) and reused for every request. All KPI endpoints accept the same filter contract: branch / customer_type / start / end. """ from __future__ import annotations import logging import os import traceback from typing import Any, Dict, List, Optional import numpy as np import pandas as pd from fastapi import FastAPI, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger("bitesight") # --------------------------------------------------------------------------- # App + CORS # --------------------------------------------------------------------------- app = FastAPI(title="BiteSight Analytics API", version="2.0.0") # CORS: allow * for now (static dashboard hosted on Cloudflare Pages can have many # preview URLs). For tighter security, replace with an explicit whitelist via env. ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "*") app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in ALLOWED_ORIGINS.split(",")] if ALLOWED_ORIGINS != "*" else ["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # Data layer # --------------------------------------------------------------------------- DATA_FILE = os.environ.get("DATA_FILE", "data_client.csv") DOW_ORDER = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] DOW_SHORT = { "Monday": "Mon", "Tuesday": "Tue", "Wednesday": "Wed", "Thursday": "Thu", "Friday": "Fri", "Saturday": "Sat", "Sunday": "Sun", } def _load_df() -> pd.DataFrame: raw = pd.read_csv(DATA_FILE) raw["Date"] = pd.to_datetime(raw["Date"], format="%m/%d/%Y", errors="coerce") # Time is like "1:08:00 PM" raw["Time_parsed"] = pd.to_datetime(raw["Time"], format="%I:%M:%S %p", errors="coerce") raw["Hour"] = raw["Time_parsed"].dt.hour raw["DayOfWeek"] = raw["Date"].dt.day_name() raw["MonthLabel"] = raw["Date"].dt.strftime("%b %Y") return raw try: df = _load_df() log.info("Loaded %s with %d rows (range %s -> %s)", DATA_FILE, len(df), df["Date"].min(), df["Date"].max()) except Exception as exc: # pragma: no cover - startup failure log.exception("Could not load %s: %s", DATA_FILE, exc) df = pd.DataFrame() def _parse_date(value: Optional[str]) -> Optional[pd.Timestamp]: if not value: return None try: return pd.to_datetime(value) except Exception: return None def filter_df( branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, ) -> pd.DataFrame: """Apply the standard filter contract used by every KPI endpoint.""" if df.empty: return df f = df if branch and branch.lower() != "all": f = f[f["Branch"].str.lower() == branch.lower()] if customer_type and customer_type.lower() != "all": f = f[f["Customer type"].str.lower() == customer_type.lower()] s = _parse_date(start) e = _parse_date(end) if s is not None: f = f[f["Date"] >= s] if e is not None: # inclusive end-of-day f = f[f["Date"] <= e + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)] return f def _safe_pct(curr: float, prev: float) -> Optional[float]: if prev is None or prev == 0 or np.isnan(prev): return None return float(((curr - prev) / prev) * 100.0) def _split_halves(f: pd.DataFrame, agg) -> tuple[float, Optional[float]]: """Compute a metric for the second half and first half of the period for trend deltas.""" if f.empty: return 0.0, None if f["Date"].nunique() < 2: return float(agg(f)), None midpoint = f["Date"].min() + (f["Date"].max() - f["Date"].min()) / 2 first = f[f["Date"] < midpoint] second = f[f["Date"] >= midpoint] return float(agg(second)), (float(agg(first)) if not first.empty else None) def _round(v: Any, n: int = 2) -> float: try: if v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v))): return 0.0 return round(float(v), n) except Exception: return 0.0 # --------------------------------------------------------------------------- # Health & metadata # --------------------------------------------------------------------------- @app.get("/") def root(): return {"service": "BiteSight Analytics API", "version": "2.0.0", "rows": len(df)} @app.get("/health") def health(): return { "status": "ok" if not df.empty else "degraded", "rows": int(len(df)), "columns": list(df.columns) if not df.empty else [], } @app.get("/api/meta") def get_meta(): if df.empty: return {"error": "data unavailable"} branches = sorted(df["Branch"].dropna().unique().tolist()) cities_map = ( df.dropna(subset=["Branch", "City"]).drop_duplicates("Branch").set_index("Branch")["City"].to_dict() ) return { "branches": branches, "cities": cities_map, "products": sorted(df["Product line"].dropna().unique().tolist()), "payments": sorted(df["Payment"].dropna().unique().tolist()), "customer_types": sorted(df["Customer type"].dropna().unique().tolist()), "genders": sorted(df["Gender"].dropna().unique().tolist()), "date_range": { "min": df["Date"].min().strftime("%Y-%m-%d"), "max": df["Date"].max().strftime("%Y-%m-%d"), }, "total_rows": int(len(df)), } # --------------------------------------------------------------------------- # KPI endpoints # --------------------------------------------------------------------------- def _common_filters( branch: Optional[str] = Query(None), customer_type: Optional[str] = Query(None), start: Optional[str] = Query(None), end: Optional[str] = Query(None), ): return {"branch": branch, "customer_type": customer_type, "start": start, "end": end} @app.get("/api/kpi/summary") def kpi_summary( branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, ): f = filter_df(branch, customer_type, start, end) if f.empty: return {"empty": True} rev_curr, rev_prev = _split_halves(f, lambda x: x["Sales"].sum()) txn_curr, txn_prev = _split_halves(f, lambda x: len(x)) basket_curr, basket_prev = _split_halves( f, lambda x: x["Sales"].mean() if len(x) else 0 ) rating_curr, rating_prev = _split_halves( f, lambda x: x["Rating"].mean() if len(x) else 0 ) margin_curr, margin_prev = _split_halves( f, lambda x: x["gross margin percentage"].mean() if len(x) else 0 ) daily = ( f.groupby(f["Date"].dt.date)["Sales"].sum().reset_index().sort_values("Date") ) sparkline = [ {"date": pd.Timestamp(r["Date"]).strftime("%Y-%m-%d"), "value": _round(r["Sales"])} for _, r in daily.iterrows() ] return { "period": { "start": f["Date"].min().strftime("%Y-%m-%d"), "end": f["Date"].max().strftime("%Y-%m-%d"), "days": int((f["Date"].max() - f["Date"].min()).days + 1), }, "revenue": {"value": _round(f["Sales"].sum()), "delta_pct": _round(_safe_pct(rev_curr, rev_prev) or 0)}, "transactions": {"value": int(len(f)), "delta_pct": _round(_safe_pct(txn_curr, txn_prev) or 0)}, "avg_basket": {"value": _round(f["Sales"].mean()), "delta_pct": _round(_safe_pct(basket_curr, basket_prev) or 0)}, "rating": {"value": _round(f["Rating"].mean()), "delta_pct": _round(_safe_pct(rating_curr, rating_prev) or 0)}, "margin": { "value": _round(f["gross margin percentage"].mean()), "delta_pct": _round(_safe_pct(margin_curr, margin_prev) or 0), "target": 5.0, }, "sparkline": sparkline, } @app.get("/api/kpi/sales-by-day") def sales_by_day(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [{"name": DOW_SHORT[d], "member": 0, "normal": 0} for d in DOW_ORDER] pivot = ( f.groupby(["DayOfWeek", "Customer type"])["Sales"].sum().unstack(fill_value=0) .reindex(DOW_ORDER, fill_value=0) ) out = [] for d in DOW_ORDER: out.append({ "name": DOW_SHORT[d], "member": _round(pivot.loc[d].get("Member", 0)) if d in pivot.index else 0, "normal": _round(pivot.loc[d].get("Normal", 0)) if d in pivot.index else 0, }) return out @app.get("/api/kpi/heatmap") def heatmap(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) pivot = ( f.groupby(["Product line", "DayOfWeek"])["Invoice ID"].count().unstack(fill_value=0) if not f.empty else pd.DataFrame() ) if not pivot.empty: pivot = pivot.reindex(columns=DOW_ORDER, fill_value=0) rows = [] for product in sorted(pivot.index) if not pivot.empty else sorted(df["Product line"].unique() if not df.empty else []): rows.append({ "name": product, "data": [int(pivot.loc[product, d]) if (not pivot.empty and product in pivot.index) else 0 for d in DOW_ORDER], }) max_val = int(pivot.values.max()) if not pivot.empty and pivot.size > 0 else 1 return {"rows": rows, "days": [DOW_SHORT[d] for d in DOW_ORDER], "max": max_val} @app.get("/api/kpi/peak-hours") def peak_hours(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [] by_hour = ( f.dropna(subset=["Hour"]).groupby("Hour")["Invoice ID"].count().reset_index().sort_values("Hour") ) def _fmt(h: int) -> str: h = int(h) if h == 0: return "12am" if h < 12: return f"{h}am" if h == 12: return "12pm" return f"{h - 12}pm" return [ {"name": _fmt(int(r["Hour"])), "hour": int(r["Hour"]), "visitors": int(r["Invoice ID"])} for _, r in by_hour.iterrows() ] @app.get("/api/kpi/payments") def payments(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [{"name": DOW_SHORT[d], "Ewallet": 0, "Cash": 0, "Card": 0} for d in DOW_ORDER] pivot = ( f.groupby(["DayOfWeek", "Payment"])["Invoice ID"].count().unstack(fill_value=0) .reindex(DOW_ORDER, fill_value=0) ) out = [] for d in DOW_ORDER: row = pivot.loc[d] if d in pivot.index else pd.Series() out.append({ "name": DOW_SHORT[d], "Ewallet": int(row.get("Ewallet", 0)) if not row.empty else 0, "Cash": int(row.get("Cash", 0)) if not row.empty else 0, "Card": int(row.get("Credit card", 0)) if not row.empty else 0, }) return out @app.get("/api/kpi/branch-radar") def branch_radar(customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(None, customer_type, start, end) if f.empty: return {"data": [], "branches": []} branches = sorted(f["Branch"].unique().tolist()) metrics: Dict[str, Dict[str, float]] = {} for b in branches: sub = f[f["Branch"] == b] metrics[b] = { "Revenue": float(sub["Sales"].sum()), "Transactions": float(len(sub)), "Quantity": float(sub["Quantity"].sum()), "AvgTicket": float(sub["Sales"].mean() if len(sub) else 0), "Rating": float(sub["Rating"].mean() if len(sub) else 0), } dimensions = ["Revenue", "Transactions", "Quantity", "AvgTicket", "Rating"] data: List[Dict[str, Any]] = [] for dim in dimensions: max_val = max((metrics[b][dim] for b in branches), default=1) or 1 entry: Dict[str, Any] = {"subject": dim, "fullMark": 100} for b in branches: entry[b] = _round((metrics[b][dim] / max_val) * 100, 1) data.append(entry) return {"data": data, "branches": branches, "raw": metrics} @app.get("/api/kpi/bubble-strategy") def bubble_strategy(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [] g = ( f.groupby("Product line").agg( rating=("Rating", "mean"), income=("Sales", "sum"), quantity=("Quantity", "sum"), ).reset_index() ) palette = ["#10b981", "#6366f1", "#eab308", "#ef4444", "#f59e0b", "#06b6d4"] return [ { "name": r["Product line"], "rating": _round(r["rating"], 2), "income": _round(r["income"], 2), "quantity": int(r["quantity"]), "fill": palette[i % len(palette)], } for i, (_, r) in enumerate(g.iterrows()) ] @app.get("/api/kpi/city-treemap") def city_treemap(customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(None, customer_type, start, end) if f.empty: return [] g = f.groupby("City")["Sales"].sum().sort_values(ascending=False).reset_index() palette = ["#10b981", "#6366f1", "#f59e0b", "#ef4444", "#06b6d4"] total = float(g["Sales"].sum()) or 1 return [ { "name": r["City"], "size": _round(r["Sales"]), "share": _round(r["Sales"] / total * 100, 1), "fill": palette[i % len(palette)], } for i, (_, r) in enumerate(g.iterrows()) ] @app.get("/api/kpi/spend-histogram") def spend_histogram(branch: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, None, start, end) if f.empty: return [{"bin": l, "member": 0, "normal": 0} for l in ["$0-100", "$100-250", "$250-500", "$500-750", "$750-1k", "$1k+"]] bins = [0, 100, 250, 500, 750, 1000, float("inf")] labels = ["$0-100", "$100-250", "$250-500", "$500-750", "$750-1k", "$1k+"] f = f.assign(_bin=pd.cut(f["Sales"], bins=bins, labels=labels, include_lowest=True)) pivot = f.groupby(["_bin", "Customer type"], observed=False)["Invoice ID"].count().unstack(fill_value=0) return [ { "bin": str(l), "member": int(pivot.loc[l].get("Member", 0)) if l in pivot.index else 0, "normal": int(pivot.loc[l].get("Normal", 0)) if l in pivot.index else 0, } for l in labels ] @app.get("/api/kpi/velocity-trend") def velocity_trend(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [] daily = f.groupby(f["Date"].dt.date)["Sales"].sum().reset_index().sort_values("Date") daily["change"] = daily["Sales"].pct_change() * 100 daily = daily.dropna() daily = daily.tail(14) return [ { "name": pd.Timestamp(r["Date"]).strftime("%b %d"), "change": _round(r["change"], 2), "value": _round(r["Sales"]), } for _, r in daily.iterrows() ] @app.get("/api/kpi/insights") def insights(branch: Optional[str] = None, customer_type: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None): f = filter_df(branch, customer_type, start, end) if f.empty: return [] out: List[Dict[str, Any]] = [] branch_rev = f.groupby("Branch")["Sales"].sum().sort_values(ascending=False) if not branch_rev.empty: top_b = branch_rev.index[0] share = float(branch_rev.iloc[0] / branch_rev.sum() * 100) if branch_rev.sum() else 0 out.append({ "icon": "trophy", "title": f"Cabang {top_b} memimpin", "value": f"{share:.1f}%", "tone": "success", "description": f"Kontribusi pendapatan tertinggi sebesar ${branch_rev.iloc[0]:,.0f}.", }) dow_rev = f.groupby("DayOfWeek")["Sales"].sum() if not dow_rev.empty: top_d = dow_rev.idxmax() share = float(dow_rev.loc[top_d] / dow_rev.sum() * 100) if dow_rev.sum() else 0 out.append({ "icon": "calendar", "title": f"Hari Emas: {top_d}", "value": f"${dow_rev.loc[top_d]:,.0f}", "tone": "info", "description": f"Hari dengan akumulasi pendapatan terbesar ({share:.1f}% dari total).", }) prod_rev = f.groupby("Product line")["Sales"].sum().sort_values(ascending=False) if not prod_rev.empty: top_p = prod_rev.index[0] share = float(prod_rev.iloc[0] / prod_rev.sum() * 100) if prod_rev.sum() else 0 out.append({ "icon": "package", "title": top_p, "value": f"{share:.1f}%", "tone": "warning", "description": f"Kategori produk paling laris (${prod_rev.iloc[0]:,.0f}).", }) pay = f["Payment"].value_counts(normalize=True) * 100 if not pay.empty: out.append({ "icon": "wallet", "title": pay.index[0], "value": f"{pay.iloc[0]:.1f}%", "tone": "info", "description": "Metode pembayaran paling sering dipakai pelanggan.", }) if "Member" in f["Customer type"].values: member_rev = float(f[f["Customer type"] == "Member"]["Sales"].sum()) total = float(f["Sales"].sum()) or 1 share = member_rev / total * 100 out.append({ "icon": "users", "title": "Pangsa Member", "value": f"{share:.1f}%", "tone": "success", "description": f"Pelanggan member menyumbang ${member_rev:,.0f} dari total pendapatan.", }) rating = float(f["Rating"].mean()) out.append({ "icon": "star", "title": "Avg Rating", "value": f"{rating:.2f}/10", "tone": "info" if rating >= 7 else "warning", "description": f"Skor rata-rata kepuasan pelanggan untuk {len(f):,} transaksi.", }) return out # --------------------------------------------------------------------------- # AI BizConsultant chat # --------------------------------------------------------------------------- class ChatHistoryItem(BaseModel): role: str = Field(..., pattern="^(user|assistant)$") content: str class ChatRequest(BaseModel): question: str history: List[ChatHistoryItem] = Field(default_factory=list) SYSTEM_PROMPT = ( "You are BiteSight AI BizConsultant - a world-class F&B Executive Business Consultant " "and Data Scientist with 20+ years of experience advising top restaurant chains and " "retail F&B brands. You have deep expertise in sales analytics, consumer behavior, " "operational efficiency, and growth strategy.\n\n" "RESPONSE STYLE:\n" "- If the user sends a general greeting, respond warmly and naturally in Indonesian.\n" "- If the user asks a business/data question, give a THOROUGH, INSIGHTFUL, EXECUTIVE-LEVEL analysis. " "Do NOT give shallow or overly brief answers. Go deep.\n" "- ALWAYS include: key data facts WITH specific numbers, analytical commentary explaining the 'why', " "patterns / anomalies / trends, business implications, AND concrete actionable recommendations.\n" "- Structure your response FREELY using rich Markdown: ## headers, **bold**, bullet lists, " "and comparison tables when helpful. DO NOT use a rigid fixed template.\n" "- ALWAYS respond in fluent, professional INDONESIAN.\n" "- When user follows up referring to 'itu', 'tadi', 'hasil sebelumnya', use the chat history below to resolve context.\n\n" "PANDAS DATA ENGINE RULES (CRITICAL):\n" "- A pandas DataFrame named `df` with ALL rows of client transaction data is ALREADY LOADED.\n" "- Columns: Invoice ID, Branch (Alex/Giza/Cairo), City (Yangon/Naypyitaw/Mandalay), Customer type (Member/Normal), " "Gender, Product line, Unit price, Quantity, Tax 5%, Sales, Date (datetime), Time, Payment, cogs, " "gross margin percentage, gross income, Rating, Hour, DayOfWeek.\n" "- ALWAYS run actual pandas operations to get real numbers. NEVER fabricate data.\n" "- DO NOT recreate the dataframe via StringIO or hardcoded text. Use `df` directly.\n" "- Run MULTIPLE queries when needed (value_counts, groupby, mean, corr, describe, resample).\n" "- Cross-reference dimensions when relevant (payment vs branch vs hour, etc.).\n\n" "DATA PRESENTATION RULES:\n" "- ALWAYS state specific numbers, percentages, timeframes. Avoid vague statements.\n" "- Currency is US Dollars ($). Format as $XX.XX. NEVER use 'Rp' or 'Rupiah'.\n" "- Do NOT explain dataset schema or row counts unless explicitly asked.\n" "- When comparing metrics, give absolute numbers AND percentages.\n" ) def _format_history(history: List[ChatHistoryItem]) -> str: if not history: return "" # Keep only the last 6 turns to limit token cost trimmed = history[-6:] lines = [] for item in trimmed: speaker = "User" if item.role == "user" else "Assistant" # Trim content per turn to avoid runaway prompts content = item.content.strip() if len(content) > 1200: content = content[:1200] + "..." lines.append(f"{speaker}: {content}") return "\n".join(lines) @app.post("/chat") async def chat(request: ChatRequest): if df.empty: return {"answer": "Maaf, sumber data tidak tersedia di server. Silakan hubungi admin."} question = (request.question or "").strip() if not question: return {"answer": "Silakan ketik pertanyaan terlebih dahulu."} akashml_api_key = os.environ.get("AKASHML_API_KEY") if not akashml_api_key: return { "answer": "**Konfigurasi Belum Lengkap**\n\nVariabel `AKASHML_API_KEY` belum diset di Hugging Face Space. " "Silakan tambahkan di Space Settings -> Variables and secrets.", } try: from langchain_openai import ChatOpenAI from langchain_experimental.agents import create_pandas_dataframe_agent llm = ChatOpenAI( openai_api_key=akashml_api_key, openai_api_base="https://api.akashml.com/v1", model_name="MiniMaxAI/MiniMax-M2.5", temperature=0.2, max_tokens=8192, timeout=90, ) history_text = _format_history(request.history) if history_text: user_input = ( "Riwayat percakapan sebelumnya (gunakan untuk menjaga konteks):\n" f"{history_text}\n\n" f"Pertanyaan saat ini: {question}" ) else: user_input = question agent = create_pandas_dataframe_agent( llm, df, agent_type="tool-calling", allow_dangerous_code=True, handle_parsing_errors=True, verbose=False, prefix=SYSTEM_PROMPT, ) response = agent.invoke({"input": user_input}) return {"answer": response.get("output", "Maaf, model tidak menghasilkan jawaban.")} except Exception as exc: log.error("Chat handler failed: %s\n%s", exc, traceback.format_exc()) msg = str(exc) # Surface only the cleaned-up final answer if LLM produced raw text if "Could not parse" in msg: extracted = msg.split("Could not parse")[-1].replace("LLM output:", "").strip(" `'\"") return {"answer": extracted} return { "answer": ( "**Maaf, terjadi kendala saat memproses permintaan Anda.**\n\n" "Hal ini biasanya disebabkan oleh: koneksi ke model AI bermasalah, " "kuota habis, atau format jawaban model tidak sesuai. " "Coba ulangi pertanyaan dengan kalimat yang lebih singkat." ), } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))