Spaces:
Sleeping
Sleeping
| """ | |
| main.py — Pricelyst Shopping Advisor (Analyst Edition - Full Context) | |
| ✅ Flask API | |
| ✅ Firebase Admin Persistence | |
| ✅ Gemini via google-genai SDK (Robust) | |
| ✅ "Analyst Engine": Python Math for Baskets, ZESA, & Fuel | |
| ✅ Ground Truth Data: Uses /api/v1/product-listing | |
| ✅ Jessica Context: Injects Top 60 Real Products into Voice Agent | |
| ✅ Intent Detection: Strict Casual vs Actionable separation | |
| ENV VARS: | |
| - GOOGLE_API_KEY=... | |
| - FIREBASE='{"type":"service_account", ...}' | |
| - PRICE_API_BASE=https://api.pricelyst.co.zw | |
| - GEMINI_MODEL=gemini-2.0-flash | |
| - PORT=5000 | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import math | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional | |
| import requests | |
| import pandas as pd | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
# ––––– Logging –––––
LOG_FORMAT = "%(asctime)s | %(levelname)s | %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# The google-genai SDK is optional at import time: on ImportError we fall
# back to genai=None so the Flask app can still boot (the Gemini helpers
# below return stub responses when the client is unavailable).
try:
    from google import genai
    from google.genai import types
except Exception as e:
    genai = None
    logger.error("google-genai not installed. pip install google-genai. Error=%s", e)

# API key and model name are injected via environment (see module docstring).
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.0-flash")

# Shared client instance; stays None when the SDK or key is absent, or init fails.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ––––– Firebase Admin –––––
import firebase_admin
from firebase_admin import credentials, firestore

# Full service-account JSON passed inline via the FIREBASE env var.
FIREBASE_ENV = os.environ.get("FIREBASE", "")
def init_firestore_from_env() -> Optional[firestore.Client]:
    """Initialise Firebase Admin from the FIREBASE env var.

    Returns a Firestore client, or None when the env var is missing or
    initialisation fails (persistence then runs disabled).
    """
    # Idempotent: reuse an app already initialised in this process.
    if firebase_admin._apps:
        return firestore.client()

    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None

    try:
        service_account = json.loads(FIREBASE_ENV)
        firebase_admin.initialize_app(credentials.Certificate(service_account))
        logger.info("Firebase initialized.")
        return firestore.client()
    except Exception as exc:
        logger.critical("Failed to initialize Firebase: %s", exc)
        return None


db = init_firestore_from_env()
# ––––– External API –––––
# Base URL of the Pricelyst catalogue API; trailing slash stripped so path
# joins below stay clean.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30  # seconds per outbound HTTP request

# ––––– Static Data (Zim Context) –––––
# Hard-coded Zimbabwean market reference rates (USD). The ZESA tariff is a
# stepped kWh price plus a 6% levy — consumed by calculate_zesa_units().
ZIM_UTILITIES = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread": 1.00,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}

# ––––– Cache –––––
PRODUCT_CACHE_TTL = 60 * 20  # 20 mins
# In-process cache of the flattened market index; "ts" is the last refresh
# epoch, "df" the flattened offers DataFrame (see get_market_index()).
_data_cache: Dict[str, Any] = {
    "ts": 0,
    "df": pd.DataFrame(),
    "raw_count": 0
}

app = Flask(__name__)
CORS(app)
| # ========================= | |
| # 1. ETL Layer (Ingestion) | |
| # ========================= | |
| def _norm(s: Any) -> str: | |
| if not s: return "" | |
| return str(s).strip().lower() | |
| def _coerce_price(v: Any) -> float: | |
| try: | |
| return float(v) if v is not None else 0.0 | |
| except: | |
| return 0.0 | |
| def _safe_json_loads(s: str, fallback: Any): | |
| try: | |
| if "```json" in s: | |
| s = s.split("```json")[1].split("```")[0] | |
| elif "```" in s: | |
| s = s.split("```")[0] | |
| return json.loads(s) | |
| except Exception as e: | |
| logger.error(f"JSON Parse Error: {e}") | |
| return fallback | |
def _flatten_product(p: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten one /product-listing entry into analytical rows (one per offer).

    Products with no priced offers yield a single placeholder row with
    price=0.0 and is_offer=False so they still appear in the index.
    Offers with a non-positive price are dropped.
    """
    p_id = p.get("id")
    p_name = p.get("name") or "Unknown"
    clean_name = _norm(p_name)
    cat_name = (p.get("category") or {}).get("name") or "General"
    brand_name = (p.get("brand") or {}).get("brand_name") or ""
    views = int(p.get("view_count") or 0)
    image = p.get("thumbnail") or p.get("image")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict[str, Any]:
        # Key order preserved from the original so DataFrame columns match.
        return {
            "product_id": p_id,
            "product_name": p_name,
            "clean_name": clean_name,
            "brand": brand_name,
            "category": cat_name,
            "retailer": retailer,
            "price": price,
            "views": views,
            "image": image,
            "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        return [make_row("Listing", 0.0, False)]

    rows: List[Dict[str, Any]] = []
    for offer in prices:
        r_name = (offer.get("retailer") or {}).get("name") or "Unknown Store"
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            rows.append(make_row(r_name, price_val, True))
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """Fetches from /api/v1/product-listing and flattens into an analytical DF.

    Pages through the endpoint (50 items/page) until "data" is empty or
    "totalPages" is reached (capped at 99 when absent). Any network/JSON
    error aborts pagination but keeps what was fetched so far.
    """
    all_products: List[Dict[str, Any]] = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            # Fall back to a 99-page cap if the API omits totalPages.
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error(f"ETL Error on page {page}: {e}")
            break

    rows: List[Dict[str, Any]] = []
    for p in all_products:
        try:
            rows.extend(_flatten_product(p))
        except Exception:  # narrowed: was a bare except; skip malformed entries
            continue

    df = pd.DataFrame(rows)
    logger.info(f"ETL: Flattened into {len(df)} rows.")
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached flattened market DataFrame.

    Refreshes via fetch_and_flatten_data() when forced, when the cache is
    empty, or when it is older than PRODUCT_CACHE_TTL.
    """
    global _data_cache
    stale = (time.time() - _data_cache["ts"]) > PRODUCT_CACHE_TTL
    if force_refresh or _data_cache["df"].empty or stale:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        _data_cache["df"] = fresh
        _data_cache["ts"] = time.time()
        _data_cache["raw_count"] = len(fresh)
    return _data_cache["df"]
| # ========================= | |
| # 2. Analyst Engine (Math Logic) | |
| # ========================= | |
def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
    """Find up to *limit* rows of *df* matching *query*.

    Strategy: literal substring match on 'clean_name' first; if nothing
    matches, fall back to counting overlapping whitespace tokens. Results
    are ranked by popularity ('views' desc) then cheapness ('price' asc).

    Returns *df* unchanged when it is empty or *query* is blank; returns an
    empty frame when nothing matches at all.
    """
    if df.empty or not query:
        return df
    q_norm = _norm(query)

    # 1. Broad filter (contains). na=False guards against non-string cells
    #    leaking NaN into the boolean mask (robustness fix).
    mask_name = df['clean_name'].str.contains(q_norm, regex=False, na=False)
    matches = df[mask_name].copy()

    # 2. If no exact contains, try token overlap scoring.
    if matches.empty:
        q_tokens = set(q_norm.split())

        def token_score(text):
            if not isinstance(text, str):
                return 0
            text_tokens = set(text.split())
            if not text_tokens:
                return 0
            return len(q_tokens.intersection(text_tokens))

        df_scored = df.copy()
        df_scored['score'] = df_scored['clean_name'].apply(token_score)
        matches = df_scored[df_scored['score'] > 0]

    if matches.empty:
        return matches

    # 3. Rank by popularity (views) then price.
    matches = matches.sort_values(by=['views', 'price'], ascending=[False, True])
    return matches.head(limit)
def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
    """Resolve a free-text shopping list against the market index and rank
    retailers by item coverage (desc), then basket total (asc).

    Returns {"actionable": False, ...} when there is no data or nothing
    matched; otherwise the best store plus up to three ranked options.
    """
    market = get_market_index()
    if market.empty:
        logger.warning("Basket Engine: DF is empty.")
        return {"actionable": False, "error": "No data"}
    logger.info(f"Basket Engine: Optimizing for {len(item_names)} items: {item_names}")

    resolved: List[Dict[str, Any]] = []
    missing_global: List[str] = []
    offers_only = market[market['is_offer'] == True]

    # 1. Map each requested item onto its most popular matching product.
    for wanted in item_names:
        candidates = search_products_fuzzy(offers_only, wanted, limit=5)
        if candidates.empty:
            missing_global.append(wanted)
            continue
        top = candidates.iloc[0]  # already ranked views desc / price asc
        resolved.append({
            "query": wanted,
            "product_id": top['product_id'],
            "name": top['product_name']
        })

    if not resolved:
        logger.info("Basket Engine: No items matched in DB.")
        return {"actionable": False, "missing": missing_global}

    # 2. Aggregate offers per retailer for the resolved products.
    wanted_ids = [entry['product_id'] for entry in resolved]
    relevant = market[market['product_id'].isin(wanted_ids) & market['is_offer']]

    retailer_stats: List[Dict[str, Any]] = []
    for shop in relevant['retailer'].unique():
        shop_offers = relevant[relevant['retailer'] == shop]
        stocked_ids = shop_offers['product_id'].tolist()
        stocked_names = [e['name'] for e in resolved if e['product_id'] in stocked_ids]
        retailer_stats.append({
            "retailer": shop,
            "total_price": float(shop_offers['price'].sum()),
            "item_count": len(shop_offers),
            "coverage_percent": (len(shop_offers) / len(resolved)) * 100,
            "found_items": stocked_names
        })

    # 3. Best store = highest coverage, cheapest total.
    retailer_stats.sort(key=lambda s: (-s['coverage_percent'], s['total_price']))
    if not retailer_stats:
        return {"actionable": False}

    winner = retailer_stats[0]
    logger.info(f"Basket Engine: Best Store = {winner['retailer']} (${winner['total_price']})")
    return {
        "actionable": True,
        "basket_items": [e['name'] for e in resolved],
        "global_missing": missing_global,
        "best_store": winner,
        "all_stores": retailer_stats[:3]
    }
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate prepaid ZESA kWh bought with *amount_usd*.

    The 6% levy is stripped first (amount / 1.06), then the remainder is
    pushed through the stepped tariff in ZIM_UTILITIES: tier 1 fully, then
    tier 2, with any leftover spent at the tier 3 rate.

    Returns the original amount, the rounded unit estimate, and a
    human-readable per-tier breakdown.
    """
    remaining = amount_usd / 1.06  # Remove 6% levy approx
    units = 0.0
    breakdown = []
    t1 = ZIM_UTILITIES["zesa_step_1"]
    cost_t1 = t1["limit"] * t1["rate"]
    if remaining > cost_t1:
        units += t1["limit"]
        remaining -= cost_t1
        breakdown.append(f"First {t1['limit']}u @ ${t1['rate']}")
        t2 = ZIM_UTILITIES["zesa_step_2"]
        cost_t2 = t2["limit"] * t2["rate"]
        if remaining > cost_t2:
            units += t2["limit"]
            remaining -= cost_t2
            breakdown.append(f"Next {t2['limit']}u @ ${t2['rate']}")
            t3 = ZIM_UTILITIES["zesa_step_3"]
            bought = remaining / t3["rate"]
            units += bought
            # BUG FIX: report the balance actually spent at this tier. The
            # original printed remaining + cost_t1 + cost_t2 (the whole
            # post-levy amount), which disagreed with the units shown.
            breakdown.append(f"Balance ${remaining:.2f} -> {bought:.1f}u @ ${t3['rate']}")
        else:
            bought = remaining / t2["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t2['rate']}")
    else:
        bought = remaining / t1["rate"]
        units += bought
        breakdown.append(f"All {bought:.1f}u @ ${t1['rate']}")
    return {
        "amount_usd": amount_usd,
        "est_units_kwh": round(units, 1),
        "breakdown": breakdown
    }
| # ========================= | |
| # 3. Gemini Helpers (Strict) | |
| # ========================= | |
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """
    Classifies if the conversation needs an Analyst action.

    Returns the model's strict-JSON classification as a dict with keys
    "actionable", "intent", and optionally "items" / "utility_amount".
    Falls back to a non-actionable result when the Gemini client is
    unavailable or the call/parse fails.
    """
    if not _gemini_client: return {"actionable": False}
    # NOTE: the prompt text below is runtime behavior — kept verbatim.
    PROMPT = """
Analyze this transcript. Return STRICT JSON.
Is the user asking for shopping help (prices, basket, store advice, ZESA/Fuel)?
Output Schema:
{
"actionable": boolean,
"intent": "SHOPPING_BASKET" | "UTILITY_CALC" | "PRODUCT_SEARCH" | "CASUAL_CHAT",
"items": ["item1", "item2"] (if applicable),
"utility_amount": number (if applicable for ZESA/Fuel)
}
"""
    try:
        # response_mime_type forces JSON output so resp.text parses directly.
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT + "\nTranscript: " + transcript,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        return _safe_json_loads(resp.text, {"actionable": False, "intent": "CASUAL_CHAT"})
    except Exception as e:
        logger.error(f"Intent Detect Error: {e}")
        return {"actionable": False, "intent": "CASUAL_CHAT"}
def gemini_chat_response(transcript: str, analyst_data: Dict) -> str:
    """Generate Jessica's short reply, grounded strictly in *analyst_data*.

    Returns "System offline." when no Gemini client is configured, and a
    fixed apology string when the model call fails.
    """
    if not _gemini_client:
        return "System offline."
    PROMPT = f"""
You are Jessica, Pricelyst Analyst.
User asked: "{transcript}"
DATA (Use this strictly):
{json.dumps(analyst_data, indent=2)}
If 'actionable' is true, summarize the Best Store and Total Cost.
If ZESA data is present, give the units estimate.
Keep it short, helpful, and Zimbabwean.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
        return resp.text
    except Exception as e:  # narrowed: was a bare except that hid the cause
        logger.error(f"Chat Response Error: {e}")
        return "I have the data but couldn't summarize it."
| # ========================= | |
| # 4. Endpoints | |
| # ========================= | |
def health():
    """Liveness probe: reports indexed offer count and the configured API base."""
    # NOTE(review): no @app.route decorator is visible in this chunk —
    # presumably routes are registered elsewhere; confirm.
    index = get_market_index()
    payload = {
        "ok": True,
        "offers_indexed": len(index),
        "api_source": PRICE_API_BASE
    }
    return jsonify(payload)
def chat():
    """Text Chat Interface.

    Expects JSON {"message": str, "profile_id": str}. Classifies intent,
    runs the matching analyst computation, replies via Gemini, and logs the
    exchange under the profile's chat_logs collection.
    """
    body = request.get_json(silent=True) or {}
    message = body.get("message", "")
    profile_id = body.get("profile_id")
    if not profile_id:
        return jsonify({"ok": False}), 400

    # 1. Detect Intent
    intent_data = gemini_detect_intent(message)

    # 2. Run Analyst (if actionable)
    analyst_data: Dict[str, Any] = {}
    if intent_data.get("actionable"):
        intent = intent_data["intent"]
        items = intent_data.get("items")
        if intent in ("SHOPPING_BASKET", "PRODUCT_SEARCH") and items:
            # Single-item search reuses the basket optimizer to pick a store.
            analyst_data = calculate_basket_optimization(items)
        elif intent == "UTILITY_CALC":
            analyst_data = calculate_zesa_units(intent_data.get("utility_amount", 20))

    # 3. Generate reply and persist the exchange.
    reply = gemini_chat_response(message, analyst_data)
    if db:
        db.collection("pricelyst_profiles").document(profile_id).collection("chat_logs").add({
            "message": message,
            "response_text": reply,
            "intent": intent_data,
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({"ok": True, "data": {"message": reply, "analyst": analyst_data}})
def call_briefing():
    """
    Injects Memory + Top Products Catalogue for the Voice Agent.

    Expects JSON {"profile_id": str, "username": str (optional)}. Creates
    the profile document on first contact, syncs the username, and returns
    the stored memory summary plus a KPI snapshot (utility rates + top-60
    product mini-catalogue) serialized as a JSON string.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    username = body.get("username")
    if not pid: return jsonify({"ok": False}), 400
    prof = {}
    if db:
        ref = db.collection("pricelyst_profiles").document(pid)
        doc = ref.get()
        if doc.exists:
            prof = doc.to_dict()
        else:
            # First contact: create an empty profile shell.
            ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
    # Keep the stored username in sync with what the caller sent.
    if username and username != prof.get("username"):
        if db: db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)
    # --- Generate Mini-Catalogue (Top 60 popular items) ---
    df = get_market_index()
    top_products_str = ""
    if not df.empty:
        # Sort by views desc, take top 60 unique product names
        top_offers = df[df['is_offer']].sort_values('views', ascending=False).drop_duplicates('product_name').head(60)
        # Format: "Name (~$Price)". NOTE(review): this is the price of the
        # single highest-views offer kept by drop_duplicates, not an average
        # across retailers — confirm intended.
        items_list = []
        for _, r in top_offers.iterrows():
            items_list.append(f"{r['product_name']} (~${r['price']:.2f})")
        top_products_str = ", ".join(items_list)
    # Payload for ElevenLabs (Data Variables Only)
    kpi_snapshot = {
        "market_rates": ZIM_UTILITIES,
        "popular_products_catalogue": top_products_str
    }
    return jsonify({
        "ok": True,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": json.dumps(kpi_snapshot)
    })
def _update_profile_memory(pid: str, transcript: str) -> None:
    """Best-effort refresh of the profile's rolling memory summary via Gemini."""
    # Guard: the original dereferenced _gemini_client unconditionally and
    # raised AttributeError (swallowed by the except) when it was None.
    if not (_gemini_client and db):
        return
    try:
        doc = db.collection("pricelyst_profiles").document(pid).get()
        # Guard: doc.to_dict() is None when the profile doesn't exist yet.
        curr_mem = (doc.to_dict() or {}).get("memory_summary", "")
        mem_prompt = f"Update user memory (concise) with new details:\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
        mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
        db.collection("pricelyst_profiles").document(pid).set({"memory_summary": mem_resp.text}, merge=True)
    except Exception as e:
        logger.error(f"Memory Update Failed: {e}")


def _build_plan_markdown(analyst_result: Dict[str, Any]) -> str:
    """Render a basket-optimization result as a Markdown shopping plan."""
    best = analyst_result["best_store"]
    md = f"# Shopping Plan\n\n"
    md += f"**Recommended Store:** {best['retailer']}\n"
    md += f"**Estimated Total:** ${best['total_price']:.2f}\n\n"
    md += "## Your Basket\n\n"
    md += "| Item | Found? |\n|---|---|\n"
    for it in analyst_result["basket_items"]:
        status = "✅ In Stock" if it in best["found_items"] else "❌ Not Found"
        md += f"| {it} | {status} |\n"
    if analyst_result["global_missing"]:
        md += "\n### Missing Items (Estimate Required)\n"
        for m in analyst_result["global_missing"]:
            md += f"- {m}\n"
    return md


def log_call_usage():
    """
    Post-Call Processor.
    1. Intent Check (Strict).
    2. Analyst Optimization.
    3. Plan Gen & Persistence.

    Expects JSON {"profile_id": str, "transcript": str}. Returns the saved
    shopping plan when one was generated, else null.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    transcript = body.get("transcript", "")
    if not pid:
        return jsonify({"ok": False}), 400
    logger.info(f"Log Call: Processing {pid}. Transcript Len: {len(transcript)}")

    # 1. Update Memory (best-effort; skip trivially short transcripts)
    if len(transcript) > 20:
        _update_profile_memory(pid, transcript)

    # 2. Intent Detection (The Gatekeeper)
    intent_data = gemini_detect_intent(transcript)
    logger.info(f"Log Call: Intent detected: {intent_data.get('intent')}")

    plan_data: Dict[str, Any] = {}
    # 3. Actionable Logic: turn a shopping list into a persisted plan.
    if intent_data.get("actionable") and intent_data.get("items"):
        analyst_result = calculate_basket_optimization(intent_data["items"])
        if analyst_result.get("actionable"):
            best = analyst_result["best_store"]
            plan_data = {
                "is_actionable": True,
                "title": f"Plan: {best['retailer']} (${best['total_price']:.2f})",
                "markdown_content": _build_plan_markdown(analyst_result),
                "items": intent_data["items"],
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            # Persist Plan
            if db:
                doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                plan_data["id"] = doc_ref.id
                doc_ref.set(plan_data)
                logger.info(f"Log Call: Plan Saved {doc_ref.id}")

    # 4. Log Call
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
            "transcript": transcript,
            "intent_data": intent_data,
            "plan_generated": bool(plan_data),
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
| # ––––– CRUD: Shopping Plans ––––– | |
def list_plans():
    """GET ?profile_id=... → up to 10 most recent shopping plans for the profile."""
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        plans = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "plans": plans})
    except Exception as e:  # narrowed: was a bare except that hid the failure
        logger.error(f"List Plans Failed: {e}")
        return jsonify({"ok": False}), 500
def delete_plan(plan_id):
    """DELETE a shopping plan by id, scoped to ?profile_id=... ."""
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:  # narrowed: was a bare except that hid the failure
        logger.error(f"Delete Plan Failed: {e}")
        return jsonify({"ok": False}), 500
# =========================
# Main
# =========================
if __name__ == "__main__":
    # NOTE(review): the module docstring says PORT=5000 but the default here
    # is 7860 (Hugging Face Spaces convention) — confirm intended default.
    port = int(os.environ.get("PORT", 7860))
    # Pre-warm the market index so the first request doesn't pay the ETL
    # cost; still boot if the upstream API is unreachable.
    try:
        get_market_index(force_refresh=True)
    except Exception as e:  # narrowed: was a bare silent except
        logger.error(f"Cache pre-warm failed: {e}")
    app.run(host="0.0.0.0", port=port)