Spaces:
Sleeping
Sleeping
| """BiteSight Analytics API. | |
| Static-CSV-backed analytics service that exposes: | |
| - Deterministic KPI endpoints under /api/* (used by the dashboard). | |
| - An AI BizConsultant /chat endpoint backed by a pandas LangChain agent. | |
| The CSV at DATA_FILE is loaded once at startup, normalized (datetime parsing, | |
| day-of-week, hour) and reused for every request. All KPI endpoints accept the | |
| same filter contract: branch / customer_type / start / end. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import traceback | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import FastAPI, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("bitesight")

# ---------------------------------------------------------------------------
# App + CORS
# ---------------------------------------------------------------------------
app = FastAPI(title="BiteSight Analytics API", version="2.0.0")

# CORS: allow * by default (the static dashboard can be served from many
# preview URLs). Set ALLOWED_ORIGINS to a comma-separated whitelist to tighten.
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "*")

# Blank entries (e.g. from a trailing comma) are dropped so they never reach
# the middleware as an "origin".
_cors_origins = [o.strip() for o in ALLOWED_ORIGINS.split(",") if o.strip()]
_cors_allow_all = "*" in _cors_origins

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _cors_allow_all else _cors_origins,
    # Credentialed requests must not be combined with a wildcard origin, so
    # credentials are only enabled when an explicit whitelist is configured.
    allow_credentials=not _cors_allow_all,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # --------------------------------------------------------------------------- | |
| # Data layer | |
| # --------------------------------------------------------------------------- | |
# Path of the CSV backing every endpoint; overridable through the environment.
DATA_FILE = os.environ.get("DATA_FILE", "data_client.csv")

# Weekday names in calendar order (Monday first), used to order chart output.
DOW_ORDER = [
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
    "Sunday",
]

# Three-letter label for each weekday ("Monday" -> "Mon", ...).
DOW_SHORT = {day: day[:3] for day in DOW_ORDER}
def _load_df() -> pd.DataFrame:
    """Read DATA_FILE and return it with normalized date/time helper columns.

    "Date" is parsed as month/day/year and "Time" as a 12-hour clock value
    (e.g. "1:08:00 PM"); values that do not match become NaT. The derived
    columns are Time_parsed, Hour, DayOfWeek and MonthLabel.
    """
    frame = pd.read_csv(DATA_FILE)
    dates = pd.to_datetime(frame["Date"], format="%m/%d/%Y", errors="coerce")
    clock = pd.to_datetime(frame["Time"], format="%I:%M:%S %p", errors="coerce")
    return frame.assign(
        Date=dates,
        Time_parsed=clock,
        Hour=clock.dt.hour,
        DayOfWeek=dates.dt.day_name(),
        MonthLabel=dates.dt.strftime("%b %Y"),
    )
# Load the dataset once at import time. Any failure (including one raised while
# building the log line) is logged and replaced by an empty frame, which the
# endpoints detect through `df.empty`.
try:
    df = _load_df()
    log.info("Loaded %s with %d rows (range %s -> %s)", DATA_FILE, len(df), df["Date"].min(), df["Date"].max())
except Exception as exc:  # pragma: no cover - startup failure
    log.exception("Could not load %s: %s", DATA_FILE, exc)
    df = pd.DataFrame()
def _parse_date(value: Optional[str]) -> Optional[pd.Timestamp]:
    """Parse a user-supplied date string into a timezone-naive Timestamp.

    Args:
        value: Raw query-string value; may be None or empty.

    Returns:
        The parsed Timestamp, or None when the value is missing, cannot be
        parsed, or parses to NaT.
    """
    if not value:
        return None
    try:
        parsed = pd.to_datetime(value)
    except (ValueError, TypeError, OverflowError):
        return None
    # "NaT"/"nan" parse successfully to NaT; comparing against NaT would
    # silently filter out every row, so treat it as "no bound".
    if pd.isna(parsed):
        return None
    # The Date column is timezone-naive; a tz-aware bound (e.g. a trailing "Z")
    # would raise on comparison, so drop the offset and keep the wall time.
    if parsed.tzinfo is not None:
        parsed = parsed.tz_localize(None)
    return parsed
def filter_df(
    branch: Optional[str] = None,
    customer_type: Optional[str] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
) -> pd.DataFrame:
    """Apply the standard filter contract used by every KPI endpoint.

    Args:
        branch: Branch name, matched case-insensitively; None or "all" disables it.
        customer_type: Customer type, matched case-insensitively; None or "all"
            disables it.
        start: Inclusive first day; unparsable values are ignored.
        end: Inclusive last day; unparsable values are ignored.

    Returns:
        The matching rows of the module-level frame (the frame itself when it
        is empty).
    """
    if df.empty:
        return df

    def _day_bound(raw: Optional[str]) -> Optional[pd.Timestamp]:
        # Bounds are compared at day granularity: strip any timezone and
        # time-of-day so "2019-01-05T12:00" means "January 5th".
        ts = _parse_date(raw)
        if ts is None or pd.isna(ts):
            return None
        if ts.tzinfo is not None:
            ts = ts.tz_localize(None)
        return ts.normalize()

    f = df
    if branch and branch.lower() != "all":
        f = f[f["Branch"].str.lower() == branch.lower()]
    if customer_type and customer_type.lower() != "all":
        f = f[f["Customer type"].str.lower() == customer_type.lower()]

    first_day = _day_bound(start)
    last_day = _day_bound(end)
    if first_day is not None:
        f = f[f["Date"] >= first_day]
    if last_day is not None:
        # Inclusive end-of-day: everything strictly before the following midnight.
        f = f[f["Date"] < last_day + pd.Timedelta(days=1)]
    return f
def _safe_pct(curr: float, prev: float) -> Optional[float]:
    """Return the percentage change from prev to curr.

    Returns None when prev is None, zero or NaN, since no baseline exists.
    """
    baseline_missing = prev is None or prev == 0 or np.isnan(prev)
    if baseline_missing:
        return None
    relative_change = (curr - prev) / prev
    return float(relative_change * 100.0)
def _split_halves(f: pd.DataFrame, agg) -> tuple[float, Optional[float]]:
    """Evaluate agg on the later and earlier halves of the period covered by f.

    The period is split at the midpoint between the smallest and largest
    "Date". Returns (later_value, earlier_value). earlier_value is None when
    f has fewer than two distinct dates or the earlier half is empty; an empty
    f yields (0.0, None).
    """
    if f.empty:
        return 0.0, None

    dates = f["Date"]
    if dates.nunique() < 2:
        return float(agg(f)), None

    oldest, newest = dates.min(), dates.max()
    midpoint = oldest + (newest - oldest) / 2

    later_value = float(agg(f[dates >= midpoint]))
    earlier_rows = f[dates < midpoint]
    earlier_value = None if earlier_rows.empty else float(agg(earlier_rows))
    return later_value, earlier_value
def _round(v: Any, n: int = 2) -> float:
    """Round v to n decimals, mapping anything non-finite or non-numeric to 0.0.

    Args:
        v: Value to round; may be None, a Python or NumPy number, or anything
            accepted by float().
        n: Number of decimal places.

    Returns:
        The rounded float, or 0.0 when v is None, NaN, infinite, or cannot be
        converted/rounded.
    """
    if v is None:
        return 0.0
    try:
        number = float(v)
        # Check finiteness after conversion so NaN/inf is caught whatever the
        # input type (np.float32, str, Decimal, ...), not only for `float`.
        if not np.isfinite(number):
            return 0.0
        return round(number, n)
    except (TypeError, ValueError, OverflowError):
        return 0.0
| # --------------------------------------------------------------------------- | |
| # Health & metadata | |
| # --------------------------------------------------------------------------- | |
def root():
    """Return a small service banner including the number of loaded rows."""
    banner = {"service": "BiteSight Analytics API", "version": "2.0.0"}
    banner["rows"] = len(df)
    return banner
def health():
    """Report whether the dataset is loaded, with its row count and columns.

    Status is "ok" when the frame has data and "degraded" when it is empty.
    """
    loaded = not df.empty
    columns = list(df.columns) if loaded else []
    return {
        "status": "ok" if loaded else "degraded",
        "rows": int(len(df)),
        "columns": columns,
    }
def get_meta():
    """Describe the dataset: distinct filter values, date range and row count.

    Returns {"error": "data unavailable"} when no data is loaded.
    """
    if df.empty:
        return {"error": "data unavailable"}

    def _distinct(column: str) -> list:
        # Sorted unique non-null values of one column.
        return sorted(df[column].dropna().unique().tolist())

    branches = _distinct("Branch")
    # First city seen for each branch, among rows where both are present.
    branch_rows = df.dropna(subset=["Branch", "City"]).drop_duplicates("Branch")
    cities_map = branch_rows.set_index("Branch")["City"].to_dict()

    return {
        "branches": branches,
        "cities": cities_map,
        "products": _distinct("Product line"),
        "payments": _distinct("Payment"),
        "customer_types": _distinct("Customer type"),
        "genders": _distinct("Gender"),
        "date_range": {
            "min": df["Date"].min().strftime("%Y-%m-%d"),
            "max": df["Date"].max().strftime("%Y-%m-%d"),
        },
        "total_rows": int(len(df)),
    }
| # --------------------------------------------------------------------------- | |
| # KPI endpoints | |
| # --------------------------------------------------------------------------- | |
def _common_filters(
    branch: Optional[str] = Query(None),
    customer_type: Optional[str] = Query(None),
    start: Optional[str] = Query(None),
    end: Optional[str] = Query(None),
):
    """Bundle the four shared filter query parameters into a dict."""
    return dict(branch=branch, customer_type=customer_type, start=start, end=end)
def kpi_summary(
    branch: Optional[str] = None,
    customer_type: Optional[str] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
):
    """Headline KPIs for the filtered period, each with a half-over-half delta.

    Returns {"empty": True} when no rows match. Otherwise the payload holds the
    period bounds, revenue, transaction count, average basket, rating and margin
    (each as value + delta_pct) plus a daily revenue sparkline. A delta that
    cannot be computed is reported as 0.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return {"empty": True}

    def _delta(agg) -> float:
        # Percentage change of `agg` between the earlier and later half.
        later, earlier = _split_halves(f, agg)
        return _round(_safe_pct(later, earlier) or 0)

    def _mean_of(column: str):
        # Aggregator: column mean, or 0 for an empty slice.
        return lambda part: part[column].mean() if len(part) else 0

    revenue_delta = _delta(lambda part: part["Sales"].sum())
    txn_delta = _delta(len)
    basket_delta = _delta(_mean_of("Sales"))
    rating_delta = _delta(_mean_of("Rating"))
    margin_delta = _delta(_mean_of("gross margin percentage"))

    per_day = f.groupby(f["Date"].dt.date)["Sales"].sum().reset_index().sort_values("Date")
    sparkline = [
        {"date": pd.Timestamp(day).strftime("%Y-%m-%d"), "value": _round(total)}
        for day, total in zip(per_day["Date"], per_day["Sales"])
    ]

    first_day = f["Date"].min()
    last_day = f["Date"].max()
    return {
        "period": {
            "start": first_day.strftime("%Y-%m-%d"),
            "end": last_day.strftime("%Y-%m-%d"),
            "days": int((last_day - first_day).days + 1),
        },
        "revenue": {"value": _round(f["Sales"].sum()), "delta_pct": revenue_delta},
        "transactions": {"value": int(len(f)), "delta_pct": txn_delta},
        "avg_basket": {"value": _round(f["Sales"].mean()), "delta_pct": basket_delta},
        "rating": {"value": _round(f["Rating"].mean()), "delta_pct": rating_delta},
        "margin": {
            "value": _round(f["gross margin percentage"].mean()),
            "delta_pct": margin_delta,
            "target": 5.0,
        },
        "sparkline": sparkline,
    }
def sales_by_day(branch: Optional[str] = None, customer_type: Optional[str] = None,
                 start: Optional[str] = None, end: Optional[str] = None):
    """Sales per weekday, split into "Member" and "Normal" customer types.

    Always returns seven entries in Monday-to-Sunday order; an empty selection
    yields zeros.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return [{"name": DOW_SHORT[d], "member": 0, "normal": 0} for d in DOW_ORDER]

    by_day = f.groupby(["DayOfWeek", "Customer type"])["Sales"].sum().unstack(fill_value=0)
    # Reindexing guarantees a row for every weekday, so no membership check is
    # needed when reading the rows back.
    by_day = by_day.reindex(DOW_ORDER, fill_value=0)

    result = []
    for day in DOW_ORDER:
        totals = by_day.loc[day]
        result.append({
            "name": DOW_SHORT[day],
            "member": _round(totals.get("Member", 0)),
            "normal": _round(totals.get("Normal", 0)),
        })
    return result
def heatmap(branch: Optional[str] = None, customer_type: Optional[str] = None,
            start: Optional[str] = None, end: Optional[str] = None):
    """Transaction counts per product line and weekday.

    Returns {"rows": [{"name", "data"}], "days": [...], "max": int}. When the
    selection has no counts, every product line known to the full dataset is
    listed with zeros and "max" is 1.
    """
    f = filter_df(branch, customer_type, start, end)
    day_labels = [DOW_SHORT[d] for d in DOW_ORDER]

    counts = pd.DataFrame()
    if not f.empty:
        counts = f.groupby(["Product line", "DayOfWeek"])["Invoice ID"].count().unstack(fill_value=0)

    if counts.empty:
        # Nothing to count: fall back to the full product list with zero rows.
        products = sorted(df["Product line"].unique() if not df.empty else [])
        zero_rows = [{"name": product, "data": [0 for _ in DOW_ORDER]} for product in products]
        return {"rows": zero_rows, "days": day_labels, "max": 1}

    counts = counts.reindex(columns=DOW_ORDER, fill_value=0)
    rows = [
        {"name": product, "data": [int(counts.loc[product, d]) for d in DOW_ORDER]}
        for product in sorted(counts.index)
    ]
    return {"rows": rows, "days": day_labels, "max": int(counts.values.max())}
def peak_hours(branch: Optional[str] = None, customer_type: Optional[str] = None,
               start: Optional[str] = None, end: Optional[str] = None):
    """Transaction counts per hour of day, ordered by hour.

    Each entry carries a 12-hour label ("12am", "1pm", ...), the numeric hour
    and the count. Rows without a parsed hour are ignored.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return []

    def _label(hour: int) -> str:
        # Hour values come from `.dt.hour`, i.e. 0-23.
        suffix = "am" if hour < 12 else "pm"
        return f"{hour % 12 or 12}{suffix}"

    per_hour = f.dropna(subset=["Hour"]).groupby("Hour")["Invoice ID"].count().sort_index()
    return [
        {"name": _label(int(hour)), "hour": int(hour), "visitors": int(count)}
        for hour, count in per_hour.items()
    ]
def payments(branch: Optional[str] = None, customer_type: Optional[str] = None,
             start: Optional[str] = None, end: Optional[str] = None):
    """Transaction counts per weekday for each payment method.

    Always returns seven Monday-to-Sunday entries with "Ewallet", "Cash" and
    "Card" counts ("Card" is read from the "Credit card" payment value).
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return [{"name": DOW_SHORT[d], "Ewallet": 0, "Cash": 0, "Card": 0} for d in DOW_ORDER]

    by_day = f.groupby(["DayOfWeek", "Payment"])["Invoice ID"].count().unstack(fill_value=0)
    # After reindexing every weekday has a row; missing methods default to 0.
    by_day = by_day.reindex(DOW_ORDER, fill_value=0)

    def _entry(day: str) -> dict:
        counts = by_day.loc[day]
        return {
            "name": DOW_SHORT[day],
            "Ewallet": int(counts.get("Ewallet", 0)),
            "Cash": int(counts.get("Cash", 0)),
            "Card": int(counts.get("Credit card", 0)),
        }

    return [_entry(day) for day in DOW_ORDER]
def branch_radar(customer_type: Optional[str] = None, start: Optional[str] = None,
                 end: Optional[str] = None):
    """Compare branches on five dimensions, scaled so the best branch is 100.

    Returns the radar series ("data"), the branch names and the unscaled
    per-branch metrics ("raw"). An empty selection returns empty "data" and
    "branches" only.
    """
    f = filter_df(None, customer_type, start, end)
    if f.empty:
        return {"data": [], "branches": []}

    branches = sorted(f["Branch"].unique().tolist())
    dimensions = ["Revenue", "Transactions", "Quantity", "AvgTicket", "Rating"]

    def _profile(rows: pd.DataFrame) -> Dict[str, float]:
        # Raw metrics of a single branch.
        has_rows = len(rows)
        return {
            "Revenue": float(rows["Sales"].sum()),
            "Transactions": float(len(rows)),
            "Quantity": float(rows["Quantity"].sum()),
            "AvgTicket": float(rows["Sales"].mean() if has_rows else 0),
            "Rating": float(rows["Rating"].mean() if has_rows else 0),
        }

    metrics: Dict[str, Dict[str, float]] = {
        name: _profile(f[f["Branch"] == name]) for name in branches
    }

    data: List[Dict[str, Any]] = []
    for dim in dimensions:
        # A zero maximum falls back to 1 to avoid dividing by zero.
        ceiling = max((metrics[name][dim] for name in branches), default=1) or 1
        entry: Dict[str, Any] = {"subject": dim, "fullMark": 100}
        entry.update(
            {name: _round((metrics[name][dim] / ceiling) * 100, 1) for name in branches}
        )
        data.append(entry)
    return {"data": data, "branches": branches, "raw": metrics}
def bubble_strategy(branch: Optional[str] = None, customer_type: Optional[str] = None,
                    start: Optional[str] = None, end: Optional[str] = None):
    """Per product line: mean rating, total sales ("income") and total quantity.

    Each entry also gets a fill colour taken from a fixed palette, cycling when
    there are more product lines than colours.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return []

    palette = ["#10b981", "#6366f1", "#eab308", "#ef4444", "#f59e0b", "#06b6d4"]
    summary = f.groupby("Product line").agg(
        rating=("Rating", "mean"),
        income=("Sales", "sum"),
        quantity=("Quantity", "sum"),
    ).reset_index()

    bubbles = []
    for position, record in enumerate(summary.to_dict("records")):
        bubbles.append({
            "name": record["Product line"],
            "rating": _round(record["rating"], 2),
            "income": _round(record["income"], 2),
            "quantity": int(record["quantity"]),
            "fill": palette[position % len(palette)],
        })
    return bubbles
def city_treemap(customer_type: Optional[str] = None, start: Optional[str] = None,
                 end: Optional[str] = None):
    """Sales per city, largest first, with each city's share of the total.

    Colours come from a fixed palette and cycle when it runs out.
    """
    f = filter_df(None, customer_type, start, end)
    if f.empty:
        return []

    palette = ["#10b981", "#6366f1", "#f59e0b", "#ef4444", "#06b6d4"]
    by_city = f.groupby("City")["Sales"].sum().sort_values(ascending=False).reset_index()
    # A zero grand total falls back to 1 so the share stays computable.
    grand_total = float(by_city["Sales"].sum()) or 1

    tiles = []
    for position, (city, sales) in enumerate(zip(by_city["City"], by_city["Sales"])):
        tiles.append({
            "name": city,
            "size": _round(sales),
            "share": _round(sales / grand_total * 100, 1),
            "fill": palette[position % len(palette)],
        })
    return tiles
def spend_histogram(branch: Optional[str] = None, start: Optional[str] = None,
                    end: Optional[str] = None):
    """Count transactions per spend bracket, split by "Member" / "Normal".

    The six brackets are always returned in ascending order; an empty
    selection yields zeros.
    """
    labels = ["$0-100", "$100-250", "$250-500", "$500-750", "$750-1k", "$1k+"]
    f = filter_df(branch, None, start, end)
    if f.empty:
        return [{"bin": label, "member": 0, "normal": 0} for label in labels]

    edges = [0, 100, 250, 500, 750, 1000, float("inf")]
    bracket = pd.cut(f["Sales"], bins=edges, labels=labels, include_lowest=True)
    f = f.assign(_bin=bracket)
    counts = f.groupby(["_bin", "Customer type"], observed=False)["Invoice ID"].count().unstack(fill_value=0)

    def _count(label: str, customer_kind: str) -> int:
        if label not in counts.index:
            return 0
        return int(counts.loc[label].get(customer_kind, 0))

    return [
        {"bin": str(label), "member": _count(label, "Member"), "normal": _count(label, "Normal")}
        for label in labels
    ]
def velocity_trend(branch: Optional[str] = None, customer_type: Optional[str] = None,
                   start: Optional[str] = None, end: Optional[str] = None):
    """Day-over-day percentage change in sales for the last 14 available days.

    The first day of the selection has no previous day and is dropped.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return []

    per_day = f.groupby(f["Date"].dt.date)["Sales"].sum().reset_index().sort_values("Date")
    per_day["change"] = per_day["Sales"].pct_change() * 100
    recent = per_day.dropna().tail(14)

    points = []
    for day, total, change in zip(recent["Date"], recent["Sales"], recent["change"]):
        points.append({
            "name": pd.Timestamp(day).strftime("%b %d"),
            "change": _round(change, 2),
            "value": _round(total),
        })
    return points
def insights(branch: Optional[str] = None, customer_type: Optional[str] = None,
             start: Optional[str] = None, end: Optional[str] = None):
    """Build the list of insight cards for the filtered selection.

    Cards, in order: leading branch, best weekday, top product line, most used
    payment method, member revenue share (only when members are present) and
    average rating. Returns an empty list when no rows match.
    """
    f = filter_df(branch, customer_type, start, end)
    if f.empty:
        return []

    def _card(icon: str, title: Any, value: str, tone: str, description: str) -> Dict[str, Any]:
        return {
            "icon": icon,
            "title": title,
            "value": value,
            "tone": tone,
            "description": description,
        }

    def _share(part, whole) -> float:
        # Percentage of `whole` represented by `part`; 0 when `whole` is zero.
        return float(part / whole * 100) if whole else 0

    cards: List[Dict[str, Any]] = []

    # Branch with the highest revenue.
    branch_rev = f.groupby("Branch")["Sales"].sum().sort_values(ascending=False)
    if not branch_rev.empty:
        leader = branch_rev.index[0]
        leader_share = _share(branch_rev.iloc[0], branch_rev.sum())
        cards.append(_card(
            "trophy",
            f"Cabang {leader} memimpin",
            f"{leader_share:.1f}%",
            "success",
            f"Kontribusi pendapatan tertinggi sebesar ${branch_rev.iloc[0]:,.0f}.",
        ))

    # Weekday with the highest revenue.
    dow_rev = f.groupby("DayOfWeek")["Sales"].sum()
    if not dow_rev.empty:
        best_day = dow_rev.idxmax()
        day_share = _share(dow_rev.loc[best_day], dow_rev.sum())
        cards.append(_card(
            "calendar",
            f"Hari Emas: {best_day}",
            f"${dow_rev.loc[best_day]:,.0f}",
            "info",
            f"Hari dengan akumulasi pendapatan terbesar ({day_share:.1f}% dari total).",
        ))

    # Product line with the highest revenue.
    prod_rev = f.groupby("Product line")["Sales"].sum().sort_values(ascending=False)
    if not prod_rev.empty:
        product_share = _share(prod_rev.iloc[0], prod_rev.sum())
        cards.append(_card(
            "package",
            prod_rev.index[0],
            f"{product_share:.1f}%",
            "warning",
            f"Kategori produk paling laris (${prod_rev.iloc[0]:,.0f}).",
        ))

    # Most frequently used payment method.
    pay = f["Payment"].value_counts(normalize=True) * 100
    if not pay.empty:
        cards.append(_card(
            "wallet",
            pay.index[0],
            f"{pay.iloc[0]:.1f}%",
            "info",
            "Metode pembayaran paling sering dipakai pelanggan.",
        ))

    # Revenue share contributed by members.
    if "Member" in f["Customer type"].values:
        member_rev = float(f[f["Customer type"] == "Member"]["Sales"].sum())
        total = float(f["Sales"].sum()) or 1
        member_share = member_rev / total * 100
        cards.append(_card(
            "users",
            "Pangsa Member",
            f"{member_share:.1f}%",
            "success",
            f"Pelanggan member menyumbang ${member_rev:,.0f} dari total pendapatan.",
        ))

    # Average rating; the tone switches to "warning" below 7.
    rating = float(f["Rating"].mean())
    cards.append(_card(
        "star",
        "Avg Rating",
        f"{rating:.2f}/10",
        "info" if rating >= 7 else "warning",
        f"Skor rata-rata kepuasan pelanggan untuk {len(f):,} transaksi.",
    ))
    return cards
| # --------------------------------------------------------------------------- | |
| # AI BizConsultant chat | |
| # --------------------------------------------------------------------------- | |
class ChatHistoryItem(BaseModel):
    """One earlier turn of the conversation sent along with a chat request."""

    # Speaker of the turn; validation only accepts "user" or "assistant".
    role: str = Field(..., pattern="^(user|assistant)$")
    # Text of the turn.
    content: str
class ChatRequest(BaseModel):
    """Request body of the chat endpoint."""

    # The question to answer.
    question: str
    # Earlier turns, oldest first; defaults to an empty list.
    history: List[ChatHistoryItem] = Field(default_factory=list)
# Instruction prefix handed to the pandas agent in `chat`. It sets the persona
# and response style, describes the `df` columns the agent may query, and fixes
# presentation rules (answers in Indonesian, amounts in US dollars).
SYSTEM_PROMPT = (
    "You are BiteSight AI BizConsultant - a world-class F&B Executive Business Consultant "
    "and Data Scientist with 20+ years of experience advising top restaurant chains and "
    "retail F&B brands. You have deep expertise in sales analytics, consumer behavior, "
    "operational efficiency, and growth strategy.\n\n"
    "RESPONSE STYLE:\n"
    "- If the user sends a general greeting, respond warmly and naturally in Indonesian.\n"
    "- If the user asks a business/data question, give a THOROUGH, INSIGHTFUL, EXECUTIVE-LEVEL analysis. "
    "Do NOT give shallow or overly brief answers. Go deep.\n"
    "- ALWAYS include: key data facts WITH specific numbers, analytical commentary explaining the 'why', "
    "patterns / anomalies / trends, business implications, AND concrete actionable recommendations.\n"
    "- Structure your response FREELY using rich Markdown: ## headers, **bold**, bullet lists, "
    "and comparison tables when helpful. DO NOT use a rigid fixed template.\n"
    "- ALWAYS respond in fluent, professional INDONESIAN.\n"
    "- When user follows up referring to 'itu', 'tadi', 'hasil sebelumnya', use the chat history below to resolve context.\n\n"
    "PANDAS DATA ENGINE RULES (CRITICAL):\n"
    "- A pandas DataFrame named `df` with ALL rows of client transaction data is ALREADY LOADED.\n"
    "- Columns: Invoice ID, Branch (Alex/Giza/Cairo), City (Yangon/Naypyitaw/Mandalay), Customer type (Member/Normal), "
    "Gender, Product line, Unit price, Quantity, Tax 5%, Sales, Date (datetime), Time, Payment, cogs, "
    "gross margin percentage, gross income, Rating, Hour, DayOfWeek.\n"
    "- ALWAYS run actual pandas operations to get real numbers. NEVER fabricate data.\n"
    "- DO NOT recreate the dataframe via StringIO or hardcoded text. Use `df` directly.\n"
    "- Run MULTIPLE queries when needed (value_counts, groupby, mean, corr, describe, resample).\n"
    "- Cross-reference dimensions when relevant (payment vs branch vs hour, etc.).\n\n"
    "DATA PRESENTATION RULES:\n"
    "- ALWAYS state specific numbers, percentages, timeframes. Avoid vague statements.\n"
    "- Currency is US Dollars ($). Format as $XX.XX. NEVER use 'Rp' or 'Rupiah'.\n"
    "- Do NOT explain dataset schema or row counts unless explicitly asked.\n"
    "- When comparing metrics, give absolute numbers AND percentages.\n"
)
def _format_history(history: List[ChatHistoryItem]) -> str:
    """Render the most recent chat turns as "Speaker: text" lines.

    Only the last six turns are kept, and each turn is cut to 1200 characters
    (followed by "...") to bound the prompt size. Returns "" for no history.
    """
    if not history:
        return ""

    def _render(turn) -> str:
        speaker = "User" if turn.role == "user" else "Assistant"
        text = turn.content.strip()
        if len(text) > 1200:
            text = text[:1200] + "..."
        return f"{speaker}: {text}"

    return "\n".join(_render(turn) for turn in history[-6:])
async def chat(request: ChatRequest):
    """Answer a business question with the pandas-backed LLM agent.

    Args:
        request: The question plus optional earlier turns used as context.

    Returns:
        A dict with a single "answer" key. Missing data, an empty question, a
        missing AKASHML_API_KEY and any agent failure are all reported as an
        answer text rather than raised.
    """
    if df.empty:
        return {"answer": "Maaf, sumber data tidak tersedia di server. Silakan hubungi admin."}

    question = (request.question or "").strip()
    if not question:
        return {"answer": "Silakan ketik pertanyaan terlebih dahulu."}

    akashml_api_key = os.environ.get("AKASHML_API_KEY")
    if not akashml_api_key:
        return {
            "answer": "**Konfigurasi Belum Lengkap**\n\nVariabel `AKASHML_API_KEY` belum diset di Hugging Face Space. "
                      "Silakan tambahkan di Space Settings -> Variables and secrets.",
        }

    try:
        import asyncio

        from langchain_openai import ChatOpenAI
        from langchain_experimental.agents import create_pandas_dataframe_agent

        llm = ChatOpenAI(
            openai_api_key=akashml_api_key,
            openai_api_base="https://api.akashml.com/v1",
            model_name="MiniMaxAI/MiniMax-M2.5",
            temperature=0.2,
            max_tokens=8192,
            timeout=90,
        )

        history_text = _format_history(request.history)
        if history_text:
            user_input = (
                "Riwayat percakapan sebelumnya (gunakan untuk menjaga konteks):\n"
                f"{history_text}\n\n"
                f"Pertanyaan saat ini: {question}"
            )
        else:
            user_input = question

        # SECURITY NOTE(review): allow_dangerous_code lets the agent execute
        # model-generated Python in this process, steered by user-supplied
        # text. Confirm the deployment is sandboxed before exposing this.
        agent = create_pandas_dataframe_agent(
            llm,
            # Hand the agent a private copy so generated code cannot mutate the
            # frame shared by every KPI endpoint.
            df.copy(),
            agent_type="tool-calling",
            allow_dangerous_code=True,
            handle_parsing_errors=True,
            verbose=False,
            prefix=SYSTEM_PROMPT,
        )

        # agent.invoke blocks (network + code execution); run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        response = await asyncio.to_thread(agent.invoke, {"input": user_input})
        return {"answer": response.get("output", "Maaf, model tidak menghasilkan jawaban.")}
    except Exception as exc:
        log.exception("Chat handler failed: %s", exc)
        msg = str(exc)
        # Surface only the cleaned-up final answer if LLM produced raw text
        if "Could not parse" in msg:
            extracted = msg.split("Could not parse")[-1].replace("LLM output:", "").strip(" `'\"")
            # An empty extraction would show a blank reply; fall through to the
            # generic message instead.
            if extracted:
                return {"answer": extracted}
        return {
            "answer": (
                "**Maaf, terjadi kendala saat memproses permintaan Anda.**\n\n"
                "Hal ini biasanya disebabkan oleh: koneksi ke model AI bermasalah, "
                "kuota habis, atau format jawaban model tidak sesuai. "
                "Coba ulangi pertanyaan dengan kalimat yang lebih singkat."
            ),
        }
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, on $PORT or 7860.
    import uvicorn

    listen_port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)