Spaces:
Sleeping
Sleeping
| """ | |
| utility.py — Pricelyst WhatsApp Bot | |
| Core AI & Data layer: | |
| - ETL from Pricelyst API -> in-memory Pandas market index | |
| - Deep vector search & basket optimisation (Market Matrix) | |
| - Gemini 2.5 Flash (new google-genai SDK) for intent, vision, chat | |
| - Vernacular engine: Shona / Ndebele / English in -> native reply out | |
| - Catalogue PDF generation (reportlab) | |
| - Firebase persistence (profiles, chat history, shopping plans) | |
| - ZESA electricity unit calculator | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import math | |
| import uuid | |
| import logging | |
| import base64 | |
| import io | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import pandas as pd | |
logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────
# 1. Gemini (new google-genai SDK)
# ─────────────────────────────────────────────
# The SDK is optional at import time: when it is missing, both names are set
# to None and every Gemini helper falls back to its offline default.
try:
    from google import genai
    from google.genai import types as genai_types
except ImportError:
    genai = genai_types = None
    logger.error("google-genai not installed. Run: pip install google-genai")

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")

# Shared client; stays None unless the SDK imported and an API key is set.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as init_err:
        logger.error("Failed to init Gemini client: %s", init_err)
    else:
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
| # ───────────────────────────────────────────── | |
| # 2. Firebase | |
| # ───────────────────────────────────────────── | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore, storage as fb_storage | |
# Service-account JSON (as a string) and optional Storage bucket name.
FIREBASE_ENV = os.getenv("FIREBASE", "")
FIREBASE_STORAGE_BUCKET = os.getenv("FIREBASE_STORAGE_BUCKET", "")

# Firestore client shared by the persistence helpers; None disables persistence.
db: Optional[Any] = None


def init_firestore_from_env() -> Optional[Any]:
    """Initialise Firebase from the ``FIREBASE`` env var and return a Firestore client.

    If a Firebase app is already registered, only a client is created.
    Otherwise the env var is parsed as service-account JSON and a new app is
    initialised, with ``storageBucket`` set when ``FIREBASE_STORAGE_BUCKET``
    is configured.

    Returns:
        The Firestore client (also stored in the module-level ``db``), or
        ``None`` when the env var is missing or initialisation fails.
    """
    global db
    try:
        app_exists = bool(firebase_admin._apps)
        if not app_exists:
            if not FIREBASE_ENV:
                logger.warning("FIREBASE env var missing. Persistence disabled.")
                return None
            service_account = json.loads(FIREBASE_ENV)
            certificate = credentials.Certificate(service_account)
            app_options = {}
            if FIREBASE_STORAGE_BUCKET:
                app_options["storageBucket"] = FIREBASE_STORAGE_BUCKET
            firebase_admin.initialize_app(certificate, app_options)
        db = firestore.client()
        if not app_exists:
            logger.info("Firebase initialized.")
        return db
    except Exception as init_err:
        logger.critical("Failed to initialize Firebase: %s", init_err)
        return None


db = init_firestore_from_env()
| # ───────────────────────────────────────────── | |
| # 3. Static Config | |
| # ───────────────────────────────────────────── | |
PRICE_API_BASE = os.getenv("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30  # passed as the `timeout=` of outbound HTTP requests
PRODUCT_CACHE_TTL = 20 * 60  # 20 minutes

# Reference figures injected into the chat prompt and used by the ZESA calculator.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.10,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06,
}

IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID", "")
IMGUR_URL = "https://api.imgur.com/3/image"
# No Authorization header at all when the client id is not configured.
IMGUR_HEADERS = {}
if IMGUR_CLIENT_ID:
    IMGUR_HEADERS["Authorization"] = f"Client-ID {IMGUR_CLIENT_ID}"

DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY", "")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"

# Catalogue output dir (created at import time under the working directory)
CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues")
os.makedirs(CATALOGUE_DIR, exist_ok=True)

# ─────────────────────────────────────────────
# 4. Market Index (ETL)
# ─────────────────────────────────────────────
# In-memory cache: refresh timestamp, flattened DataFrame, and its row count.
_data_cache: Dict[str, Any] = dict(ts=0, df=pd.DataFrame(), raw_count=0)
| def _norm(s: Any) -> str: | |
| if not s: | |
| return "" | |
| return str(s).strip().lower() | |
| def _coerce_price(v: Any) -> float: | |
| try: | |
| return float(v) if v is not None else 0.0 | |
| except Exception: | |
| return 0.0 | |
def _iter_product_rows(p: Dict[str, Any]):
    """Yield the flat market-index rows for one raw API product.

    A product without any price entries yields a single placeholder row
    (retailer "Listing", price 0.0, is_offer False). Otherwise one row is
    yielded per offer whose coerced price is greater than zero.
    """
    p_id = int(p.get("id") or 0)
    p_name = str(p.get("name") or "Unknown")
    brand_name = str((p.get("brand") or {}).get("brand_name") or "")
    cat_names = [str(c.get("name") or "") for c in (p.get("categories") or [])]
    primary_cat = cat_names[0] if cat_names else "General"
    search_vector = _norm(f"{p_name} {brand_name} {' '.join(cat_names)}")
    views = int(p.get("view_count") or 0)
    image = str(p.get("thumbnail") or p.get("image") or "")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict[str, Any]:
        # Key order defines the DataFrame column order.
        return {
            "product_id": p_id, "product_name": p_name,
            "search_vector": search_vector, "brand": brand_name,
            "category": primary_cat, "retailer": retailer,
            "price": price, "views": views, "image": image, "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        yield make_row("Listing", 0.0, False)
        return
    for offer in prices:
        retailer = offer.get("retailer") or {}
        r_name = str(retailer.get("name") or "Unknown Store")
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            yield make_row(r_name, price_val, True)


def fetch_and_flatten_data() -> pd.DataFrame:
    """Download the product listing and flatten it into one row per offer.

    Pages of ``/api/v1/product-listing`` are fetched until a page has no
    data, the reported ``totalPages`` is reached (99 when it is absent), or
    a request fails; in the failure case whatever was fetched so far is kept.

    Returns:
        A DataFrame with columns product_id, product_name, search_vector,
        brand, category, retailer, price, views, image, is_offer. It has
        those columns even when no rows could be built.
    """
    columns = [
        "product_id", "product_name", "search_vector", "brand", "category",
        "retailer", "price", "views", "image", "is_offer",
    ]
    all_products = []
    page = 1
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error("ETL Error on page %s: %s", page, e)
            break

    rows: List[Dict[str, Any]] = []
    skipped = 0
    for p in all_products:
        try:
            # Append row by row so offers parsed before a malformed entry are kept.
            for row in _iter_product_rows(p):
                rows.append(row)
        except Exception as e:
            # Best effort: one bad record must not sink the whole index,
            # but it should not vanish without a trace either.
            skipped += 1
            logger.debug("ETL: skipping malformed product: %s", e)
    if skipped:
        logger.warning("ETL: %d malformed product(s) skipped or truncated.", skipped)

    df = pd.DataFrame(rows, columns=columns)
    logger.info("ETL: Flattened into %d rows.", len(df))
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh runs when ``force_refresh`` is true, when the cache is empty,
    or when the cached data is older than ``PRODUCT_CACHE_TTL`` seconds.

    If the refresh comes back empty (e.g. the API is down) while a non-empty
    index is already cached, the cached index is kept rather than replaced
    with nothing, and the timestamp is bumped so the next retry happens
    after another TTL instead of on every call.

    Args:
        force_refresh: Refresh regardless of cache age.

    Returns:
        The current market index DataFrame (possibly empty).
    """
    cached_df = _data_cache["df"]
    is_stale = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if force_refresh or cached_df.empty or is_stale:
        logger.info("ETL: Refreshing Market Index...")
        fresh_df = fetch_and_flatten_data()
        if fresh_df.empty and not cached_df.empty:
            logger.warning(
                "ETL: Refresh returned no rows; keeping previous index (%d rows).",
                len(cached_df),
            )
            _data_cache["ts"] = time.time()
        else:
            _data_cache.update({"df": fresh_df, "ts": time.time(), "raw_count": len(fresh_df)})
    return _data_cache["df"]
| # ───────────────────────────────────────────── | |
| # 5. Precision Search & Basket Optimisation | |
| # ───────────────────────────────────────────── | |
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 15) -> pd.DataFrame:
    """Score every row of *df* against *query* and return the best matches.

    Scoring (points are additive):
      1. Exact name match: normalised query equals the product name (1000)
      2. Sequential match: query is a substring of the search vector (500)
      3. Brand match: the row's non-empty brand appears in the query (200)
      4. Token overlap: 50 per query token present in the search vector
    Tie-break: views descending, then price ascending.

    Args:
        df: Market index rows (needs search_vector, product_name, brand,
            views and price columns).
        query: Free-text product query.
        limit: Maximum number of rows to return.

    Returns:
        *df* unchanged when it is empty or the query is empty. Otherwise a
        copy of the matching rows with an added ``match_score`` column,
        sorted best-first and truncated to *limit* (empty if nothing scores).
    """
    if df.empty or not query:
        return df
    q_norm = _norm(query)
    q_tokens = set(q_norm.split())

    def score(row) -> int:
        vector = row["search_vector"]
        brand = _norm(row["brand"])
        points = 0
        if q_norm == _norm(row["product_name"]):
            points += 1000
        if q_norm in vector:
            points += 500
        # "" is a substring of every string, so a brandless row must be
        # excluded explicitly or it would earn brand points for any query.
        if brand and brand in q_norm:
            points += 200
        points += 50 * len(q_tokens & set(vector.split()))
        return points

    df_scored = df.copy()
    df_scored["match_score"] = df_scored.apply(score, axis=1)
    matches = df_scored[df_scored["match_score"] > 0]
    if matches.empty:
        return matches
    return matches.sort_values(
        ["match_score", "views", "price"], ascending=[False, False, True]
    ).head(limit)
def calculate_basket_optimization(item_names: List[str],
                                  preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """Price a list of items across retailers and rank the stores.

    For each requested item the best-scoring product is looked up among
    priced offers, its offers are listed cheapest-first, and the match is
    flagged as a substitute when a multi-word query is not fully contained
    in the matched product name + brand. A per-retailer "market matrix" is
    then built over the found items.

    Args:
        item_names: Product queries. Blank entries are ignored, because an
            empty query makes the search return the whole index and would
            otherwise resolve to an arbitrary product.
        preferred_retailer: Passed through unchanged in the result.

    Returns:
        ``{"actionable": False, "error": ...}`` when no market data is
        available; a dict with empty ``found_items`` when nothing matched;
        otherwise a dict with ``found_items``, ``global_missing``,
        ``market_matrix`` (top 5 stores), ``best_store``, ``is_basket`` and
        ``preferred_retailer``.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "Market data unavailable. Please try again shortly."}

    # Loop-invariant: only rows that carry a real price are searchable.
    offers_df = df[df["is_offer"] == True]  # noqa: E712 - element-wise comparison

    found_items: List[Dict[str, Any]] = []
    missing_global: List[str] = []
    for item in item_names or []:
        q_norm = _norm(item)
        if not q_norm:
            continue
        hits = search_products_deep(offers_df, item, limit=10)
        if hits.empty:
            missing_global.append(item)
            continue
        best_match = hits.iloc[0]
        res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}")
        q_tokens = q_norm.split()
        is_sub = len(q_tokens) > 1 and any(t not in res_norm for t in q_tokens)
        product_offers = (
            hits[hits["product_name"] == best_match["product_name"]]
            .sort_values("price")
        )
        offers_list = [{"retailer": r["retailer"], "price": float(r["price"])}
                       for _, r in product_offers.iterrows()]
        best_price = offers_list[0]["price"]
        max_price = offers_list[-1]["price"]
        found_items.append({
            "query": item,
            "product_name": str(best_match["product_name"]),
            "brand": str(best_match["brand"]),
            "category": str(best_match["category"]),
            "image": str(best_match["image"]),
            "is_substitute": is_sub,
            "offers": offers_list,
            "best_price": best_price,
            "potential_savings": max_price - best_price,
        })

    if not found_items:
        return {"actionable": True, "found_items": [], "global_missing": missing_global}

    # Market Matrix: one entry per retailer that sells at least one found item.
    all_retailers = {o["retailer"] for f in found_items for o in f["offers"]}
    store_comparison = []
    for retailer in all_retailers:
        total_price = 0.0
        found_count = 0
        missing_list = []
        for item in found_items:
            # Offers are sorted by price, so the first hit is this store's cheapest.
            price = next((o["price"] for o in item["offers"] if o["retailer"] == retailer), None)
            if price is not None:
                total_price += price
                found_count += 1
            else:
                missing_list.append(item["product_name"])
        store_comparison.append({
            "retailer": retailer,
            "total_price": total_price,
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_list,
        })

    # Best coverage first, then cheapest; the name makes ties deterministic
    # (set iteration order is not).
    store_comparison.sort(key=lambda x: (-x["found_count"], x["total_price"], x["retailer"]))

    top_coverage = store_comparison[0]["found_count"]
    if len(store_comparison) > 1:
        # Savings are only comparable between stores covering the same number of items.
        max_total = max(
            s["total_price"] for s in store_comparison
            if s["found_count"] == top_coverage
        )
        for s in store_comparison:
            s["basket_savings"] = (
                max_total - s["total_price"]
                if s["found_count"] == top_coverage else 0.0
            )
    else:
        for s in store_comparison:
            s["basket_savings"] = 0.0

    return {
        "actionable": True,
        "is_basket": len(found_items) > 1,
        "found_items": found_items,
        "global_missing": missing_global,
        "market_matrix": store_comparison[:5],
        "best_store": store_comparison[0] if store_comparison else None,
        "preferred_retailer": preferred_retailer,
    }
| # ───────────────────────────────────────────── | |
| # 6. ZESA Calculator | |
| # ───────────────────────────────────────────── | |
def calculate_zesa_units(amount_usd: float,
                         tariff: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Estimate how many electricity units an amount of money buys.

    The levy is removed first (``amount / (1 + levy)``); the remainder is
    then spent band by band. Each band sells up to ``limit`` units at
    ``rate`` per unit; the last band is treated as unlimited.

    Args:
        amount_usd: Purchase amount. Zero, negative or NaN amounts buy
            0 units (previously a negative amount produced negative units).
        tariff: Optional mapping with the keys ``zesa_step_1``,
            ``zesa_step_2``, ``zesa_step_3`` (each ``{"limit", "rate"}``)
            and ``zesa_levy``. Defaults to the module-level ``ZIM_CONTEXT``.

    Returns:
        ``{"amount_usd": float, "est_units_kwh": float}`` with the units
        rounded to one decimal place.
    """
    cfg = ZIM_CONTEXT if tariff is None else tariff
    amount = float(amount_usd)
    if not amount > 0:  # also true for NaN
        return {"amount_usd": amount, "est_units_kwh": 0.0}

    remaining = amount / (1 + cfg["zesa_levy"])
    bands = [cfg["zesa_step_1"], cfg["zesa_step_2"], cfg["zesa_step_3"]]
    units = 0.0
    for index, band in enumerate(bands):
        band_cost = band["limit"] * band["rate"]
        is_last_band = index == len(bands) - 1
        if is_last_band or remaining <= band_cost:
            # The money runs out inside this band (or no further band exists).
            units += remaining / band["rate"]
            break
        units += band["limit"]
        remaining -= band_cost
    return {"amount_usd": amount, "est_units_kwh": float(round(units, 1))}
| # ───────────────────────────────────────────── | |
| # 7. Gemini Helpers (new SDK) | |
| # ───────────────────────────────────────────── | |
| def _safe_json_loads(s: str, fallback: Any) -> Any: | |
| try: | |
| cleaned = s | |
| if "```json" in cleaned: | |
| cleaned = cleaned.split("```json")[1].split("```")[0] | |
| elif "```" in cleaned: | |
| cleaned = cleaned.split("```")[1] | |
| return json.loads(cleaned.strip()) | |
| except Exception as e: | |
| logger.error(f"JSON parse error: {e} | raw: {s[:300]}") | |
| return fallback | |
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Classify intent + extract items/amounts from user message, supporting Shona/Ndebele/English.

    Args:
        transcript: The user's message text.

    Returns:
        The model's JSON object (see the schema in the prompt). A neutral
        CASUAL_CHAT result is returned when no Gemini client is configured,
        when the call fails, or when the model answers with anything other
        than a JSON object (callers use ``.get`` on the result).
    """
    def casual_chat() -> Dict[str, Any]:
        # Fresh dict per call so callers may mutate the result freely.
        return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []}

    if not _gemini_client:
        return casual_chat()
    PROMPT = """
Analyze the transcript below and return STRICT JSON.
Intents:
- CASUAL_CHAT : Greetings, "hi", off-topic
- SHOPPING_BASKET : Searching for prices / cheapest X
- UTILITY_CALC : Electricity / ZESA / fuel cost questions
- STORE_DECISION : "Which store is cheapest?", "Where should I shop?"
- EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5"
- CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet
- DEALS_EXPLORE : "Today's deals", "promotions", "what's on special"
- DISCOVER : "What products do you have?", "show me your categories"
Rules:
- Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal', 'mafuta' → 'cooking oil').
- If only a concept is given (e.g. "plan a braai"), set is_event_planning=true and items=[].
- Detect user language accurately (e.g. Shona, Ndebele, English).
- store_preference: name of store if explicitly mentioned.
- utility_amount: numeric value if mentioned.
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"store_preference": "string | null",
"is_event_planning": boolean,
"language": "string",
"catalogue_scope": "string | null"
}
Transcript: """ + transcript
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, casual_chat())
        if not isinstance(parsed, dict):
            # Valid JSON, wrong shape (e.g. a list): unusable as an intent.
            logger.error("Intent detect error: expected JSON object, got %s", type(parsed).__name__)
            return casual_chat()
        return parsed
    except Exception as e:
        logger.error(f"Intent detect error: {e}")
        return casual_chat()
def gemini_explode_concept(transcript: str) -> List[str]:
    """Converts an event/meal concept into a concrete grocery list.

    Args:
        transcript: The user's description of what they are planning.

    Returns:
        Item names suggested by the model, whitespace-trimmed. Non-string
        and blank entries are dropped. An empty list is returned when no
        Gemini client is configured, the call fails, or the model does not
        answer with a JSON list.
    """
    if not _gemini_client:
        return []
    PROMPT = f"""
User wants to plan: "{transcript}"
Generate 10-15 essential Zimbabwean grocery items for this.
Use English terms suitable for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef').
Return ONLY a JSON list of strings.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, [])
        if not isinstance(parsed, list):
            logger.error("Concept explode error: expected JSON list, got %s", type(parsed).__name__)
            return []
        # Keep the declared List[str] contract for downstream product search.
        return [entry.strip() for entry in parsed if isinstance(entry, str) and entry.strip()]
    except Exception as e:
        logger.error(f"Concept explode error: {e}")
        return []
def _sniff_image_mime(image_bytes: bytes) -> str:
    """Guess the image MIME type from its leading magic bytes (default JPEG)."""
    head = bytes(image_bytes[:12])
    if head.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
        return "image/webp"
    return "image/jpeg"


def gemini_analyze_image(image_bytes: bytes, caption: str = "") -> Dict[str, Any]:
    """Analyse a WhatsApp image — grocery list photo, product, or meal.

    Args:
        image_bytes: Raw image data. PNG and WebP are recognised from their
            file signature; anything else is sent as JPEG, as before.
        caption: Optional caption that accompanied the image.

    Returns:
        A dict with ``type`` (SHOPPING_LIST, SINGLE_PRODUCT, MEAL_DISH or
        IRRELEVANT), ``items`` and ``description``. An IRRELEVANT result is
        returned when no Gemini client is configured, the call fails, or
        the model does not answer with a JSON object.
    """
    def irrelevant() -> Dict[str, Any]:
        return {"type": "IRRELEVANT", "items": [], "description": ""}

    if not _gemini_client:
        return irrelevant()
    PROMPT = f"""
Analyze this image. Context caption: "{caption}"
Classify:
1. SHOPPING_LIST → Extract each item as written (translate to English).
2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml", "Zimgold Cooking Oil 2L").
3. MEAL_DISH → Identify dish name + core ingredients.
4. IRRELEVANT → Not shopping related.
Return STRICT JSON:
{{
"type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT",
"items": ["item1", "item2"],
"description": "short description"
}}
"""
    try:
        image_part = genai_types.Part.from_bytes(
            data=image_bytes, mime_type=_sniff_image_mime(image_bytes)
        )
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, irrelevant())
        if not isinstance(result, dict):
            logger.error("Vision error: expected JSON object, got %s", type(result).__name__)
            return irrelevant()
        return result
    except Exception as e:
        logger.error(f"Vision error: {e}")
        return irrelevant()
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict,
                         chat_history: str = "", language: str = "English") -> str:
    """Generate April's WhatsApp reply, formatted for plain text (no markdown headers).

    Args:
        transcript: The user's message.
        intent: Result of intent detection; only its ``intent`` key is read.
            Anything that is not a dict is treated as CASUAL_CHAT instead of
            raising before the model is even called.
        analyst_data: Structured price data, embedded as JSON in the prompt
            when truthy.
        chat_history: Recent conversation text, embedded when non-empty.
        language: Language the model is told the user speaks.

    Returns:
        The model's reply, or a fixed apology when no client is configured,
        the call fails, or the model returns no text.
    """
    if not _gemini_client:
        return "Hi! I'm April from Pricelyst. I'm having a bit of trouble right now — please try again shortly."
    display_error = "I checked the prices, but I'm having trouble displaying them right now. Please try again!"
    intent_name = intent.get("intent", "CASUAL_CHAT") if isinstance(intent, dict) else "CASUAL_CHAT"
    context_str = ""
    if chat_history:
        context_str += f"RECENT CHAT:\n{chat_history}\n\n"
    context_str += (
        f"ZIMBABWE CONTEXT: Petrol=${ZIM_CONTEXT['fuel_petrol']}/L, "
        f"Diesel=${ZIM_CONTEXT['fuel_diesel']}/L, "
        f"Bread≈${ZIM_CONTEXT['bread_avg']}, "
        f"ZESA: $10={calculate_zesa_units(10)['est_units_kwh']}u, "
        f"$20={calculate_zesa_units(20)['est_units_kwh']}u\n"
    )
    if analyst_data:
        context_str += f"\nANALYST DATA:\n{json.dumps(analyst_data, default=str)}\n"
    PROMPT = f"""
You are April, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒.
Your mission: shortest path to value + complete price transparency for Zimbabwean shoppers.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent_name}
CONTEXT:
{context_str}
FORMATTING RULES (WhatsApp plain text — NO markdown headers like ##):
- Use *bold* for store names and prices.
- Use emojis naturally (✅ 🛒 💰 📍 ⚠️).
- Keep replies concise. No walls of text.
- If replying in Shona or Ndebele, ensure grammar is natural.
LOGIC:
1. BASKET COMPARISON: Present market matrix. State cheapest store total and basket_savings clearly.
Example: "✅ *OK Mart* has the best deal at *$4.00* total — saving you *$2.95* vs Spar!"
2. SUBSTITUTE (is_substitute=true): "I couldn't find *[Query]*, but the nearest match is *[Product]* at *$X*."
3. SINGLE ITEM: Show cheapest price + store, then alternatives. Note potential_savings if > $0.10.
4. ZESA: Calculate and explain units clearly.
5. CASUAL / GREETING: Be warm, introduce yourself briefly and invite a search query.
6. DEALS_EXPLORE: List 5-8 interesting current deals from analyst data.
7. EVENT_PLANNING: Acknowledge the plan, then present the shopping basket.
8. CATALOGUE_REQUEST: Inform the user their PDF catalogue is being prepared and will arrive shortly.
Always end with a helpful follow-up question or CTA if appropriate.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
        reply = (resp.text or "").strip()
        if not reply:
            # An empty WhatsApp message is worse than an honest apology.
            logger.error("Chat response error: model returned no text")
            return display_error
        return reply
    except Exception as e:
        logger.error(f"Chat response error: {e}")
        return display_error
def gemini_translate(text: str, target_lang: str) -> str:
    """Translate April's English response into the user's language if needed.

    Args:
        text: The English reply.
        target_lang: Target language name; empty or "english" (any case)
            means no translation.

    Returns:
        The translated text. *text* is returned unchanged when translation
        is not needed, no Gemini client is configured, the call fails, or
        the model returns nothing, so the user never receives an empty reply.
    """
    if not _gemini_client or not target_lang or target_lang.lower() == "english":
        return text
    PROMPT = f"""
Translate the following WhatsApp shopping assistant reply from English to {target_lang}.
Rules:
- Keep prices ($X.XX), store names, and product names UNCHANGED.
- Keep WhatsApp formatting (*bold*, emojis) UNCHANGED.
- Use natural, conversational tone.
Text:
"{text}"
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        translated = (resp.text or "").strip()
        if not translated:
            logger.error("Translation error: model returned no text")
            return text
        return translated
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
| # ───────────────────────────────────────────── | |
| # 8. Catalogue PDF Generator | |
| # ───────────────────────────────────────────── | |
def generate_catalogue_pdf(title: str, items: List[Dict], scope_label: str = "Price Comparison") -> Optional[str]:
    """
    Generate a professional Pricelyst price-comparison PDF using reportlab.

    Each item contributes one table row per offer: the first row carries the
    product, brand, category and best-price summary, later rows only the
    store and price. Items without offers get a single "Not listed" row.

    All data-derived text is XML-escaped before it is handed to ``Paragraph``,
    which parses its input as markup; a raw ``&`` or ``<`` in a product name
    would otherwise abort the whole document.

    Args:
        title: Heading printed above the table.
        items: Dicts shaped like the ``found_items`` entries produced by the
            basket optimiser (product_name, brand, category, offers,
            best_price, potential_savings, is_substitute, query).
        scope_label: Sub-title printed under the brand header.

    Returns:
        File path on success, None on failure.
    """
    try:
        from xml.sax.saxutils import escape

        from reportlab.lib import colors
        from reportlab.lib.enums import TA_CENTER
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer,
                                        Table, TableStyle, HRFlowable)

        filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.5*cm, leftMargin=1.5*cm,
                                topMargin=1.5*cm, bottomMargin=1.5*cm)
        styles = getSampleStyleSheet()

        # Custom styles
        green = colors.HexColor("#1E7B34")
        title_style = ParagraphStyle("CatTitle", parent=styles["Title"],
                                     fontSize=22, textColor=green,
                                     spaceAfter=4, alignment=TA_CENTER)
        sub_style = ParagraphStyle("CatSub", parent=styles["Normal"],
                                   fontSize=10, textColor=colors.HexColor("#555555"),
                                   alignment=TA_CENTER, spaceAfter=2)
        head_style = ParagraphStyle("ColHead", parent=styles["Normal"],
                                    fontSize=9, textColor=colors.white,
                                    fontName="Helvetica-Bold")
        cell_style = ParagraphStyle("Cell", parent=styles["Normal"],
                                    fontSize=8, textColor=colors.HexColor("#222222"))

        def cell(text: Any) -> Paragraph:
            """Build a body cell from untrusted text (None becomes empty)."""
            return Paragraph(escape("" if text is None else str(text)), cell_style)

        story = []

        # Header
        # NOTE(review): the emoji here and the warning sign below may not
        # have glyphs in the built-in Helvetica font; confirm the rendering.
        story.append(Paragraph("🛒 Pricelyst Zimbabwe", title_style))
        story.append(Paragraph(escape(str(scope_label)), sub_style))
        story.append(Paragraph(f"Generated: {datetime.now().strftime('%d %b %Y, %H:%M')}", sub_style))
        story.append(Spacer(1, 0.3*cm))
        story.append(HRFlowable(width="100%", thickness=2, color=green))
        story.append(Spacer(1, 0.4*cm))
        story.append(Paragraph(escape(str(title)), styles["Heading2"]))
        story.append(Spacer(1, 0.3*cm))

        # Table header
        col_widths = [5*cm, 3*cm, 2.5*cm, 2*cm, 2.2*cm, 2.8*cm]
        column_titles = ("Product", "Brand", "Category", "Store", "Price (USD)", "Best Price")
        table_data = [[Paragraph(label, head_style) for label in column_titles]]

        for item in items:
            offers = item.get("offers") or []
            if not offers:
                table_data.append([
                    cell(item.get("query", "Unknown")), cell(""), cell(""),
                    cell("Not listed"), cell("N/A"), cell("N/A"),
                ])
                continue

            best_p = item.get("best_price") or 0
            savings = item.get("potential_savings") or 0
            sub_note = " ⚠ (nearest match)" if item.get("is_substitute") else ""
            savings_str = f"${savings:.2f} savings" if savings > 0.05 else "Best"
            product_label = item.get("product_name") or item.get("query", "Unknown")

            for position, offer in enumerate(offers):
                if position == 0:
                    # First row: product details + first offer + summary.
                    # <br/> is Paragraph markup; a bare "\n" is just whitespace.
                    leading = [cell(f"{product_label}{sub_note}"),
                               cell(item.get("brand", "")),
                               cell(item.get("category", ""))]
                    summary = Paragraph(f"${best_p:.2f}<br/>{savings_str}", cell_style)
                else:
                    # Additional offers as sub-rows
                    leading = [cell(""), cell(""), cell("")]
                    summary = cell("")
                table_data.append(leading + [
                    cell(offer["retailer"]),
                    cell(f"${offer['price']:.2f}"),
                    summary,
                ])

        tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), green),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
            ("FONTSIZE", (0, 0), (-1, 0), 9),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F0FFF4")]),
            ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("LEFTPADDING", (0, 0), (-1, -1), 4),
            ("RIGHTPADDING", (0, 0), (-1, -1), 4),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        story.append(tbl)
        story.append(Spacer(1, 0.5*cm))

        # Footer
        story.append(HRFlowable(width="100%", thickness=1, color=colors.HexColor("#CCCCCC")))
        story.append(Spacer(1, 0.2*cm))
        story.append(Paragraph(
            "Prices subject to change. Data sourced from Pricelyst.co.zw — Zimbabwe's #1 price comparison platform.",
            ParagraphStyle("Footer", parent=styles["Normal"], fontSize=7,
                           textColor=colors.HexColor("#888888"), alignment=TA_CENTER)
        ))
        doc.build(story)
        logger.info(f"Catalogue PDF generated: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"PDF generation failed: {e}", exc_info=True)
        return None
| # ───────────────────────────────────────────── | |
| # 9. Firebase Profile Helpers | |
| # ───────────────────────────────────────────── | |
def get_or_create_profile(mobile: str) -> Dict[str, Any]:
    """Fetch the profile document for *mobile*, creating it on first contact.

    Returns the stored profile dict, a newly written one (mobile +
    created_at in UTC ISO format), or ``{}`` when Firestore is not
    configured or the lookup fails.
    """
    if not db:
        return {}
    try:
        profile_ref = db.collection("pricelyst_profiles").document(mobile)
        snapshot = profile_ref.get()
        if not snapshot.exists:
            new_profile = {
                "mobile": mobile,
                "created_at": datetime.now(timezone.utc).isoformat(),
            }
            profile_ref.set(new_profile)
            return new_profile
        return snapshot.to_dict()
    except Exception as e:
        logger.error(f"Profile fetch error for {mobile}: {e}")
        return {}
def get_chat_history(mobile: str, limit: int = 6) -> str:
    """Return the latest *limit* exchanges for *mobile* as text, oldest first.

    Each exchange is rendered as a "User: …" line followed by an "April: …"
    line. Returns "" when Firestore is not configured or the query fails.
    """
    if not db:
        return ""
    try:
        newest_first = (
            db.collection("pricelyst_profiles").document(mobile)
            .collection("chat_logs")
            .order_by("ts", direction=firestore.Query.DESCENDING)
            .limit(limit)
            .stream()
        )
        exchanges = [
            f"User: {entry.get('message', '')}\nApril: {entry.get('response', '')}"
            for entry in (snapshot.to_dict() for snapshot in newest_first)
        ]
        # The query returns newest first; the prompt wants chronological order.
        exchanges.reverse()
        return "\n".join(exchanges)
    except Exception as e:
        logger.error(f"Chat history error: {e}")
        return ""
def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None:
    """Append one user/April exchange to the chat_logs sub-collection of *mobile*.

    Does nothing when Firestore is not configured; write failures are
    logged and swallowed.
    """
    if not db:
        return
    log_entry = {
        "message": message,
        "response": response,
        "intent": intent,
        "ts": datetime.now(timezone.utc).isoformat()
    }
    try:
        chat_logs = (
            db.collection("pricelyst_profiles").document(mobile).collection("chat_logs")
        )
        chat_logs.add(log_entry)
    except Exception as e:
        logger.error(f"Chat log save error: {e}")
def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]:
    """Store *plan* under the shopping_plans sub-collection of *mobile*.

    The generated document id is written into ``plan["id"]`` (the caller's
    dict is modified) and returned. Returns None when Firestore is not
    configured or the write fails.
    """
    if not db:
        return None
    try:
        plans = (
            db.collection("pricelyst_profiles").document(mobile)
            .collection("shopping_plans")
        )
        plan_ref = plans.document()
        plan_id = plan_ref.id
        plan["id"] = plan_id
        plan_ref.set(plan)
        return plan_id
    except Exception as e:
        logger.error(f"Plan save error: {e}")
        return None
| # ───────────────────────────────────────────── | |
| # 10. Firebase Storage Upload | |
| # ───────────────────────────────────────────── | |
def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]:
    """Upload file to Firebase Storage and return a signed 1-hour URL.

    The object is stored as ``<folder>/<basename of file_path>``. Returns
    None when no storage bucket is configured or the upload fails.
    """
    if not FIREBASE_STORAGE_BUCKET:
        return None
    try:
        object_name = f"{folder}/{os.path.basename(file_path)}"
        blob = fb_storage.bucket().blob(object_name)
        blob.upload_from_filename(file_path)
        return blob.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}")
        return None
def upload_to_imgur(file_path: str) -> Optional[str]:
    """Upload image to Imgur and return public URL.

    Args:
        file_path: Path of the local image file to upload.

    Returns:
        The link reported by Imgur, or None when no client id is
        configured, the response does not report success, or the upload
        fails (including a timeout).
    """
    if not IMGUR_CLIENT_ID:
        return None
    try:
        with open(file_path, "rb") as f:
            # requests has no default timeout; without one a stalled
            # connection would block this call indefinitely.
            resp = requests.post(IMGUR_URL, headers=IMGUR_HEADERS,
                                 files={"image": f}, timeout=HTTP_TIMEOUT)
        resp.raise_for_status()
        data = resp.json()
        return data["data"]["link"] if data.get("success") else None
    except Exception as e:
        logger.error(f"Imgur upload failed: {e}")
        return None
| # ───────────────────────────────────────────── | |
| # 11. TTS (DeepGram) | |
| # ───────────────────────────────────────────── | |
def deepgram_tts(text: str) -> Optional[str]:
    """Convert text to MP3 via DeepGram TTS. Returns local file path.

    The audio is written to ``tts_<random hex>.mp3`` in the current working
    directory. Returns None when no API key is configured or the request
    or write fails.
    """
    if not DEEPGRAM_API_KEY:
        return None
    request_headers = {
        "Authorization": f"Token {DEEPGRAM_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        audio = requests.post(
            DEEPGRAM_TTS_URL,
            headers=request_headers,
            json={"text": text},
            timeout=30
        )
        audio.raise_for_status()
        out_path = os.path.join(os.getcwd(), f"tts_{uuid.uuid4().hex}.mp3")
        with open(out_path, "wb") as sink:
            sink.write(audio.content)
        return out_path
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}")
        return None
| # ───────────────────────────────────────────── | |
| # 12. Deals & Discovery Helpers | |
| # ───────────────────────────────────────────── | |
def get_todays_deals(limit: int = 8) -> List[Dict]:
    """Return items with highest potential savings — proxy for 'deals'.

    "Savings" is the gap between the highest and lowest offer price of a
    product name; only gaps above 0.05 qualify. Each deal reports the
    cheapest offer's price, retailer and category. Returns [] when no data
    is available or anything fails.
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        live_offers = market[market["is_offer"] == True].copy()
        if live_offers.empty:
            return []

        # Compute savings per product
        spread = live_offers.groupby("product_name")["price"].agg(["min", "max"]).reset_index()
        spread["savings"] = spread["max"] - spread["min"]
        worthwhile = spread[spread["savings"] > 0.05]
        ranked = worthwhile.sort_values("savings", ascending=False).head(limit)

        def to_deal(name: Any, saving: Any) -> Dict:
            same_product = live_offers[live_offers["product_name"] == name]
            cheapest = same_product.sort_values("price").iloc[0]
            return {
                "product_name": name,
                "cheapest_price": float(cheapest["price"]),
                "retailer": cheapest["retailer"],
                "savings": float(saving),
                "category": cheapest.get("category", ""),
            }

        return [
            to_deal(name, saving)
            for name, saving in zip(ranked["product_name"], ranked["savings"])
        ]
    except Exception as e:
        logger.error(f"Deals fetch error: {e}")
        return []
def get_category_list() -> List[str]:
    """Return unique product categories for discovery.

    The names are sorted; [] is returned when the market index is empty or
    the lookup fails.
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        distinct = market["category"].dropna().unique().tolist()
        return sorted(distinct)
    except Exception:
        return []
def format_deals_message(deals: List[Dict]) -> str:
    """Format deals as a WhatsApp-ready text block.

    Produces a header, one numbered entry per deal (name, cheapest price @
    retailer, savings) and a closing call to action. An empty list yields a
    fixed "no data" message.
    """
    if not deals:
        return "No deals data available right now. Please try again shortly."
    header = "🏷️ *Today's Best Deals on Pricelyst* 🇿🇼\n"
    footer = "\n_Reply with any product name to compare prices across stores!_"
    entries = [
        f"{rank}. *{deal['product_name']}*\n"
        f" 💰 ${deal['cheapest_price']:.2f} @ {deal['retailer']}\n"
        f" 🔥 Save up to ${deal['savings']:.2f}\n"
        for rank, deal in enumerate(deals, start=1)
    ]
    return "\n".join([header, *entries, footer])
def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str:
    """Format basket optimization result as clean WhatsApp text (used as fallback).

    Sections, each emitted only when it has content: per-item price
    breakdown (up to three offers each), store totals (when more than one
    store is in the matrix; top three shown), and the list of items that
    were not found. A non-actionable result yields its error text instead.
    ``language`` is accepted but not used by this formatter.
    """
    if not analyst.get("actionable"):
        return analyst.get("error", "Sorry, I couldn't fetch price data right now.")

    found_items = analyst.get("found_items", [])
    not_found = analyst.get("global_missing", [])
    stores = analyst.get("market_matrix", [])
    out: List[str] = []

    if found_items:
        out.append("🛒 *Price Breakdown:*\n")
    for entry in found_items:
        suffix = " _(nearest match)_" if entry.get("is_substitute") else ""
        out.append(f"• *{entry['product_name']}*{suffix}")
        out.extend(
            f" {'✅' if offer['price'] == entry['best_price'] else ' '} "
            f"{offer['retailer']}: *${offer['price']:.2f}*"
            for offer in entry["offers"][:3]
        )
        if entry.get("potential_savings", 0) > 0.10:
            out.append(f" 💡 Save up to ${entry['potential_savings']:.2f}")
        out.append("")

    if len(stores) > 1:
        out.append("🏪 *Store Totals:*\n")
        out.extend(
            f"• *{store['retailer']}*: ${store['total_price']:.2f} "
            f"({store['found_count']}/{store['total_items']} items)"
            for store in stores[:3]
        )
        winner = stores[0]
        basket_saving = winner.get("basket_savings", 0)
        if basket_saving > 0.10:
            out.append(f"\n✅ *Best: {winner['retailer']}* — saves you *${basket_saving:.2f}*!")

    if not_found:
        out.append(f"\n⚠️ Not found: {', '.join(not_found)}")
        out.append("_These may be available in-store — try a broader search term._")

    return "\n".join(out)