""" utility.py — Pricelyst WhatsApp Bot Core AI & Data layer: - ETL from Pricelyst API -> in-memory Pandas market index - Deep vector search & basket optimisation (Market Matrix) - Gemini 2.5 Flash (new google-genai SDK) for intent, vision, chat - Vernacular engine: Shona / Ndebele / English in -> native reply out - Catalogue PDF generation (reportlab) - Firebase persistence (profiles, chat history, shopping plans) - ZESA electricity unit calculator """ import os import re import json import time import math import uuid import logging import base64 import io from datetime import datetime, timezone, timedelta from typing import Any, Dict, List, Optional, Tuple import requests import pandas as pd logger = logging.getLogger(__name__) # ───────────────────────────────────────────── # 1. Gemini (new google-genai SDK) # ───────────────────────────────────────────── try: from google import genai from google.genai import types as genai_types except ImportError: genai = None genai_types = None logger.error("google-genai not installed. Run: pip install google-genai") GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") _gemini_client = None if genai and GOOGLE_API_KEY: try: _gemini_client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Gemini client ready (model=%s).", GEMINI_MODEL) except Exception as e: logger.error("Failed to init Gemini client: %s", e) # ───────────────────────────────────────────── # 2. Firebase # ───────────────────────────────────────────── import firebase_admin from firebase_admin import credentials, firestore, storage as fb_storage FIREBASE_ENV = os.environ.get("FIREBASE", "") FIREBASE_STORAGE_BUCKET = os.environ.get("FIREBASE_STORAGE_BUCKET", "") db: Optional[Any] = None def init_firestore_from_env() -> Optional[Any]: global db try: if firebase_admin._apps: db = firestore.client() return db if not FIREBASE_ENV: logger.warning("FIREBASE env var missing. 
Persistence disabled.") return None sa_info = json.loads(FIREBASE_ENV) cred = credentials.Certificate(sa_info) init_opts = {} if FIREBASE_STORAGE_BUCKET: init_opts["storageBucket"] = FIREBASE_STORAGE_BUCKET firebase_admin.initialize_app(cred, init_opts) db = firestore.client() logger.info("Firebase initialized.") return db except Exception as e: logger.critical("Failed to initialize Firebase: %s", e) return None db = init_firestore_from_env() # ───────────────────────────────────────────── # 3. Static Config # ───────────────────────────────────────────── PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/") HTTP_TIMEOUT = 30 PRODUCT_CACHE_TTL = 60 * 20 # 20 minutes ZIM_CONTEXT = { "fuel_petrol": 1.58, "fuel_diesel": 1.65, "gas_lpg": 2.00, "bread_avg": 1.10, "zesa_step_1": {"limit": 50, "rate": 0.04}, "zesa_step_2": {"limit": 150, "rate": 0.09}, "zesa_step_3": {"limit": 9999, "rate": 0.14}, "zesa_levy": 0.06 } IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID", "") IMGUR_URL = "https://api.imgur.com/3/image" IMGUR_HEADERS = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {} DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "") DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en" # Catalogue output dir CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues") os.makedirs(CATALOGUE_DIR, exist_ok=True) # ───────────────────────────────────────────── # 4. 
Market Index (ETL) # ───────────────────────────────────────────── _data_cache: Dict[str, Any] = {"ts": 0, "df": pd.DataFrame(), "raw_count": 0} def _norm(s: Any) -> str: if not s: return "" return str(s).strip().lower() def _coerce_price(v: Any) -> float: try: return float(v) if v is not None else 0.0 except Exception: return 0.0 def fetch_and_flatten_data() -> pd.DataFrame: all_products = [] page = 1 logger.info("ETL: Starting fetch from /api/v1/product-listing") while True: try: url = f"{PRICE_API_BASE}/api/v1/product-listing" r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT) r.raise_for_status() payload = r.json() data = payload.get("data") or [] if not data: break all_products.extend(data) if page >= (payload.get("totalPages") or 99): break page += 1 except Exception as e: logger.error(f"ETL Error on page {page}: {e}") break rows = [] for p in all_products: try: p_id = int(p.get("id") or 0) p_name = str(p.get("name") or "Unknown") brand_obj = p.get("brand") or {} brand_name = str(brand_obj.get("brand_name") or "") cats = p.get("categories") or [] cat_names = [str(c.get("name") or "") for c in cats] cat_str = " ".join(cat_names) primary_cat = cat_names[0] if cat_names else "General" search_vector = _norm(f"{p_name} {brand_name} {cat_str}") views = int(p.get("view_count") or 0) image = str(p.get("thumbnail") or p.get("image") or "") prices = p.get("prices") or [] if not prices: rows.append({ "product_id": p_id, "product_name": p_name, "search_vector": search_vector, "brand": brand_name, "category": primary_cat, "retailer": "Listing", "price": 0.0, "views": views, "image": image, "is_offer": False }) continue for offer in prices: retailer = offer.get("retailer") or {} r_name = str(retailer.get("name") or "Unknown Store") price_val = _coerce_price(offer.get("price")) if price_val > 0: rows.append({ "product_id": p_id, "product_name": p_name, "search_vector": search_vector, "brand": brand_name, "category": primary_cat, "retailer": 
r_name, "price": price_val, "views": views, "image": image, "is_offer": True }) except Exception: continue df = pd.DataFrame(rows) logger.info(f"ETL: Flattened into {len(df)} rows.") return df def get_market_index(force_refresh: bool = False) -> pd.DataFrame: global _data_cache if (force_refresh or _data_cache["df"].empty or (time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL)): logger.info("ETL: Refreshing Market Index...") df = fetch_and_flatten_data() _data_cache.update({"df": df, "ts": time.time(), "raw_count": len(df)}) return _data_cache["df"] # ───────────────────────────────────────────── # 5. Precision Search & Basket Optimisation # ───────────────────────────────────────────── def search_products_deep(df: pd.DataFrame, query: str, limit: int = 15) -> pd.DataFrame: """ Precision Search Algorithm: 1. Exact Name Match (1000 pts) 2. Sequential vector match (500 pts) 3. Brand match (200 pts) 4. Token overlap (50 pts each) Tie-break: views desc, price asc """ if df.empty or not query: return df q_norm = _norm(query) q_tokens = set(q_norm.split()) def score(row): s = 0 vector = row["search_vector"] if q_norm == _norm(row["product_name"]): s += 1000 if q_norm in vector: s += 500 if row["brand"].lower() in q_norm: s += 200 overlap = len(q_tokens.intersection(set(vector.split()))) s += overlap * 50 return s df_scored = df.copy() df_scored["match_score"] = df_scored.apply(score, axis=1) matches = df_scored[df_scored["match_score"] > 0] if matches.empty: return matches return matches.sort_values( ["match_score", "views", "price"], ascending=[False, False, True] ).head(limit) def calculate_basket_optimization(item_names: List[str], preferred_retailer: Optional[str] = None) -> Dict[str, Any]: """Full market matrix with precision search, savings calculation & substitute flagging.""" df = get_market_index() if df.empty: return {"actionable": False, "error": "Market data unavailable. 
Please try again shortly."} found_items = [] missing_global = [] for item in item_names: hits = search_products_deep(df[df["is_offer"] == True], item, limit=10) if hits.empty: missing_global.append(item) continue best_match = hits.iloc[0] q_norm = _norm(item) res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}") q_tokens = q_norm.split() is_sub = len(q_tokens) > 1 and sum(1 for t in q_tokens if t in res_norm) < len(q_tokens) product_offers = ( hits[hits["product_name"] == best_match["product_name"]] .sort_values("price") ) offers_list = [{"retailer": r["retailer"], "price": float(r["price"])} for _, r in product_offers.iterrows()] best_price = offers_list[0]["price"] max_price = offers_list[-1]["price"] found_items.append({ "query": item, "product_name": str(best_match["product_name"]), "brand": str(best_match["brand"]), "category": str(best_match["category"]), "image": str(best_match["image"]), "is_substitute": is_sub, "offers": offers_list, "best_price": best_price, "potential_savings": max_price - best_price, }) if not found_items: return {"actionable": True, "found_items": [], "global_missing": missing_global} # Market Matrix all_retailers = set() for f in found_items: for o in f["offers"]: all_retailers.add(o["retailer"]) store_comparison = [] for retailer in all_retailers: total_price = 0.0 found_count = 0 missing_list = [] for item in found_items: price = next((o["price"] for o in item["offers"] if o["retailer"] == retailer), None) if price: total_price += price found_count += 1 else: missing_list.append(item["product_name"]) store_comparison.append({ "retailer": retailer, "total_price": total_price, "found_count": found_count, "total_items": len(found_items), "missing_items": missing_list, }) store_comparison.sort(key=lambda x: (-x["found_count"], x["total_price"])) if len(store_comparison) > 1: max_total = max( s["total_price"] for s in store_comparison if s["found_count"] == store_comparison[0]["found_count"] ) for s in store_comparison: 
s["basket_savings"] = ( max_total - s["total_price"] if s["found_count"] == store_comparison[0]["found_count"] else 0.0 ) else: for s in store_comparison: s["basket_savings"] = 0.0 return { "actionable": True, "is_basket": len(found_items) > 1, "found_items": found_items, "global_missing": missing_global, "market_matrix": store_comparison[:5], "best_store": store_comparison[0] if store_comparison else None, "preferred_retailer": preferred_retailer, } # ───────────────────────────────────────────── # 6. ZESA Calculator # ───────────────────────────────────────────── def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]: remaining = amount_usd / (1 + ZIM_CONTEXT["zesa_levy"]) units = 0.0 t1 = ZIM_CONTEXT["zesa_step_1"] cost_t1 = t1["limit"] * t1["rate"] if remaining > cost_t1: units += t1["limit"] remaining -= cost_t1 t2 = ZIM_CONTEXT["zesa_step_2"] cost_t2 = t2["limit"] * t2["rate"] if remaining > cost_t2: units += t2["limit"] remaining -= cost_t2 units += remaining / ZIM_CONTEXT["zesa_step_3"]["rate"] else: units += remaining / t2["rate"] else: units += remaining / t1["rate"] return {"amount_usd": float(amount_usd), "est_units_kwh": float(round(units, 1))} # ───────────────────────────────────────────── # 7. Gemini Helpers (new SDK) # ───────────────────────────────────────────── def _safe_json_loads(s: str, fallback: Any) -> Any: try: cleaned = s if "```json" in cleaned: cleaned = cleaned.split("```json")[1].split("```")[0] elif "```" in cleaned: cleaned = cleaned.split("```")[1] return json.loads(cleaned.strip()) except Exception as e: logger.error(f"JSON parse error: {e} | raw: {s[:300]}") return fallback def gemini_detect_intent(transcript: str) -> Dict[str, Any]: """Classify intent + extract items/amounts from user message, supporting Shona/Ndebele/English.""" if not _gemini_client: return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []} PROMPT = """ Analyze the transcript below and return STRICT JSON. 
Intents: - CASUAL_CHAT : Greetings, "hi", off-topic - SHOPPING_BASKET : Searching for prices / cheapest X - UTILITY_CALC : Electricity / ZESA / fuel cost questions - STORE_DECISION : "Which store is cheapest?", "Where should I shop?" - EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5" - CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet - DEALS_EXPLORE : "Today's deals", "promotions", "what's on special" - DISCOVER : "What products do you have?", "show me your categories" Rules: - Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal', 'mafuta' → 'cooking oil'). - If only a concept is given (e.g. "plan a braai"), set is_event_planning=true and items=[]. - Detect user language accurately (e.g. Shona, Ndebele, English). - store_preference: name of store if explicitly mentioned. - utility_amount: numeric value if mentioned. JSON Schema: { "actionable": boolean, "intent": "string", "items": ["string"], "utility_amount": number, "store_preference": "string | null", "is_event_planning": boolean, "language": "string", "catalogue_scope": "string | null" } Transcript: """ + transcript try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=PROMPT, config=genai_types.GenerateContentConfig(response_mime_type="application/json") ) return _safe_json_loads(resp.text, {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []}) except Exception as e: logger.error(f"Intent detect error: {e}") return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []} def gemini_explode_concept(transcript: str) -> List[str]: """Converts an event/meal concept into a concrete grocery list.""" if not _gemini_client: return [] PROMPT = f""" User wants to plan: "{transcript}" Generate 10-15 essential Zimbabwean grocery items for this. Use English terms suitable for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef'). 
Return ONLY a JSON list of strings. """ try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=PROMPT, config=genai_types.GenerateContentConfig(response_mime_type="application/json") ) return _safe_json_loads(resp.text, []) except Exception as e: logger.error(f"Concept explode error: {e}") return [] def gemini_analyze_image(image_bytes: bytes, caption: str = "") -> Dict[str, Any]: """Analyse a WhatsApp image — grocery list photo, product, or meal.""" if not _gemini_client: return {"type": "IRRELEVANT", "items": [], "description": ""} PROMPT = f""" Analyze this image. Context caption: "{caption}" Classify: 1. SHOPPING_LIST → Extract each item as written (translate to English). 2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml", "Zimgold Cooking Oil 2L"). 3. MEAL_DISH → Identify dish name + core ingredients. 4. IRRELEVANT → Not shopping related. Return STRICT JSON: {{ "type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT", "items": ["item1", "item2"], "description": "short description" }} """ try: image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg") resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=[PROMPT, image_part], config=genai_types.GenerateContentConfig(response_mime_type="application/json") ) result = _safe_json_loads(resp.text, {"type": "IRRELEVANT", "items": [], "description": ""}) return result except Exception as e: logger.error(f"Vision error: {e}") return {"type": "IRRELEVANT", "items": [], "description": ""} def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict, chat_history: str = "", language: str = "English") -> str: """Generate April's WhatsApp reply, formatted for plain text (no markdown headers).""" if not _gemini_client: return "Hi! I'm April from Pricelyst. I'm having a bit of trouble right now — please try again shortly." 
context_str = "" if chat_history: context_str += f"RECENT CHAT:\n{chat_history}\n\n" context_str += ( f"ZIMBABWE CONTEXT: Petrol=${ZIM_CONTEXT['fuel_petrol']}/L, " f"Diesel=${ZIM_CONTEXT['fuel_diesel']}/L, " f"Bread≈${ZIM_CONTEXT['bread_avg']}, " f"ZESA: $10={calculate_zesa_units(10)['est_units_kwh']}u, " f"$20={calculate_zesa_units(20)['est_units_kwh']}u\n" ) if analyst_data: context_str += f"\nANALYST DATA:\n{json.dumps(analyst_data, default=str)}\n" PROMPT = f""" You are April, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒. Your mission: shortest path to value + complete price transparency for Zimbabwean shoppers. INPUT: "{transcript}" USER LANGUAGE: {language} INTENT: {intent.get('intent', 'CASUAL_CHAT')} CONTEXT: {context_str} FORMATTING RULES (WhatsApp plain text — NO markdown headers like ##): - Use *bold* for store names and prices. - Use emojis naturally (✅ 🛒 💰 📍 ⚠️). - Keep replies concise. No walls of text. - If replying in Shona or Ndebele, ensure grammar is natural. LOGIC: 1. BASKET COMPARISON: Present market matrix. State cheapest store total and basket_savings clearly. Example: "✅ *OK Mart* has the best deal at *$4.00* total — saving you *$2.95* vs Spar!" 2. SUBSTITUTE (is_substitute=true): "I couldn't find *[Query]*, but the nearest match is *[Product]* at *$X*." 3. SINGLE ITEM: Show cheapest price + store, then alternatives. Note potential_savings if > $0.10. 4. ZESA: Calculate and explain units clearly. 5. CASUAL / GREETING: Be warm, introduce yourself briefly and invite a search query. 6. DEALS_EXPLORE: List 5-8 interesting current deals from analyst data. 7. EVENT_PLANNING: Acknowledge the plan, then present the shopping basket. 8. CATALOGUE_REQUEST: Inform the user their PDF catalogue is being prepared and will arrive shortly. Always end with a helpful follow-up question or CTA if appropriate. 
""" try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=PROMPT ) return resp.text.strip() except Exception as e: logger.error(f"Chat response error: {e}") return "I checked the prices, but I'm having trouble displaying them right now. Please try again!" def gemini_translate(text: str, target_lang: str) -> str: """Translate April's English response into the user's language if needed.""" if not _gemini_client or not target_lang or target_lang.lower() == "english": return text PROMPT = f""" Translate the following WhatsApp shopping assistant reply from English to {target_lang}. Rules: - Keep prices ($X.XX), store names, and product names UNCHANGED. - Keep WhatsApp formatting (*bold*, emojis) UNCHANGED. - Use natural, conversational tone. Text: "{text}" """ try: resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT) return resp.text.strip() except Exception as e: logger.error(f"Translation error: {e}") return text # ───────────────────────────────────────────── # 8. Catalogue PDF Generator # ───────────────────────────────────────────── def generate_catalogue_pdf(title: str, items: List[Dict], scope_label: str = "Price Comparison") -> Optional[str]: """ Generate a professional Pricelyst price-comparison PDF using reportlab. Returns file path on success, None on failure. 
""" try: from reportlab.lib.pagesizes import A4 from reportlab.lib import colors from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import cm from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable) from reportlab.lib.enums import TA_CENTER, TA_LEFT filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf" filepath = os.path.join(CATALOGUE_DIR, filename) doc = SimpleDocTemplate(filepath, pagesize=A4, rightMargin=1.5*cm, leftMargin=1.5*cm, topMargin=1.5*cm, bottomMargin=1.5*cm) styles = getSampleStyleSheet() # Custom styles title_style = ParagraphStyle("CatTitle", parent=styles["Title"], fontSize=22, textColor=colors.HexColor("#1E7B34"), spaceAfter=4, alignment=TA_CENTER) sub_style = ParagraphStyle("CatSub", parent=styles["Normal"], fontSize=10, textColor=colors.HexColor("#555555"), alignment=TA_CENTER, spaceAfter=2) head_style = ParagraphStyle("ColHead", parent=styles["Normal"], fontSize=9, textColor=colors.white, fontName="Helvetica-Bold") cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8, textColor=colors.HexColor("#222222")) story = [] # Header story.append(Paragraph("🛒 Pricelyst Zimbabwe", title_style)) story.append(Paragraph(scope_label, sub_style)) story.append(Paragraph(f"Generated: {datetime.now().strftime('%d %b %Y, %H:%M')}", sub_style)) story.append(Spacer(1, 0.3*cm)) story.append(HRFlowable(width="100%", thickness=2, color=colors.HexColor("#1E7B34"))) story.append(Spacer(1, 0.4*cm)) story.append(Paragraph(title, styles["Heading2"])) story.append(Spacer(1, 0.3*cm)) # Table header col_widths = [5*cm, 3*cm, 2.5*cm, 2*cm, 2.2*cm, 2.8*cm] table_header = [ Paragraph("Product", head_style), Paragraph("Brand", head_style), Paragraph("Category", head_style), Paragraph("Store", head_style), Paragraph("Price (USD)", head_style), Paragraph("Best Price", head_style), ] table_data = [table_header] for item in items: offers = item.get("offers", []) best_p = 
item.get("best_price", 0) savings = item.get("potential_savings", 0) sub_note = " ⚠ (nearest match)" if item.get("is_substitute") else "" if offers: # First row: product + first offer first_offer = offers[0] savings_str = f"${savings:.2f} savings" if savings > 0.05 else "Best" table_data.append([ Paragraph(f"{item['product_name']}{sub_note}", cell_style), Paragraph(item.get("brand", ""), cell_style), Paragraph(item.get("category", ""), cell_style), Paragraph(first_offer["retailer"], cell_style), Paragraph(f"${first_offer['price']:.2f}", cell_style), Paragraph(f"${best_p:.2f}\n{savings_str}", cell_style), ]) # Additional offers as sub-rows for offer in offers[1:]: table_data.append([ Paragraph("", cell_style), Paragraph("", cell_style), Paragraph("", cell_style), Paragraph(offer["retailer"], cell_style), Paragraph(f"${offer['price']:.2f}", cell_style), Paragraph("", cell_style), ]) else: table_data.append([ Paragraph(item.get("query", "Unknown"), cell_style), Paragraph("", cell_style), Paragraph("", cell_style), Paragraph("Not listed", cell_style), Paragraph("N/A", cell_style), Paragraph("N/A", cell_style), ]) tbl = Table(table_data, colWidths=col_widths, repeatRows=1) tbl.setStyle(TableStyle([ ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1E7B34")), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), ("FONTSIZE", (0, 0), (-1, 0), 9), ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F0FFF4")]), ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("LEFTPADDING", (0, 0), (-1, -1), 4), ("RIGHTPADDING", (0, 0), (-1, -1), 4), ("TOPPADDING", (0, 0), (-1, -1), 3), ("BOTTOMPADDING",(0, 0), (-1, -1), 3), ])) story.append(tbl) story.append(Spacer(1, 0.5*cm)) # Footer story.append(HRFlowable(width="100%", thickness=1, color=colors.HexColor("#CCCCCC"))) story.append(Spacer(1, 0.2*cm)) story.append(Paragraph( "Prices subject to change. 
Data sourced from Pricelyst.co.zw — Zimbabwe's #1 price comparison platform.", ParagraphStyle("Footer", parent=styles["Normal"], fontSize=7, textColor=colors.HexColor("#888888"), alignment=TA_CENTER) )) doc.build(story) logger.info(f"Catalogue PDF generated: {filepath}") return filepath except Exception as e: logger.error(f"PDF generation failed: {e}", exc_info=True) return None # ───────────────────────────────────────────── # 9. Firebase Profile Helpers # ───────────────────────────────────────────── def get_or_create_profile(mobile: str) -> Dict[str, Any]: if not db: return {} try: ref = db.collection("pricelyst_profiles").document(mobile) doc = ref.get() if doc.exists: return doc.to_dict() profile = {"mobile": mobile, "created_at": datetime.now(timezone.utc).isoformat()} ref.set(profile) return profile except Exception as e: logger.error(f"Profile fetch error for {mobile}: {e}") return {} def get_chat_history(mobile: str, limit: int = 6) -> str: if not db: return "" try: docs = ( db.collection("pricelyst_profiles").document(mobile) .collection("chat_logs") .order_by("ts", direction=firestore.Query.DESCENDING) .limit(limit) .stream() ) msgs = [] for d in docs: data = d.to_dict() msgs.append(f"User: {data.get('message', '')}\nApril: {data.get('response', '')}") return "\n".join(reversed(msgs)) except Exception as e: logger.error(f"Chat history error: {e}") return "" def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None: if not db: return try: db.collection("pricelyst_profiles").document(mobile).collection("chat_logs").add({ "message": message, "response": response, "intent": intent, "ts": datetime.now(timezone.utc).isoformat() }) except Exception as e: logger.error(f"Chat log save error: {e}") def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]: if not db: return None try: ref = (db.collection("pricelyst_profiles").document(mobile) .collection("shopping_plans").document()) plan["id"] = ref.id ref.set(plan) return ref.id 
except Exception as e: logger.error(f"Plan save error: {e}") return None # ───────────────────────────────────────────── # 10. Firebase Storage Upload # ───────────────────────────────────────────── def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]: """Upload file to Firebase Storage and return a signed 1-hour URL.""" if not FIREBASE_STORAGE_BUCKET: return None try: bucket = fb_storage.bucket() blob = bucket.blob(f"{folder}/{os.path.basename(file_path)}") blob.upload_from_filename(file_path) url = blob.generate_signed_url(expiration=timedelta(hours=1)) return url except Exception as e: logger.error(f"Firebase Storage upload failed: {e}") return None def upload_to_imgur(file_path: str) -> Optional[str]: """Upload image to Imgur and return public URL.""" if not IMGUR_CLIENT_ID: return None try: with open(file_path, "rb") as f: resp = requests.post(IMGUR_URL, headers=IMGUR_HEADERS, files={"image": f}) resp.raise_for_status() data = resp.json() return data["data"]["link"] if data.get("success") else None except Exception as e: logger.error(f"Imgur upload failed: {e}") return None # ───────────────────────────────────────────── # 11. TTS (DeepGram) # ───────────────────────────────────────────── def deepgram_tts(text: str) -> Optional[str]: """Convert text to MP3 via DeepGram TTS. Returns local file path.""" if not DEEPGRAM_API_KEY: return None try: resp = requests.post( DEEPGRAM_TTS_URL, headers={"Authorization": f"Token {DEEPGRAM_API_KEY}", "Content-Type": "application/json"}, json={"text": text}, timeout=30 ) resp.raise_for_status() fp = os.path.join(os.getcwd(), f"tts_{uuid.uuid4().hex}.mp3") with open(fp, "wb") as f: f.write(resp.content) return fp except Exception as e: logger.error(f"DeepGram TTS failed: {e}") return None # ───────────────────────────────────────────── # 12. 
# ─────────────────────────────────────────────
# 12. Deals & Discovery Helpers
# ─────────────────────────────────────────────

def get_todays_deals(limit: int = 8) -> List[Dict]:
    """Return items with highest potential savings — proxy for 'deals'.

    Savings are the gap between the highest and lowest offer price of a
    product name; only gaps above $0.05 qualify. Returns an empty list when
    no market data is available or the computation fails.
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        live = market[market["is_offer"] == True].copy()
        if live.empty:
            return []

        # Price spread per product name
        spread = live.groupby("product_name")["price"].agg(["min", "max"]).reset_index()
        spread["savings"] = spread["max"] - spread["min"]
        worthwhile = spread[spread["savings"] > 0.05]
        ranked = worthwhile.sort_values("savings", ascending=False).head(limit)

        results = []
        for _, entry in ranked.iterrows():
            name = entry["product_name"]
            same_product = live[live["product_name"] == name]
            lowest = same_product.sort_values("price").iloc[0]
            results.append({
                "product_name": name,
                "cheapest_price": float(lowest["price"]),
                "retailer": lowest["retailer"],
                "savings": float(entry["savings"]),
                "category": lowest.get("category", ""),
            })
        return results
    except Exception as e:
        logger.error(f"Deals fetch error: {e}")
        return []


def get_category_list() -> List[str]:
    """Return the sorted, de-duplicated product categories for discovery."""
    market = get_market_index()
    if market.empty:
        return []
    try:
        categories = market["category"].dropna().unique().tolist()
        return sorted(categories)
    except Exception:
        return []


def format_deals_message(deals: List[Dict]) -> str:
    """Render *deals* as a numbered WhatsApp text block."""
    if not deals:
        return "No deals data available right now. Please try again shortly."

    header = "🏷️ *Today's Best Deals on Pricelyst* 🇿🇼\n"
    entries = [
        f"{i}. *{d['product_name']}*\n"
        f" 💰 ${d['cheapest_price']:.2f} @ {d['retailer']}\n"
        f" 🔥 Save up to ${d['savings']:.2f}\n"
        for i, d in enumerate(deals, 1)
    ]
    footer = "\n_Reply with any product name to compare prices across stores!_"
    return "\n".join([header, *entries, footer])


def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str:
    """Render a basket optimisation result as WhatsApp text (fallback path).

    When the result is not actionable, its error message (or a generic
    apology) is returned instead. *language* is accepted but not used.
    """
    if not analyst.get("actionable"):
        return analyst.get("error", "Sorry, I couldn't fetch price data right now.")

    found_items = analyst.get("found_items", [])
    not_found = analyst.get("global_missing", [])
    stores = analyst.get("market_matrix", [])
    out = []

    # Per-product breakdown: up to three offers each, cheapest ticked.
    if found_items:
        out.append("🛒 *Price Breakdown:*\n")
        for product in found_items:
            suffix = " _(nearest match)_" if product.get("is_substitute") else ""
            out.append(f"• *{product['product_name']}*{suffix}")
            for offer in product["offers"][:3]:
                if offer["price"] == product["best_price"]:
                    tick = "✅"
                else:
                    tick = " "
                out.append(f" {tick} {offer['retailer']}: *${offer['price']:.2f}*")
            if product.get("potential_savings", 0) > 0.10:
                out.append(f" 💡 Save up to ${product['potential_savings']:.2f}")
            out.append("")

    # Store totals only make sense when there is something to compare.
    if len(stores) > 1:
        out.append("🏪 *Store Totals:*\n")
        out.extend(
            f"• *{s['retailer']}*: ${s['total_price']:.2f} "
            f"({s['found_count']}/{s['total_items']} items)"
            for s in stores[:3]
        )
        winner = stores[0]
        saved = winner.get("basket_savings", 0)
        if saved > 0.10:
            out.append(f"\n✅ *Best: {winner['retailer']}* — saves you *${saved:.2f}*!")

    if not_found:
        out.append(f"\n⚠️ Not found: {', '.join(not_found)}")
        out.append("_These may be available in-store — try a broader search term._")

    return "\n".join(out)