""" utility.py — Pricelyst WhatsApp Bot (v2) Core AI & Data layer. Upgrades in this version: 1. Smart message formatting — per-store breakdown for ≤3 items, cheapest-basket summary with highlights for larger baskets. 2. Budget Engine — real $ calculations against live catalogue prices. Handles: fixed budget shopping, monthly household budgeting, party/event planning with headcount, meal prep with recipes, image-to-recipe-to-shopping-list pipeline. """ import os import re import json import time import math import uuid import logging import base64 import io from datetime import datetime, timezone, timedelta from typing import Any, Dict, List, Optional, Tuple import requests import pandas as pd logger = logging.getLogger(__name__) # ───────────────────────────────────────────── # 1. Gemini (new google-genai SDK) # ───────────────────────────────────────────── try: from google import genai from google.genai import types as genai_types except ImportError: genai = None genai_types = None logger.error("google-genai not installed. Run: pip install google-genai") GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite") _gemini_client = None if genai and GOOGLE_API_KEY: try: _gemini_client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Gemini client ready (model=%s).", GEMINI_MODEL) except Exception as e: logger.error("Failed to init Gemini client: %s", e) # ───────────────────────────────────────────── # 2. 
# Firebase
# ─────────────────────────────────────────────

import firebase_admin
from firebase_admin import credentials, firestore, storage as fb_storage

FIREBASE_ENV = os.environ.get("FIREBASE", "")
FIREBASE_STORAGE_BUCKET = os.environ.get("FIREBASE_STORAGE_BUCKET", "")

db: Optional[Any] = None


def _get_firestore_client() -> Optional[Any]:
    """Return a Firestore client for the database named by FIRESTORE_DB_NAME."""
    return firestore.client(
        database_id=os.environ.get("FIRESTORE_DB_NAME", "(default)")
    )


def init_firestore_from_env() -> Optional[Any]:
    """
    Initialise Firebase from the FIREBASE env var and return a Firestore client.

    Reuses an already-initialised firebase_admin app when one exists.
    Returns None (and logs) when the env var is missing or initialisation
    fails; the module-level ``db`` is updated on success.
    """
    global db
    try:
        if not firebase_admin._apps:
            # No app yet in this process — build one from the service account JSON.
            if not FIREBASE_ENV:
                logger.warning("FIREBASE env var missing. Persistence disabled.")
                return None

            service_account = json.loads(FIREBASE_ENV)
            certificate = credentials.Certificate(service_account)
            app_options = {}
            if FIREBASE_STORAGE_BUCKET:
                app_options["storageBucket"] = FIREBASE_STORAGE_BUCKET
            firebase_admin.initialize_app(certificate, app_options)

            db = _get_firestore_client()
            logger.info("Firebase initialized.")
            return db

        # An app already exists: just attach a client to it.
        db = _get_firestore_client()
        return db
    except Exception as e:
        logger.critical("Failed to initialize Firebase: %s", e, exc_info=True)
        return None


db = init_firestore_from_env()

# ─────────────────────────────────────────────
# 3. Static Config
# ─────────────────────────────────────────────

PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30
PRODUCT_CACHE_TTL = 60 * 20  # 20 minutes

# ZESA/ZETDC tariffs — ZERA approved, USD equivalent incl. 6% REA levy
# Source: zimpricecheck.com, last updated 20 May 2026
# Billing is in ZiG; USD equivalents at prevailing interbank rate.
# Domestic users billed in ZiG — USD estimates for user convenience.
ZIM_CONTEXT = {
    "zesa_bands": [
        {"limit": 50, "rate_usd": 0.08},    # first 50 units
        {"limit": 100, "rate_usd": 0.09},   # 51-100
        {"limit": 200, "rate_usd": 0.16},   # 101-200
        {"limit": 300, "rate_usd": 0.23},   # 201-300
        {"limit": 400, "rate_usd": 0.25},   # 301-400
        {"limit": 9999, "rate_usd": 0.26},  # 401+
    ],
    "zesa_note": "Tariffs billed in ZiG; USD estimates at prevailing rate. Includes 6% REA levy.",
}

IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID", "")
IMGUR_URL = "https://api.imgur.com/3/image"
IMGUR_HEADERS = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}

DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"

CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues")
os.makedirs(CATALOGUE_DIR, exist_ok=True)

# ── Logo cache ─────────────────────────────────────────────────────────────
# Downloaded on first use and reused for all PDFs.
# Uses requests (not urllib) so it goes through the same session that works
# on HuggingFace — urllib hits the same egress block as Meta/Cloudflare.
LOGO_URL = os.environ.get("PRICELYST_LOGO_URL", "https://i.imgur.com/4bVNlBs.jpeg")
_logo_path: Optional[str] = None  # path to cached local file


def _get_logo_path() -> Optional[str]:
    """
    Return the path of a locally cached logo file, downloading it if needed.

    The logo is fetched from LOGO_URL with an explicit User-Agent, checked to
    look like an image (Content-Type or leading magic bytes), and converted
    to PNG with Pillow. If Pillow is missing or the conversion fails, the raw
    bytes are cached instead. Returns None when the download fails or the
    response is not an image.
    """
    global _logo_path

    # Fast path: a previous call already cached the file and it still exists.
    if _logo_path and os.path.exists(_logo_path):
        return _logo_path

    request_headers = {
        "User-Agent": "PricelystBot/1.0 (+https://pricelyst.co.zw)",
        "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    }
    # Leading bytes of JPEG, PNG, GIF and RIFF containers.
    image_signatures = (b"\xff\xd8", b"\x89PNG", b"GIF", b"RIFF")

    try:
        resp = requests.get(LOGO_URL, timeout=20, headers=request_headers, allow_redirects=True)
        resp.raise_for_status()

        content_type = (resp.headers.get("Content-Type") or "").lower()
        looks_like_image = (
            "image" in content_type
            or resp.content[:16].startswith(image_signatures)
        )
        if not looks_like_image:
            logger.warning(
                "Logo download did not return an image. content_type=%s bytes=%s",
                content_type, len(resp.content)
            )
            return None

        png_file = os.path.join(CATALOGUE_DIR, "pricelyst_logo.png")
        try:
            from PIL import Image

            picture = Image.open(io.BytesIO(resp.content))
            if picture.mode not in ("RGB", "RGBA"):
                picture = picture.convert("RGBA")
            picture.save(png_file, format="PNG", optimize=True)
            _logo_path = png_file
            logger.info("Logo cached as PNG at %s (%s bytes)", png_file, os.path.getsize(png_file))
            return _logo_path
        except Exception as pil_err:
            # Fallback for environments without Pillow. ReportLab may still render it.
            suffix = ".png" if "png" in content_type else ".jpg"
            raw_file = os.path.join(CATALOGUE_DIR, f"pricelyst_logo_raw{suffix}")
            with open(raw_file, "wb") as fh:
                fh.write(resp.content)
            _logo_path = raw_file
            logger.warning("Pillow logo conversion failed (%s). Cached raw logo at %s", pil_err, raw_file)
            return _logo_path
    except Exception as e:
        logger.warning("Logo download failed from %s: %s", LOGO_URL, e)
        return None


# ─────────────────────────────────────────────
# 4. Market Index
# ─────────────────────────────────────────────
# 4.
# Market Index (ETL)
# ─────────────────────────────────────────────

_data_cache: Dict[str, Any] = {"ts": 0, "df": pd.DataFrame(), "raw_count": 0}

# When a refresh returns nothing but older data is still cached, retry after
# this many seconds instead of waiting a full PRODUCT_CACHE_TTL.
_STALE_RETRY_SECONDS = 60

# Filler words left out of the per-token part of the search vector.
_NAME_STOPWORDS = frozenset({"and", "the", "for", "with"})


def _norm(s: Any) -> str:
    """Return *s* as a stripped, lower-cased string ("" for any falsy value)."""
    return str(s).strip().lower() if s else ""


def _coerce_price(v: Any) -> float:
    """Convert *v* to a finite float, or 0.0 when it is missing or unparseable."""
    if v is None:
        return 0.0
    try:
        price = float(v)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # NaN / inf would poison min() and basket totals further down.
    return price if math.isfinite(price) else 0.0


# Alcohol/age-restricted terms are not priced in WhatsApp chat.
# We keep the rest of the user's basket alive and point restricted items to the site.
ALCOHOL_TERMS = {
    "alcohol", "beer", "lager", "stout", "wine", "whisky", "whiskey", "vodka",
    "gin", "rum", "brandy", "cider", "spirit", "spirits", "booze", "liquor",
    "champagne", "tequila", "amarula", "castle", "zambezi", "lion lager",
    "carling", "black label", "hunters", "savanna", "heineken", "stella",
    "chibuku", "scud", "super",
}

# Whole-word (optionally plural) match, so "gin" does not fire on "ginger"
# and "rum" does not fire on "drumsticks".
_ALCOHOL_PATTERN = re.compile(
    r"\b(?:"
    + "|".join(re.escape(term) for term in sorted(ALCOHOL_TERMS, key=len, reverse=True))
    + r")s?\b"
)


def contains_alcohol_reference(text: Any) -> bool:
    """Return True when *text* contains any ALCOHOL_TERMS entry as a whole word."""
    return _ALCOHOL_PATTERN.search(_norm(text)) is not None


def split_restricted_items(item_names: List[str]) -> Tuple[List[str], List[str]]:
    """Split *item_names* into ``(allowed, restricted)`` lists of strings."""
    allowed: List[str] = []
    restricted: List[str] = []
    for item in item_names or []:
        bucket = restricted if contains_alcohol_reference(item) else allowed
        bucket.append(str(item))
    return allowed, restricted


def format_restricted_items_note(restricted_items: List[str]) -> str:
    """Return the user-facing note for restricted items ("" when there are none)."""
    if not restricted_items:
        return ""
    return (
        "🔞 Restricted item(s): " + ", ".join(restricted_items) +
        " — please visit https://pricelyst.co.zw for more information."
    )


def _fetch_page(url: str, page: int, per_page: int) -> Tuple[List[Dict], int]:
    """
    Fetch a single page from the product listing API.

    Returns (data_list, total_pages). Raises on HTTP error so the caller
    can decide retry logic.
    """
    import urllib3

    # NOTE(review): verify=False disables TLS certificate verification for the
    # price API. Left as-is because the deployment may depend on it — confirm
    # whether it can be removed.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    r = requests.get(url, params={"page": page, "perPage": per_page},
                     timeout=HTTP_TIMEOUT, verify=False)
    r.raise_for_status()
    payload = r.json()
    data = payload.get("data") or []
    # API may return totalPages, last_page, or meta.last_page — check all.
    # NOTE(review): the 999 fallback makes the caller request pages 2..999
    # when the API reports no page count — confirm this is intended.
    total_pages = (
        payload.get("totalPages")
        or payload.get("last_page")
        or (payload.get("meta") or {}).get("last_page")
        or 999
    )
    return data, int(total_pages)


def _flatten_product(p: Dict[str, Any], p_id: int) -> List[Dict[str, Any]]:
    """Turn one raw API product into index rows (one per priced offer)."""
    p_name = str(p.get("name") or "Unknown").strip()
    brand_obj = p.get("brand") or {}
    brand_name = str(brand_obj.get("brand_name") or "").strip()

    cats = p.get("categories") or []
    cat_names = [str(c.get("name") or "").strip() for c in cats if c.get("name")]
    primary_cat = cat_names[0] if cat_names else "General"

    # Rich search vector: name + brand + all categories + individual tokens.
    # This makes "Tanganda Tips Tea Bags 100s Pouch" findable by:
    # "tanganda", "tips", "tea bags", "tea", "coffee", "milk" (category)
    name_tokens = [w for w in p_name.lower().split()
                   if len(w) > 2 and w not in _NAME_STOPWORDS]
    search_vector = _norm(" ".join([p_name, brand_name, *cat_names, *name_tokens]))

    views = int(p.get("view_count") or 0)
    image = str(p.get("thumbnail") or p.get("image") or "")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict[str, Any]:
        return {
            "product_id": p_id,
            "product_name": p_name,
            "search_vector": search_vector,
            "brand": brand_name,
            "category": primary_cat,
            "retailer": retailer,
            "price": price,
            "views": views,
            "image": image,
            "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        # Keep unpriced products in the index as a non-offer "Listing" row.
        return [make_row("Listing", 0.0, False)]

    rows = []
    for offer in prices:
        retailer_obj = offer.get("retailer") or {}
        r_name = str(retailer_obj.get("name") or "Unknown Store").strip()
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            rows.append(make_row(r_name, price_val, True))
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """
    Fetch the full catalogue and flatten it to one row per (product, offer).

    Strategy:
    - Fetch page 1 to discover total_pages; if that fails, return an empty frame
    - Fetch pages 2..N concurrently (ThreadPoolExecutor, 8 workers)
    - Individual page failures are retried once, then logged and skipped
      so one bad page never kills the entire index
    - Products are de-duplicated by id; products without an id are all kept
    """
    import concurrent.futures
    import urllib3

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    per_page = 100
    all_raw: List[Dict] = []

    # ── Page 1: discover total ───────────────────────────────────────────
    try:
        page1_data, total_pages = _fetch_page(url, 1, per_page)
        all_raw.extend(page1_data)
        logger.info(f"ETL: Page 1/{total_pages} fetched ({len(page1_data)} products)")
    except Exception as e:
        logger.error(f"ETL: Page 1 failed — cannot continue: {e}")
        return pd.DataFrame()

    if total_pages <= 1:
        logger.info("ETL: Single-page catalogue.")
    else:
        # ── Pages 2..N: concurrent fetch ────────────────────────────────
        def fetch_with_retry(pg: int) -> List[Dict]:
            for attempt in range(2):  # one retry
                try:
                    data, _ = _fetch_page(url, pg, per_page)
                    logger.info(f"ETL: Page {pg}/{total_pages} — {len(data)} products")
                    return data
                except Exception as e:
                    if attempt == 0:
                        logger.warning(f"ETL: Page {pg} attempt 1 failed ({e}), retrying...")
                        time.sleep(0.5)
                    else:
                        logger.error(f"ETL: Page {pg} skipped after retry: {e}")
            return []

        # Max 8 workers — enough concurrency without hammering the API
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(fetch_with_retry, pg)
                       for pg in range(2, total_pages + 1)]
            for future in concurrent.futures.as_completed(futures):
                all_raw.extend(future.result())

    logger.info(f"ETL: Total raw products fetched: {len(all_raw)}")

    # ── Flatten to rows ──────────────────────────────────────────────────
    rows: List[Dict[str, Any]] = []
    seen_ids: set = set()
    for p in all_raw:
        try:
            p_id = int(p.get("id") or 0)
            # Only real ids are de-duplicated: id-less products all map to 0
            # and would otherwise be collapsed into a single product.
            if p_id:
                if p_id in seen_ids:
                    continue
                seen_ids.add(p_id)
            rows.extend(_flatten_product(p, p_id))
        except Exception as row_err:
            logger.warning(f"ETL: Skipped product {p.get('id','?')}: {row_err}")
            continue

    df = pd.DataFrame(rows)
    if not df.empty:
        logger.info(
            f"ETL: Flattened into {len(df)} rows | "
            f"{df['product_id'].nunique()} unique products | "
            f"{df['brand'].nunique()} brands | "
            f"{df['category'].nunique()} categories"
        )
    return df


def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """
    Return the cached market index, refreshing it when forced, empty or expired.

    If a refresh comes back empty while older data is cached, the older data
    is kept (and a retry is scheduled soon) rather than being discarded.
    """
    cached = _data_cache["df"]
    expired = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if force_refresh or cached.empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        if fresh.empty and not cached.empty:
            logger.warning("ETL: Refresh returned no data — serving stale Market Index.")
            # Back-date the timestamp so the cache expires again shortly.
            _data_cache["ts"] = time.time() - PRODUCT_CACHE_TTL + _STALE_RETRY_SECONDS
        else:
            _data_cache.update({"df": fresh, "ts": time.time(), "raw_count": len(fresh)})
    return _data_cache["df"]


# ─────────────────────────────────────────────
# 5.
Precision Search & Basket Optimisation # ───────────────────────────────────────────── # Store alias map — common shorthand -> canonical retailer name substring STORE_ALIASES: Dict[str, str] = { "pnp": "pick n pay", "p&p": "pick n pay", "pick n pay": "pick n pay", "picknpay": "pick n pay", "tmpnpay": "pick n pay", "tm": "pick n pay", "ok": "ok", "ok mart": "ok", "okmart": "ok", "spar": "spar", "food lovers": "food lover", "food lover": "food lover", "foodlovers": "food lover", "fl": "food lover", "choppies": "choppies", "bon marche": "bon marche", "bon": "bon marche", "checkers": "checkers", "game": "game", } # Generic category words — trigger category-aware search rather than literal match GENERIC_QUERIES = { "cooking oil", "oil", "maize meal", "meal", "bread", "milk", "sugar", "rice", "flour", "salt", "eggs", "coffee", "tea", "juice", "water", "soap", "washing powder", "detergent", "chicken", "beef", "fish", "butter", "margarine", "cheese", "yoghurt", "cereal", "nappies", "diapers", "toilet paper", "tissue", } def normalise_store_query(store_name: str) -> Optional[str]: """Return canonical store substring for df filtering, or None.""" return STORE_ALIASES.get(_norm(store_name)) def search_products_deep(df: pd.DataFrame, query: str, limit: int = 50) -> pd.DataFrame: """ Balanced multi-signal search. Designed to fix: "cooking oil" -> "Olive Pride Extra Virgin Olive Oil" instead of "Zimgold Cooking Oil 2L". Signal hierarchy: 1000 - exact product name match 600 - ALL query tokens present in vector (strict full coverage) 500 - query string is a substring of the product name 200 - brand name appears in the query +80ea - per-token overlap (partial match) 300 - generic category bonus: product name STARTS with the category -150 - anti-inflation penalty: generic query matched deep inside a long fancy product name (e.g. 
"oil" in "Extra Virgin Olive Oil") """ if df.empty or not query: return df q_norm = _norm(query) q_tokens = [t for t in q_norm.split() if len(t) > 1] q_set = set(q_tokens) is_generic = (q_norm in GENERIC_QUERIES or (bool(q_tokens) and all(t in GENERIC_QUERIES for t in q_tokens))) def score(row) -> int: s = 0 p_norm = _norm(row["product_name"]) vector = row["search_vector"] v_set = set(vector.split()) b_norm = _norm(row.get("brand", "")) # 1. Exact full name match if q_norm == p_norm: return 2000 # 2. All query tokens present in vector (strict full coverage) # e.g. ["tanganda", "tips"] both in vector → strong signal if q_set and q_set.issubset(v_set): s += 800 # 3. Query is a sequential substring of the product name # "tanganda tips" in "tanganda tips tea bags 100s pouch" → very strong if q_norm in p_norm: s += 700 # 4. Product name starts with query (tightest positional match) # "zimgold cooking oil" → "zimgold cooking oil 2l" starts with it if p_norm.startswith(q_norm): s += 400 # 5. Brand token is in query AND brand is in product name # "tanganda" in query AND brand == "Tanganda" → brand fidelity if b_norm and b_norm in q_norm and b_norm in p_norm: s += 350 # 6. Brand appears in query (even without product name match) elif b_norm and b_norm in q_norm: s += 150 # 7. Per-token overlap score (partial matching) overlap = len(q_set.intersection(v_set)) s += overlap * 90 # 8. Generic category bonus: product name starts with the generic term # "Cooking Oil 2L" starts with "cooking oil" → beats fancy oil names if is_generic and p_norm.startswith(q_norm): s += 300 # 9. Anti-inflation: weak generic match buried in long fancy name # e.g. 
query "oil" matching "Extra Virgin Cold-Pressed Avocado Oil" if s < 150 and is_generic and len(p_norm.split()) > 4: s = max(0, s - 200) return s df_scored = df.copy() df_scored["match_score"] = df_scored.apply(score, axis=1) matches = df_scored[df_scored["match_score"] > 0] if matches.empty: return matches return matches.sort_values( ["match_score", "views", "price"], ascending=[False, False, True] ).head(limit) def calculate_basket_optimization(item_names: List[str], preferred_retailer: Optional[str] = None) -> Dict[str, Any]: # Keep age-restricted/alcohol mentions out of WhatsApp pricing while still # processing the rest of the basket. Example: milk + Zambezi + sugar → # price milk/sugar, point Zambezi to the site. clean_items, restricted_items = split_restricted_items(item_names or []) df = get_market_index() if df.empty: return {"actionable": False, "error": "Market data unavailable. Please try again shortly."} if restricted_items and not clean_items: return { "actionable": True, "found_items": [], "global_missing": [], "restricted_items": restricted_items, "market_matrix": [], "best_store": None, "preferred_retailer": preferred_retailer, } item_names = clean_items # Normalise store alias (e.g. 
"TmPnPay" -> "pick n pay") store_filter: Optional[str] = None if preferred_retailer: store_filter = normalise_store_query(preferred_retailer) if not store_filter: store_filter = _norm(preferred_retailer) # use raw normalised if not in alias map found_items = [] missing_global = [] for item in item_names: hits = search_products_deep(df[df["is_offer"] == True], item, limit=50) if hits.empty: missing_global.append(item) continue best_match = hits.iloc[0] q_norm = _norm(item) res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}") q_tokens = q_norm.split() is_sub = (len(q_tokens) > 1 and sum(1 for t in q_tokens if t in res_norm) < len(q_tokens)) product_offers = (hits[hits["product_name"] == best_match["product_name"]] .sort_values("price")) offers_list = [{"retailer": r["retailer"], "price": float(r["price"])} for _, r in product_offers.iterrows()] best_price = offers_list[0]["price"] max_price = offers_list[-1]["price"] found_items.append({ "query": item, "product_name": str(best_match["product_name"]), "brand": str(best_match["brand"]), "category": str(best_match["category"]), "image": str(best_match["image"]), "is_substitute": is_sub, "offers": offers_list, "best_price": best_price, "potential_savings": max_price - best_price, }) if not found_items: return { "actionable": True, "found_items": [], "global_missing": missing_global, "restricted_items": restricted_items, } # Market Matrix — if a store filter is set, show that store first in comparison all_retailers = set(o["retailer"] for f in found_items for o in f["offers"]) store_comparison = [] for retailer in all_retailers: total_price = 0.0 found_count = 0 missing_list = [] for item in found_items: price = next((o["price"] for o in item["offers"] if o["retailer"] == retailer), None) if price: total_price += price found_count += 1 else: missing_list.append(item["product_name"]) store_comparison.append({ "retailer": retailer, "total_price": total_price, "found_count": found_count, "total_items": 
len(found_items), "missing_items": missing_list, }) store_comparison.sort(key=lambda x: (-x["found_count"], x["total_price"])) if len(store_comparison) > 1: max_total = max(s["total_price"] for s in store_comparison if s["found_count"] == store_comparison[0]["found_count"]) for s in store_comparison: s["basket_savings"] = ( max_total - s["total_price"] if s["found_count"] == store_comparison[0]["found_count"] else 0.0 ) else: for s in store_comparison: s["basket_savings"] = 0.0 # If a specific store was requested, pin it and its comparison to the front if store_filter: pinned = [s for s in store_comparison if store_filter in _norm(s["retailer"])] rest = [s for s in store_comparison if store_filter not in _norm(s["retailer"])] store_comparison = pinned + rest return { "actionable": True, "is_basket": len(found_items) > 1, "found_items": found_items, "global_missing": missing_global, "restricted_items": restricted_items, "market_matrix": store_comparison[:5], "best_store": store_comparison[0] if store_comparison else None, "preferred_retailer": preferred_retailer, "store_filter": store_filter, } # ───────────────────────────────────────────── # 6. ZESA Calculator # ───────────────────────────────────────────── def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]: """ Calculate ZESA units for a USD amount using the actual 2026 ZETDC stepped tariff bands (ZiG-billed, USD equivalent incl. 6% REA levy). Source: zimpricecheck.com / ZERA approved tariffs, May 2026. Band structure (USD per unit, incl. 
6% Rural Electrification Levy): 0-50 units: $0.08/unit 51-100: $0.09/unit 101-200: $0.16/unit 201-300: $0.23/unit 301-400: $0.25/unit 401+: $0.26/unit """ bands = ZIM_CONTEXT["zesa_bands"] remaining = float(amount_usd) units = 0.0 prev_limit = 0 band_breakdown = [] for band in bands: band_size = band["limit"] - prev_limit band_cost = band_size * band["rate_usd"] if remaining <= 0: break if remaining >= band_cost: units += band_size remaining -= band_cost band_breakdown.append({ "band": f"{prev_limit+1}-{band['limit']}", "units": band_size, "rate": band["rate_usd"], }) else: partial = remaining / band["rate_usd"] units += partial band_breakdown.append({ "band": f"{prev_limit+1}-{band['limit']}", "units": round(partial, 1), "rate": band["rate_usd"], }) remaining = 0 prev_limit = band["limit"] return { "amount_usd": float(amount_usd), "est_units_kwh": float(round(units, 1)), "band_breakdown": band_breakdown, "note": ZIM_CONTEXT["zesa_note"], } # ───────────────────────────────────────────── # 7. Smart WhatsApp Formatter (Upgrade 1) # ───────────────────────────────────────────── def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str: """ Smart formatter: - ≤3 items → full per-store price breakdown for each item - >3 items → cheapest basket total + store comparison + highlights Spaced for mobile readability. Concise but complete. """ if not analyst.get("actionable"): return analyst.get("error", "Sorry, I couldn't fetch price data right now.") found = analyst.get("found_items", []) missing = analyst.get("global_missing", []) restricted = analyst.get("restricted_items", []) matrix = analyst.get("market_matrix", []) lines = [] if not found: note = format_restricted_items_note(restricted) if missing: base = (f"⚠️ Couldn't find: {', '.join(missing)}\n" f"_Try a shorter product name._") return base + ("\n\n" + note if note else "") if note: return note return "No results found. Try a different search term." 
n = len(found) # ── ≤ 3 items: full per-store breakdown ────────────────────────────── if n <= 3: for item in found: sub = " _(nearest match)_" if item.get("is_substitute") else "" lines.append(f"🏷️ *{item['product_name']}*{sub}") for o in item["offers"][:4]: is_best = o["price"] == item["best_price"] tick = "✅" if is_best else "▪️" lines.append(f" {tick} {o['retailer']}: *${o['price']:.2f}*") savings = item.get("potential_savings", 0) if savings > 0.10: lines.append(f" 💡 Save *${savings:.2f}* at cheapest store") lines.append("") if n > 1 and matrix: best = matrix[0] lines.append(f"🏪 Best basket: *{best['retailer']}* — *${best['total_price']:.2f}*") sv = best.get("basket_savings", 0) if sv > 0.10: lines.append(f"💰 Saves *${sv:.2f}* vs most expensive option") # ── > 3 items: basket summary + highlights ──────────────────────────── else: lines.append(f"🛒 *{n}-item basket*\n") if matrix: for s in matrix[:4]: cover = f"{s['found_count']}/{s['total_items']} items" marker = "✅" if s == matrix[0] else "▪️" lines.append(f" {marker} *{s['retailer']}*: *${s['total_price']:.2f}* ({cover})") lines.append("") sv = matrix[0].get("basket_savings", 0) if sv > 0.10: lines.append(f"💰 *{matrix[0]['retailer']}* saves you *${sv:.2f}* on this basket") lines.append("") big = sorted(found, key=lambda x: x.get("potential_savings", 0), reverse=True) hot = [x for x in big if x.get("potential_savings", 0) > 0.20][:3] if hot: lines.append("🔥 *Biggest savings:*") for item in hot: lines.append( f" • *{item['product_name']}* " f"${item['best_price']:.2f} @ {item['offers'][0]['retailer']} " f"(save ${item['potential_savings']:.2f})" ) lines.append("") subs = [f for f in found if f.get("is_substitute")] if subs: lines.append("⚠️ *Nearest matches:*") for s in subs: lines.append(f" _{s['query']}_ → {s['product_name']} (${s['best_price']:.2f})") lines.append("") if missing: lines.append(f"❓ Not found: {', '.join(missing)}") lines.append("_Try a simpler name or check in-store._") if restricted: 
lines.append(format_restricted_items_note(restricted)) lines.append("") lines.append("*For comprehensive basket comparisons and retailer Pre-Orders visit*") lines.append("www.pricelyst.co.zw") return "\n".join(lines) # ───────────────────────────────────────────── # 8. Budget & Meal Planning Engine (Upgrade 2) # ───────────────────────────────────────────── def gemini_plan_budget(transcript: str, budget: float, context: str, catalogue_snapshot: str, language: str = "English") -> str: """ Full AI budget planner using real catalogue prices. Handles: fixed-amount shopping, monthly household, party/event, meal prep. """ if not _gemini_client: return "Budget planning is temporarily unavailable. Please try again." PROMPT = f""" You are Pricelyst AI, Pricelyst Zimbabwe's AI Shopping & Budget Advisor. You have access to REAL current prices from Zimbabwe's top supermarkets. USER REQUEST: "{transcript}" DETECTED BUDGET: ${budget:.2f} USD CONTEXT CLUES: {context} PRICELYST CATALOGUE — FULL PRODUCT INDEX (use these first, estimate only if absent): {catalogue_snapshot} CRITICAL: Walk the entire catalogue above before making any price estimate. For every item you include, check if it exists in the catalogue first. Only use "(est. ~$X)" for items with ZERO catalogue matches. Do not price alcohol or age-restricted products in WhatsApp; for those items say: please visit https://pricelyst.co.zw for more information. Keep the response concise and practical. ZIMBABWE CONTEXT: - Average family of 4 monthly food spend: $150-$250 - ZESA: $10 ≈ 108 units, $20 ≈ 210 units (2026 tariff) - ZESA: bands $0.08-$0.26/unit depending on consumption level YOUR TASK — respond based on what the user asked: 1. FIXED BUDGET SHOPPING ("I have $50"): - Build an optimal shopping list that maximises value within budget - Show exact items, quantities, store, and prices - Show running total as you add items - Show remaining balance after each category - Recommend the single best store for this basket 2. 
MONTHLY HOUSEHOLD BUDGET ("$2000 for a month, family of 4"): - Break down the budget into categories: Food, Household, Transport, ZESA, Contingency - For food: build a full monthly shopping list with real prices - Calculate cost per person per day - Show weekly vs monthly shopping rhythm recommendation - Flag where they can save most 3. EVENT/PARTY PLANNING ("dinner for 10", "party for 100 people"): - Scale ingredients appropriately for the headcount - Build a full shopping list with quantities scaled to guests - Show per-head cost - Recommend best store combination for the event - Include drinks, condiments, serviettes etc. from catalogue 4. MEAL PREP PLANNING ("weekly meal prep", "5 days of lunches"): - Suggest 5-7 practical Zimbabwean meals using real catalogue products - Show shopping list with prices for the full week - Calculate cost per meal and cost per serving - Give simple prep tips FORMAT RULES (WhatsApp — no markdown headers ##): - Keep it concise: maximum 6 short sections, avoid long explanations - Use *bold* for totals, store names, key numbers - Use emojis naturally 💰 🛒 📊 ✅ 🍽️ - Structure with clear sections separated by blank lines - Be specific with numbers — show your working - Always end with: total cost, per-person cost, and best store recommendation - If budget is tight, flag it honestly and suggest stretching strategies Respond in {language}. Be a genuinely useful financial advisor, not just a list generator. """ try: resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT) return sanitise_response(resp.text) except Exception as e: logger.error(f"Budget planner error: {e}") return "I had trouble calculating your budget plan. Please try again!" def gemini_generate_recipe( meal_name: str, found_items: List[Dict], servings: int = 4, language: str = "English") -> str: """ Generate a full recipe using real Pricelyst catalogue products. Format: ingredients+prices, total cost, steps, serving info. 
Estimates only for items absent from the catalogue. """ if not _gemini_client: return "Recipe generation is temporarily unavailable." products_str = "\n".join( f"- {item['product_name']} (${item['best_price']:.2f} @ {item['offers'][0]['retailer']})" for item in found_items ) if found_items else "No catalogue matches — use estimated prices for all ingredients." PROMPT = f""" You are Pricelyst AI, Zimbabwe\'s smart shopping and meal planning assistant. Generate a complete, practical recipe for: *{meal_name}* Servings: {servings} people PRICELYST CATALOGUE PRODUCTS (walk these FIRST — real prices, real stores): {products_str} OUTPUT FORMAT (WhatsApp plain text — follow this order exactly): *🍽️ {meal_name}* | Serves {servings} *🛒 Ingredients & Prices:* • [Ingredient] [qty] — *$[price]* @ [store] (For each item: use catalogue price if found, else "(est. ~$X)") *💰 Total:* $X.XX | Per serving: $X.XX Best store: [store name covering most items] *⏱️ Steps:* 1. [step] 2. [step] (Max 8 steps, practical for a Zimbabwean home kitchen) *💡 Tips:* [1 storage or substitution tip using catalogue alternatives] *For more recipes & prices: pricelyst.co.zw* RULES: - Walk the ENTIRE catalogue before estimating any price - Only mark "(est. ~$X)" if genuinely absent from catalogue - Realistic Zimbabwean home quantities, not restaurant portions - *bold* prices and totals only, plain text elsewhere - NO image URLs, NO external links except pricelyst.co.zw Respond in {language}. """ try: resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT) return sanitise_response(resp.text) except Exception as e: logger.error(f"Recipe generation error: {e}") return "I had trouble generating the recipe. Please try again!" def gemini_analyze_meal_image_to_recipe( image_bytes: bytes, caption: str, found_items: List[Dict], language: str = "English") -> str: """ Full pipeline: image of a meal → identify dish → recipe → shopping list with prices. 
""" if not _gemini_client: return "Image recipe analysis is temporarily unavailable." products_str = "\n".join( f"- {item['product_name']} (${item['best_price']:.2f} @ {item['offers'][0]['retailer']})" for item in found_items ) if found_items else "No catalogue matches found — use estimated prices." PROMPT = f""" You are Pricelyst AI, Pricelyst Zimbabwe's AI Meal & Shopping Advisor. A user sent a photo of a meal/dish. Your job: identify it, then immediately provide a full recipe with a Zimbabwe-priced shopping list. USER CAPTION: "{caption}" AVAILABLE CATALOGUE PRODUCTS: {products_str} DO THIS IN ONE RESPONSE: 1. 🍽️ *Identify the dish* — name it confidently 2. 📝 *Full recipe* — ingredients with quantities for 4 servings 3. 🛒 *Shopping list* — match ingredients to catalogue products with prices 4. 💰 *Total cost* — exact calculation, cost per serving, best store 5. ⏱️ *Quick cook guide* — 5-6 simple steps Keep it warm, practical, Zimbabwean. Format for WhatsApp (*bold*, emojis). Respond in {language}. """ try: image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg") resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=[PROMPT, image_part], ) return sanitise_response(resp.text) except Exception as e: logger.error(f"Meal image recipe error: {e}") return "I had trouble analyzing that meal photo. Please try again!" def build_catalogue_snapshot() -> str: """ Build a COMPLETE category-organised price snapshot for Gemini prompts. No limits — every product in the index is included so Gemini sees the full catalogue when generating recipes, budgets and meal plans. Niche products are equally represented alongside popular ones. Within each category: sorted by views desc then price asc so the most contextually useful products appear first in token budget. """ df = get_market_index() if df.empty: return "Catalogue unavailable." try: offers = df[df["is_offer"] == True].copy() if offers.empty: return "No priced products available." 
def get_products_by_category(category_keyword: str, limit: int = 20) -> List[Dict]:
    """
    Return products matching a category keyword — used by recipe/budget engines
    to surface the range of available options in a category.

    Args:
        category_keyword: Free-text keyword; normalised with ``_norm`` and
            matched literally against ``category`` and ``search_vector``.
        limit: Maximum number of products returned (cheapest first).

    Returns:
        Records with ``product_name``, ``best_price``, ``retailer`` (the store
        offering ``best_price``), ``brand`` and ``category``. Empty list when
        nothing matches or on error.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        kw = _norm(category_keyword)
        if not kw:
            # An empty keyword would match every row; that is not a category.
            return []
        offers = df[df["is_offer"] == True]  # noqa: E712 - column may hold NaN

        # regex=False: the keyword is user text, not a pattern. Characters such
        # as "(" or "+" would otherwise raise or match unintended rows.
        in_category = offers["category"].str.lower().str.contains(kw, na=False, regex=False)
        in_vector = offers["search_vector"].str.contains(kw, na=False, regex=False)
        matches = offers[in_category | in_vector]
        if matches.empty:
            return []

        # Sort by price first so "first" retailer is the one selling at the
        # minimum price rather than an arbitrary row of the group.
        best = (
            matches.sort_values("price", kind="stable")
            .groupby("product_name", sort=False)
            .agg(
                best_price=("price", "min"),
                retailer=("retailer", "first"),
                brand=("brand", "first"),
                category=("category", "first"),
            )
            .reset_index()
            .sort_values("best_price")
        )
        return best.head(limit).to_dict("records")
    except Exception as e:
        logger.error("get_products_by_category error: %s", e)
        return []
def extract_budget_from_text(transcript: str) -> Tuple[Optional[float], str]:
    """
    Pull a dollar amount and context from free text.

    Returns (amount, context_description). ``amount`` is ``None`` when no
    dollar figure is present.

    Examples:
        "I have $50"            → (50.0, "fixed_budget")
        "$2000 for the month"   → (2000.0, "monthly_household")
        "party for 100 people"  → (None, "event_100_people")
        "dinner for 5 people"   → (None, "event_5_people")
        "how to prepare sadza"  → (None, "recipe_request")
    """
    text = transcript or ""

    # Find dollar amount: "$50" / "$ 1,250.50" first, then "50 dollars" / "50 usd".
    amount_match = re.search(r'\$\s*([\d,]+(?:\.\d{1,2})?)', text)
    if not amount_match:
        amount_match = re.search(r'([\d,]+(?:\.\d{1,2})?)\s*(?:dollars?|usd)', text, re.I)

    amount = None
    if amount_match:
        try:
            amount = float(amount_match.group(1).replace(",", ""))
        except ValueError:
            # e.g. the digits group captured only commas
            amount = None

    # Determine context — first matching rule wins.
    t = text.lower()
    if "month" in t:
        context = "monthly_household"
    elif any(w in t for w in ["party", "event", "wedding", "function", "people", "guests", "pax"]):
        people_match = re.search(r'(\d+)\s*(?:people|guests|pax|persons?)', t)
        guests = people_match.group(1) if people_match else "unknown"
        context = f"event_{guests}_people"
    elif "week" in t or re.search(r'prep(?!ar)', t):
        # "prep" must not fire on "prepare"/"preparation": those belong to the
        # recipe branch below, which was previously unreachable for them.
        context = "weekly_meal_prep"
    elif any(w in t for w in ["recipe", "cook", "make", "prepare", "how to"]):
        context = "recipe_request"
    elif amount:
        context = "fixed_budget"
    else:
        context = "general_planning"

    return amount, context
"general_planning" return amount, context return amount, context # ───────────────────────────────────────────── # 8b. Conversation Context Engine # ───────────────────────────────────────────── def apply_context_mutation( current_basket: List[str], current_found: List[Dict], mutation: Dict) -> Tuple[List[str], str]: """ Apply a context mutation to an active basket. Returns (new_item_list, description_of_change). Mutations understood: add_items : ["milk", "bread"] remove_items : ["coffee"] — matched fuzzily remove_most_expensive : True remove_cheapest : True what_if_add : ["milk"] — compute total but don't persist """ action = mutation.get("action", "") target_items = [_norm(x) for x in mutation.get("items", [])] desc = "" if action == "add_items": new_items = list(current_basket) + mutation.get("items", []) desc = f"Added: {', '.join(mutation.get('items', []))}" return new_items, desc if action == "remove_items": removed = [] new_basket = [] for orig in current_basket: # Match against both original query and resolved product name orig_norm = _norm(orig) match = any( t in orig_norm or orig_norm in t for t in target_items ) if match: removed.append(orig) else: new_basket.append(orig) desc = f"Removed: {', '.join(removed)}" if removed else "Item not found in current basket" return new_basket, desc if action == "remove_most_expensive": if not current_found: return current_basket, "No active basket to modify" most_exp = max(current_found, key=lambda x: x.get("best_price", 0)) new_basket = [x for x in current_basket if _norm(x) not in _norm(most_exp["product_name"]) and _norm(most_exp["product_name"]) not in _norm(x)] desc = f"Removed most expensive: {most_exp['product_name']} (${most_exp['best_price']:.2f})" return new_basket, desc if action == "remove_cheapest": if not current_found: return current_basket, "No active basket to modify" cheapest = min(current_found, key=lambda x: x.get("best_price", 0)) new_basket = [x for x in current_basket if _norm(x) not in 
def gemini_resolve_context_intent(
    message: str,
    active_basket: List[str],
    active_found: List[Dict],
) -> Dict[str, Any]:
    """
    Determine if a message is a context mutation on the active basket,
    or a fresh new query.

    Returns a dict that always has these keys:
        {
          "is_context": bool,
          "action": "add_items|remove_items|remove_most_expensive|remove_cheapest|
                     what_if_add|show_total|fresh_query",
          "items": ["item1"],   # for add/remove/what_if
          "explanation": "str"  # present when the model supplied one
        }
    Falls back to a ``fresh_query`` result when there is no Gemini client, no
    active basket, the call fails, or the model returns an unusable shape.
    """
    fallback = {"is_context": False, "action": "fresh_query", "items": []}
    if not _gemini_client or not active_basket:
        return dict(fallback)

    basket_str = ", ".join(str(b) for b in active_basket)
    # Built outside the try block, so tolerate entries without a usable price.
    found_lines = []
    for f in active_found or []:
        try:
            found_lines.append(f"- {f['product_name']}: ${float(f['best_price']):.2f}")
        except (KeyError, TypeError, ValueError):
            continue
    found_str = "\n".join(found_lines)

    PROMPT = f"""
    You are a shopping assistant context resolver.

    The user has an ACTIVE BASKET: [{basket_str}]
    Resolved products and prices:
    {found_str}

    New user message: "{message}"

    Decide: is this message a MODIFICATION of the active basket, or a FRESH NEW QUERY?

    Context modifications include:
    - "remove coffee" / "take out the bread" / "remove the most expensive" → remove_items / remove_most_expensive
    - "add milk" / "what if I add eggs" / "include sugar" → add_items / what_if_add
    - "remove the cheapest" / "drop the cheapest item" → remove_cheapest
    - "what's the total now" / "show total" → show_total
    - "let's see" / "ok" / "sounds good" → show_total (treat as confirmation request)

    Fresh new queries include:
    - Completely new products unrelated to the basket ("price of diapers")
    - New categories / topics ("deals today", "ZESA units")
    - Greetings or topic changes

    Return STRICT JSON:
    {{
      "is_context": boolean,
      "action": "add_items|remove_items|remove_most_expensive|remove_cheapest|what_if_add|show_total|fresh_query",
      "items": ["item to add or remove"],
      "explanation": "one line reason"
    }}
    """
    valid_actions = {
        "add_items", "remove_items", "remove_most_expensive",
        "remove_cheapest", "what_if_add", "show_total", "fresh_query",
    }
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json"),
        )
        result = _safe_json_loads(resp.text, dict(fallback))
        # Callers index into the result; never hand back a list/str or an
        # action outside the documented vocabulary.
        if not isinstance(result, dict) or result.get("action") not in valid_actions:
            logger.warning("Context resolution returned unusable payload: %r", result)
            return dict(fallback)
        items = result.get("items")
        result["items"] = [str(i) for i in items if i] if isinstance(items, list) else []
        result["is_context"] = bool(result.get("is_context")) and result["action"] != "fresh_query"
        logger.info("Context resolution: %s", result)
        return result
    except Exception as e:
        logger.error("Context resolution error: %s", e)
        return dict(fallback)
def format_context_result(
    action: str,
    change_desc: str,
    analyst: Dict,
    what_if: bool = False,
) -> str:
    """
    Format a context mutation result as a WhatsApp message.

    Args:
        action: The mutation action that was applied (``"show_total"`` selects
            the plain "Current Basket" heading).
        change_desc: Human-readable description of the change; when empty the
            "Current Basket" heading is used.
        analyst: Basket analysis; reads ``found_items``, ``market_matrix``
            (first entry is treated as the best store) and ``global_missing``.
        what_if: Render as a hypothetical scenario rather than an update.

    Returns:
        The message text, always ending with the pricelyst.co.zw call to action.
    """
    analyst = analyst or {}

    if action == "show_total" or not change_desc:
        lines = ["🛒 *Current Basket*\n"]
    elif what_if:
        lines = [f"🔮 *What if scenario*\n_{change_desc}_\n"]
    else:
        lines = [f"✅ *Basket updated*\n_{change_desc}_\n"]

    found = analyst.get("found_items") or []
    matrix = analyst.get("market_matrix") or []
    missing = analyst.get("global_missing") or []

    if found:
        for item in found:
            sub = " _(nearest match)_" if item.get("is_substitute") else ""
            name = item.get("product_name") or item.get("query") or "Unknown"
            # "or 0": a key present with a None value must not break formatting
            price = float(item.get("best_price") or 0)
            lines.append(f"• *{name}*{sub} — ${price:.2f}")
        lines.append("")

    if matrix:
        best = matrix[0]
        total = float(best.get("total_price") or 0)
        lines.append(f"💰 *Best basket total: ${total:.2f}* @ {best.get('retailer', '')}")
        savings = float(best.get("basket_savings") or 0)
        if savings > 0.10:
            lines.append(f" Saves *${savings:.2f}* vs most expensive option")
        lines.append("")
        if len(matrix) > 1:
            lines.append("Other stores:")
            for s in matrix[1:3]:
                lines.append(f" {s.get('retailer', '')}: ${float(s.get('total_price') or 0):.2f}")

    if missing:
        # str(): entries come from model/user input and may not be strings
        lines.append(f"\n⚠️ Not found: {', '.join(str(m) for m in missing)}")

    lines.append("")
    lines.append("*For comprehensive basket comparisons and retailer Pre-Orders visit*")
    lines.append("www.pricelyst.co.zw")
    return "\n".join(lines)
lines.append("") if matrix: best = matrix[0] lines.append(f"💰 *Best basket total: ${best['total_price']:.2f}* @ {best['retailer']}") savings = best.get("basket_savings", 0) if savings > 0.10: lines.append(f" Saves *${savings:.2f}* vs most expensive option") lines.append("") if len(matrix) > 1: lines.append("Other stores:") for s in matrix[1:3]: lines.append(f" {s['retailer']}: ${s['total_price']:.2f}") if missing: lines.append(f"\n⚠️ Not found: {', '.join(missing)}") lines.append("") lines.append("*For comprehensive basket comparisons and retailer Pre-Orders visit*") lines.append("www.pricelyst.co.zw") return "\n".join(lines) # ───────────────────────────────────────────── # 9. Gemini Helpers # ───────────────────────────────────────────── def _safe_json_loads(s: str, fallback: Any) -> Any: try: cleaned = s if "```json" in cleaned: cleaned = cleaned.split("```json")[1].split("```")[0] elif "```" in cleaned: cleaned = cleaned.split("```")[1] return json.loads(cleaned.strip()) except Exception as e: logger.error(f"JSON parse error: {e} | raw: {s[:300]}") return fallback def gemini_detect_intent(transcript: str) -> Dict[str, Any]: """Classify intent including new BUDGET_PLANNER and RECIPE_REQUEST intents.""" if not _gemini_client: return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []} PROMPT = """ Analyze the transcript below and return STRICT JSON. Intents: - CASUAL_CHAT : Greetings, "hi", off-topic - SHOPPING_BASKET : Searching for prices / cheapest X - UTILITY_CALC : Electricity / ZESA / fuel cost questions - STORE_DECISION : "Which store is cheapest?", "Where should I shop?" 
- EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5" - CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet - DEALS_EXPLORE : "Today's deals", "promotions", "what's on special" - DISCOVER : "What products do you have?", "show me your categories" - BUDGET_PLANNER : User mentions a budget amount + shopping/planning goal. e.g. "I have $50", "$2000 for the month", "party for 100 people", "what can I buy with $30", "monthly groceries for family of 4" - RECIPE_REQUEST : User wants a recipe, meal idea, how to cook something, - FEEDBACK : User wants to give feedback, suggest a product, report a missing item, report a wrong price, or make a general suggestion about the service. meal prep plan, or sends a meal image. e.g. "recipe for sadza", "how do I cook bream", "weekly meal plan" Rules: - Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal'). - If only a concept is given (e.g. "plan a braai"), set is_event_planning=true, items=[]. - Detect user language accurately (Shona, Ndebele, English). - budget_amount: extract numeric USD amount if mentioned (e.g. 50 from "$50"). - headcount: extract number of people if mentioned (e.g. 100 from "party for 100"). - meal_name: extract dish name if recipe is requested. - store_preference: store name if explicitly mentioned. - utility_amount: numeric value for ZESA queries. 
def gemini_explode_concept(transcript: str) -> List[str]:
    """
    Convert an event/meal concept into a concrete grocery list.

    Returns:
        A list of non-empty item names. Empty list when the Gemini client is
        unavailable, the call fails, or the reply contains no usable list.
    """
    if not _gemini_client:
        return []

    PROMPT = f"""
    User wants to plan: "{transcript}"
    Generate 10-15 essential Zimbabwean grocery items for this.
    Use English terms for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef').
    Return ONLY a JSON list of strings.
    """
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json"),
        )
        parsed = _safe_json_loads(resp.text, [])
        # The model occasionally wraps the list in an object; unwrap it rather
        # than returning a dict from a function declared to return a list.
        if isinstance(parsed, dict):
            parsed = next((v for v in parsed.values() if isinstance(v, list)), [])
        if not isinstance(parsed, list):
            return []
        return [str(item).strip() for item in parsed if isinstance(item, str) and item.strip()]
    except Exception as e:
        logger.error("Concept explode error: %s", e)
        return []


def gemini_analyze_image(
    image_bytes: bytes,
    caption: str = "",
    mime_type: str = "image/jpeg",
) -> Dict[str, Any]:
    """
    Analyse a WhatsApp image — grocery list, product, or meal dish.

    Args:
        image_bytes: Raw image bytes.
        caption: Optional caption sent with the image.
        mime_type: MIME type of ``image_bytes`` (previously hard-coded to JPEG).

    Returns:
        ``{"type": ..., "items": [...], "description": ...}`` where ``type`` is
        one of SHOPPING_LIST, SINGLE_PRODUCT, MEAL_DISH or IRRELEVANT. An
        IRRELEVANT result with no items is returned on any failure.
    """
    irrelevant = {"type": "IRRELEVANT", "items": [], "description": ""}
    if not _gemini_client:
        return dict(irrelevant)

    PROMPT = f"""
    Analyze this image. Context caption: "{caption}"

    Classify:
    1. SHOPPING_LIST  → Extract each item (translate to English).
    2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml").
    3. MEAL_DISH      → Identify dish name + core ingredients.
    4. IRRELEVANT     → Not shopping related.

    Return STRICT JSON:
    {{
      "type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT",
      "items": ["item1", "item2"],
      "description": "short description"
    }}
    """
    try:
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=genai_types.GenerateContentConfig(response_mime_type="application/json"),
        )
        result = _safe_json_loads(resp.text, dict(irrelevant))
        allowed_types = {"SHOPPING_LIST", "SINGLE_PRODUCT", "MEAL_DISH", "IRRELEVANT"}
        if not isinstance(result, dict) or result.get("type") not in allowed_types:
            return dict(irrelevant)
        items = result.get("items")
        result["items"] = [str(i) for i in items if i] if isinstance(items, list) else []
        result["description"] = str(result.get("description") or "")
        return result
    except Exception as e:
        logger.error("Vision error: %s", e)
        return dict(irrelevant)
def sanitise_response(text: str) -> str:
    """
    Post-process any Gemini response before sending to WhatsApp.

    - Strips all raw http(s) URLs except those on pricelyst.co.zw or
      www.pricelyst.co.zw (no product image links, no
      api.pricelyst.co.zw/images/... paths, no external URLs).
    - Removes lines that consist solely of a stripped URL.
    - Normalises excessive blank lines to max 2 in a row.

    Args:
        text: Model output; ``None`` or empty yields an empty string.

    Returns:
        The cleaned text with leading/trailing whitespace removed.
    """
    import re

    if not text:
        return ""

    # The host must END at pricelyst.co.zw: the trailing negative lookahead
    # rejects look-alikes such as "pricelyst.co.zw.evil.com".
    allowed_host = r'(?:www\.)?pricelyst\.co\.zw(?!\.?[\w-])'
    foreign_url = re.compile(rf'https?://(?!{allowed_host})\S+', re.IGNORECASE)

    clean = []
    for line in text.split("\n"):
        # Drop lines that are just a disallowed URL (image references, API
        # links). Allowed pricelyst.co.zw links on their own line are kept —
        # the previous blanket "bare URL" rule removed those as well.
        if foreign_url.fullmatch(line.strip()):
            continue
        # Remove disallowed URLs embedded in a sentence.
        line = foreign_url.sub('', line)
        clean.append(line if line.strip() else "")

    # Collapse runs of more than 2 blank lines
    result = []
    blank_count = 0
    for line in clean:
        if line.strip() == "":
            blank_count += 1
            if blank_count <= 2:
                result.append(line)
        else:
            blank_count = 0
            result.append(line)

    return "\n".join(result).strip()
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict,
                         chat_history: str = "", language: str = "English") -> str:
    """
    Produce Pricelyst AI's WhatsApp reply for the standard (non-budget) intents.

    The prompt context is assembled from, in order: recent chat history (when
    given), ZESA tariff reference figures, and the analyst data serialised as
    JSON (when given). The model reply is passed through ``sanitise_response``.

    Returns:
        The sanitised reply, or a fixed apology string when the Gemini client
        is missing or the model call raises.
    """
    if not _gemini_client:
        return "Hi! I'm Pricelyst AI from Pricelyst. Having a bit of trouble — please try again shortly."

    context_parts = []
    if chat_history:
        context_parts.append(f"RECENT CHAT:\n{chat_history}\n\n")

    # Reference conversions for $10 and $20 of electricity.
    units_for_10 = calculate_zesa_units(10)
    units_for_20 = calculate_zesa_units(20)
    context_parts.append(
        f"ZIMBABWE CONTEXT (ZESA 2026 tariffs, incl 6% levy):\n"
        f" $10 = {units_for_10['est_units_kwh']} units | "
        f"$20 = {units_for_20['est_units_kwh']} units\n"
        f" Bands: 0-50u=$0.08/u, 51-100=$0.09/u, 101-200=$0.16/u, "
        f"201-300=$0.23/u, 301-400=$0.25/u, 401+=$0.26/u\n"
        f" Note: {ZIM_CONTEXT['zesa_note']}\n"
    )

    if analyst_data:
        serialised = json.dumps(analyst_data, default=str)
        context_parts.append(f"\nANALYST DATA:\n{serialised}\n")

    context_str = "".join(context_parts)

    PROMPT = f"""
    You are Pricelyst AI, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒.
    Mission: shortest path to value + complete price transparency for Zimbabwean shoppers.

    INPUT: "{transcript}"
    USER LANGUAGE: {language}
    INTENT: {intent.get('intent', 'CASUAL_CHAT')}

    CONTEXT:
    {context_str}

    FORMATTING (WhatsApp plain text — NO ## headers):
    - *bold* for store names, prices, key figures
    - Emojis naturally (✅ 🛒 💰 📍 ⚠️ 🔥 🍽️ 📸 ⚡)
    - Blank lines between sections — breathe, don't cram
    - Mobile-first: short lines, scannable
    - NEVER include image URLs, product image links, or any URLs except pricelyst.co.zw
    - NO product images — WhatsApp does not render inline images from URLs in chat

    LOGIC:
    1. BASKET (≤3 items): The formatter already structured it — just add a warm intro line.
    2. BASKET (>3 items): Highlight the best store deal and 2-3 standout savings.
    3. SINGLE ITEM: Best price first, then 2-3 alternatives. State exact savings.
    4. ZESA: Show units calculation clearly with tier breakdown.
    5. CASUAL/GREETING: You are a smart shopping advisor, not a generic chatbot.
       Do NOT just say "Hi how can I help". Instead, acknowledge warmly AND show
       one specific insight from the analyst data if available (a deal, a fact, a tip).
       End with a concrete invitation to search.
    6. DEALS_EXPLORE: List 5-8 deals with price and store. Make it feel like a
       real market bulletin — exciting, specific, local.
    7. EVENT_PLANNING: Acknowledge warmly, then present the basket clearly.
    8. CATALOGUE_REQUEST: Confirm PDF is being prepared.
    9. DISCOVER: List available categories with examples of what to search in each.
    10. OFF_TOPIC: Gently steer back to shopping — you are a shopping advisor,
        not a general assistant.

    Always end with a specific, useful follow-up question or CTA — not a generic one.
    """
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
    except Exception as e:
        logger.error(f"Chat response error: {e}")
        return "I checked the prices but I'm having trouble displaying them right now. Please try again!"
    try:
        return sanitise_response(reply.text)
    except Exception as e:
        logger.error(f"Chat response error: {e}")
        return "I checked the prices but I'm having trouble displaying them right now. Please try again!"
def gemini_translate(text: str, target_lang: str) -> str:
    """
    Translate Pricelyst AI's English response into the user's language if needed.

    Returns ``text`` unchanged when there is no Gemini client, the target is
    English or unspecified, the text is blank, or the translation call fails.
    """
    if not _gemini_client or not target_lang or target_lang.lower() == "english":
        return text
    if not text or not text.strip():
        # Nothing to translate; avoid a pointless model call.
        return text

    PROMPT = f"""
    Translate this WhatsApp shopping assistant reply from English to {target_lang}.

    Rules:
    - Keep prices ($X.XX), store names, product names UNCHANGED.
    - Keep WhatsApp formatting (*bold*, emojis) UNCHANGED.
    - Natural, conversational tone.

    Text: "{text}"
    """
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        translated = (resp.text or "").strip()
        # An empty reply would blank the user's message; keep the original.
        return translated or text
    except Exception as e:
        logger.error("Translation error: %s", e)
        return text


# ─────────────────────────────────────────────
# 10. Catalogue PDF Generator
# ─────────────────────────────────────────────

def _build_pdf_header(story, styles, scope_label: str, title: str) -> None:
    """
    Append the shared Pricelyst branded header to ``story``.

    Renders the logo and wordmark (or a text-only fallback when the logo is
    unavailable), followed by the document ``title`` and ``scope_label``.
    Both were previously accepted but never drawn, so generated PDFs carried
    no title.

    Args:
        story: ReportLab flowable list; mutated in place.
        styles: Sample style sheet used as the parent for derived styles.
        scope_label: Short sub-heading shown under the title; skipped if empty.
        title: Document title; skipped if empty.
    """
    from xml.sax.saxutils import escape
    from reportlab.lib import colors
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import (Paragraph, Spacer, HRFlowable,
                                    Table as RLTable, TableStyle as RLTS)
    from reportlab.platypus import Image as RLImage
    from reportlab.lib.enums import TA_CENTER

    NAVY = colors.HexColor("#003087")
    RED = colors.HexColor("#E63329")
    GREY = colors.HexColor("#555555")

    sub_style = ParagraphStyle("SubH", parent=styles["Normal"], fontSize=10,
                               textColor=GREY, alignment=TA_CENTER, spaceAfter=2)
    wordmark = ParagraphStyle("WM", parent=styles["Title"], fontSize=24,
                              textColor=NAVY, fontName="Helvetica-Bold")
    title_style = ParagraphStyle("DocTitle", parent=styles["Title"], fontSize=15,
                                 textColor=NAVY, fontName="Helvetica-Bold",
                                 alignment=TA_CENTER, spaceAfter=2)

    # Use module-level cached logo — downloaded via requests (works on HF),
    # not urllib.request which hits the same egress block as Meta/Cloudflare
    logo_path = _get_logo_path()
    logo_el = None
    if logo_path:
        try:
            logo_el = RLImage(logo_path, width=3*cm, height=3*cm, kind="proportional")
        except Exception as e:
            logger.warning("Logo RLImage init failed: %s", e)

    if logo_el:
        hdr_data = [[logo_el, [Paragraph("PRICELYST.", wordmark),
                               Paragraph("Zimbabwe\'s #1 Price Comparison", sub_style)]]]
        hdr_tbl = RLTable(hdr_data, colWidths=[3.5*cm, None])
        hdr_tbl.setStyle(RLTS([
            ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
            ("LEFTPADDING", (0, 0), (-1, -1), 0),
            ("RIGHTPADDING", (0, 0), (-1, -1), 0),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 0),
        ]))
        story.append(hdr_tbl)
    else:
        # Text-only fallback — still branded with red dot accent
        story.append(Paragraph("PRICELYST.", ParagraphStyle(
            "WMFb", parent=styles["Title"], fontSize=28,
            textColor=NAVY, fontName="Helvetica-Bold", alignment=TA_CENTER)))
        story.append(Paragraph(
            "Zimbabwe\'s #1 Price Comparison",
            ParagraphStyle("TagFb", parent=styles["Normal"], fontSize=11,
                           textColor=RED, alignment=TA_CENTER)))

    # Title block. Paragraph parses its text as XML-like markup, so
    # caller-supplied text must be escaped ("&", "<" would break the build).
    if title or scope_label:
        story.append(Spacer(1, 0.25*cm))
    if title:
        story.append(Paragraph(escape(str(title)), title_style))
    if scope_label:
        story.append(Paragraph(escape(str(scope_label)), sub_style))
    if title or scope_label:
        story.append(Spacer(1, 0.1*cm))
        story.append(HRFlowable(width="100%", thickness=1, color=RED))
def _build_pdf_footer(story, styles) -> None:
    """
    Append the shared branded footer to ``story``: a red rule, a
    price-disclaimer line and the pricelyst.co.zw call to action.

    Args:
        story: ReportLab flowable list; mutated in place.
        styles: Sample style sheet used as the parent for derived styles.
    """
    from reportlab.lib import colors
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import Spacer, HRFlowable, Paragraph
    from reportlab.lib.enums import TA_CENTER

    story.append(Spacer(1, 0.4*cm))
    story.append(HRFlowable(width="100%", thickness=2, color=colors.HexColor("#E63329")))
    story.append(Spacer(1, 0.15*cm))
    story.append(Paragraph(
        "Prices subject to change without notice. Data sourced live from participating retailers.",
        ParagraphStyle("Footer", parent=styles["Normal"], fontSize=7,
                       textColor=colors.HexColor("#888888"), alignment=TA_CENTER)
    ))
    story.append(Spacer(1, 0.08*cm))
    story.append(Paragraph(
        "For comprehensive basket comparisons and retailer Pre-Orders visit "
        "www.pricelyst.co.zw",
        ParagraphStyle("FooterCTA", parent=styles["Normal"], fontSize=8,
                       textColor=colors.HexColor("#003087"), alignment=TA_CENTER,
                       fontName="Helvetica-Bold")
    ))


def generate_catalogue_pdf(title: str, items: List[Dict],
                           scope_label: str = "Price Comparison") -> Optional[str]:
    """
    Generate a price-comparison PDF.

    Layout:
    - Columns: Product, Category, Store, Price, Savings (no Brand column).
    - One row per offer; product/category/savings are printed on the first
      offer row of each item only.
    - The best offer is marked inside the Price column with a check mark.

    Args:
        title: Document title passed to the shared header.
        items: Found-item dicts; reads ``offers``, ``best_price``,
            ``potential_savings``, ``is_substitute``, ``product_name``,
            ``category`` and ``query``.
        scope_label: Sub-heading passed to the shared header.

    Returns:
        Path of the generated file inside ``CATALOGUE_DIR``, or ``None`` if
        generation failed (the error is logged).
    """
    try:
        from xml.sax.saxutils import escape
        from reportlab.lib.pagesizes import A4
        from reportlab.lib import colors
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer,
                                        Table, TableStyle)

        def txt(value: Any) -> str:
            # Paragraph parses markup; catalogue text such as "Salt & Vinegar"
            # or "<1kg" must be escaped or the whole document fails to build.
            return escape("" if value is None else str(value))

        # Make sure the output directory exists before ReportLab opens the file.
        os.makedirs(CATALOGUE_DIR, exist_ok=True)
        filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)

        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.4*cm, leftMargin=1.4*cm,
                                topMargin=1.4*cm, bottomMargin=1.4*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")

        head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                    textColor=colors.white, fontName="Helvetica-Bold")
        cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=colors.HexColor("#222222"))
        best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=NAVY, fontName="Helvetica-Bold")
        money_style = ParagraphStyle("Money", parent=styles["Normal"], fontSize=8,
                                     leading=10, textColor=colors.HexColor("#222222"))

        story = []
        _build_pdf_header(story, styles, scope_label, title)
        story.append(Spacer(1, 0.2*cm))

        # Product, Category, Store, Price, Savings
        col_widths = [6.4*cm, 3.0*cm, 3.3*cm, 2.4*cm, 2.3*cm]
        table_data = [[
            Paragraph("Product", head_style),
            Paragraph("Category", head_style),
            Paragraph("Store", head_style),
            Paragraph("Price", head_style),
            Paragraph("Savings", head_style),
        ]]

        for item in items or []:
            offers = item.get("offers") or []
            best_p = float(item.get("best_price", 0) or 0)
            savings = float(item.get("potential_savings", 0) or 0)
            sub_note = " ⚠ nearest" if item.get("is_substitute") else ""
            savings_text = f"${savings:.2f}" if savings > 0.05 else "—"

            if not offers:
                table_data.append([
                    Paragraph(txt(item.get("query") or "Unknown") + sub_note, cell_style),
                    Paragraph("—", cell_style),
                    Paragraph("Not listed", cell_style),
                    Paragraph("N/A", cell_style),
                    Paragraph("—", cell_style),
                ])
                continue

            for idx_o, offer in enumerate(offers):
                price = float(offer.get("price", 0) or 0)
                is_best = abs(price - best_p) < 0.001
                price_label = f"✓ ${price:.2f}" if is_best else f"${price:.2f}"
                style = best_style if is_best else money_style
                first_row = idx_o == 0
                table_data.append([
                    Paragraph((txt(item.get("product_name")) + sub_note) if first_row else "", cell_style),
                    Paragraph(txt(item.get("category")) if first_row else "", cell_style),
                    Paragraph(txt(offer.get("retailer")), style),
                    Paragraph(price_label, style),
                    Paragraph(savings_text if first_row else "", cell_style),
                ])

        tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), NAVY),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
            ("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#CCCCCC")),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("LEFTPADDING", (0, 0), (-1, -1), 4),
            ("RIGHTPADDING", (0, 0), (-1, -1), 4),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        story.append(tbl)
        _build_pdf_footer(story, styles)
        doc.build(story)

        logger.info("Catalogue PDF generated: %s", filepath)
        return filepath
    except Exception as e:
        logger.error("PDF generation failed: %s", e, exc_info=True)
        return None
def generate_rich_pdf(title: str, scope_label: str, body_markdown: str,
                      items: Optional[List[Dict]] = None) -> Optional[str]:
    """
    Generate a rich branded PDF for budget plans, meal plans, recipes.

    Args:
        title: Document title passed to the shared header.
        scope_label: Sub-heading passed to the shared header.
        body_markdown: The Gemini-generated text response. WhatsApp markers
            (``*`` and ``_``) are stripped and each line is classified as a
            section header, bullet, numbered step, total line or body text.
        items: Optional found_items list for an appended price table.

    Returns:
        Path of the generated file inside ``CATALOGUE_DIR``, or ``None`` if
        generation failed (the error is logged).
    """
    try:
        from xml.sax.saxutils import escape
        from reportlab.lib.pagesizes import A4
        from reportlab.lib import colors
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer,
                                        Table, TableStyle, HRFlowable)

        def txt(value: Any) -> str:
            # Paragraph parses markup; model text containing "&" or "<" would
            # otherwise abort the whole PDF build.
            return escape("" if value is None else str(value))

        def money(value: Any) -> float:
            # Tolerate missing / None / non-numeric prices instead of crashing.
            try:
                return float(value or 0)
            except (TypeError, ValueError):
                return 0.0

        os.makedirs(CATALOGUE_DIR, exist_ok=True)
        filename = f"pricelyst_rich_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)

        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.5*cm, leftMargin=1.5*cm,
                                topMargin=1.5*cm, bottomMargin=1.5*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")

        body_style = ParagraphStyle("Body", parent=styles["Normal"], fontSize=10,
                                    leading=14, spaceAfter=4,
                                    textColor=colors.HexColor("#222222"))
        h1_style = ParagraphStyle("H1", parent=styles["Heading2"], fontSize=13,
                                  textColor=NAVY, fontName="Helvetica-Bold",
                                  spaceAfter=6, spaceBefore=10)
        bullet_style = ParagraphStyle("Bullet", parent=styles["Normal"], fontSize=10,
                                      leading=14, leftIndent=12, spaceAfter=3,
                                      textColor=colors.HexColor("#333333"))
        bold_style = ParagraphStyle("Bold", parent=styles["Normal"], fontSize=10,
                                    leading=14, fontName="Helvetica-Bold",
                                    textColor=NAVY, spaceAfter=4)

        story = []
        _build_pdf_header(story, styles, scope_label, title)

        # ── Parse body_markdown into PDF paragraphs ────────────────────────
        # Strip WhatsApp formatting and convert to clean paragraphs
        bullet_prefixes = ("•", "-", "✅", "▪️", "🛒", "💰", "🔥", "⚡", "🍽️")
        total_keywords = ("total:", "grand total", "cost per", "budget:", "subtotal")
        lines = (body_markdown or "").replace("*", "").replace("_", "").split("\n")
        for line in lines:
            stripped = line.strip()
            if not stripped:
                story.append(Spacer(1, 0.15*cm))
                continue
            # Detect section headers (lines ending with colon or all caps short lines)
            if (stripped.endswith(":") and len(stripped) < 60) or \
                    (stripped.isupper() and len(stripped) < 50):
                story.append(Paragraph(txt(stripped), h1_style))
            elif stripped.startswith(bullet_prefixes):
                clean = stripped.lstrip("•-✅▪️🛒💰🔥⚡🍽️ ")
                story.append(Paragraph(f"• {txt(clean)}", bullet_style))
            elif stripped.startswith(tuple("123456789")) and ". " in stripped:
                # Numbered step, e.g. "1. Boil the water"
                story.append(Paragraph(txt(stripped), bullet_style))
            elif any(kw in stripped.lower() for kw in total_keywords):
                story.append(Paragraph(txt(stripped), bold_style))
            else:
                story.append(Paragraph(txt(stripped), body_style))

        # ── Optional price table ───────────────────────────────────────────
        if items:
            story.append(Spacer(1, 0.4*cm))
            story.append(HRFlowable(width="100%", thickness=1, color=colors.HexColor("#CCCCCC")))
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Shopping List — Price Comparison",
                                   ParagraphStyle("TblTitle", parent=styles["Heading3"],
                                                  fontSize=11, textColor=NAVY,
                                                  fontName="Helvetica-Bold")))
            story.append(Spacer(1, 0.15*cm))

            head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                        textColor=colors.white, fontName="Helvetica-Bold")
            cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                        textColor=colors.HexColor("#222222"))
            best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                        textColor=NAVY, fontName="Helvetica-Bold")

            col_w = [5.5*cm, 3*cm, 3*cm, 2.5*cm, 3*cm]
            tdata = [[
                Paragraph("Ingredient", head_style),
                Paragraph("Best Price", head_style),
                Paragraph("Store", head_style),
                Paragraph("Save", head_style),
                Paragraph("Alternatives", head_style),
            ]]
            for item in items:
                offers = item.get("offers") or []
                best_p = money(item.get("best_price"))
                savings = money(item.get("potential_savings"))
                # Offers are assumed cheapest-first (first = best store) — TODO confirm
                alts = ", ".join(
                    f"{o.get('retailer', '')} ${money(o.get('price')):.2f}" for o in offers[1:3]
                ) if len(offers) > 1 else "—"
                best_store = offers[0].get("retailer", "—") if offers else "—"
                tdata.append([
                    Paragraph(txt(item.get("product_name", item.get("query", ""))), cell_style),
                    Paragraph(f"${best_p:.2f}", best_style),
                    Paragraph(txt(best_store), cell_style),
                    Paragraph(f"${savings:.2f}" if savings > 0.05 else "—", cell_style),
                    Paragraph(txt(alts), cell_style),
                ])

            tbl = Table(tdata, colWidths=col_w, repeatRows=1)
            tbl.setStyle(TableStyle([
                ("BACKGROUND", (0, 0), (-1, 0), NAVY),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
                ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")),
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("LEFTPADDING", (0, 0), (-1, -1), 4),
                ("RIGHTPADDING", (0, 0), (-1, -1), 4),
                ("TOPPADDING", (0, 0), (-1, -1), 3),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
            ]))
            story.append(tbl)

        _build_pdf_footer(story, styles)
        doc.build(story)
        logger.info("Rich PDF generated: %s", filepath)
        return filepath
    except Exception as e:
        logger.error("Rich PDF generation failed: %s", e, exc_info=True)
        return None
# ─────────────────────────────────────────────
# 11. Firebase Profile Helpers
# ─────────────────────────────────────────────

def get_or_create_profile(mobile: str) -> Dict[str, Any]:
    """
    Fetch the Firestore profile for ``mobile``, creating a minimal one
    (``mobile`` + ``created_at``) when none exists.

    Returns:
        The profile dict, or ``{}`` when persistence is disabled or the
        lookup fails (the error is logged).
    """
    if not db:
        return {}
    try:
        ref = db.collection("pricelyst_profiles").document(mobile)
        doc = ref.get()
        if doc.exists:
            # to_dict() may return None for a document without fields
            return doc.to_dict() or {}
        profile = {"mobile": mobile, "created_at": datetime.now(timezone.utc).isoformat()}
        ref.set(profile)
        return profile
    except Exception as e:
        logger.error("Profile fetch error for %s: %s", mobile, e)
        return {}


def get_chat_history(mobile: str, limit: int = 6) -> str:
    """
    Return the user's most recent exchanges as prompt-ready text, oldest first.

    Each exchange is rendered as ``User: …`` / ``Pricelyst AI: …``. The
    assistant label matches the persona name used in every prompt; it was
    previously the stale name "April", which fed the model a conflicting
    identity.

    Returns:
        The transcript, or ``""`` when persistence is disabled, there is no
        history, or the query fails (the error is logged).
    """
    if not db:
        return ""
    try:
        docs = (
            db.collection("pricelyst_profiles").document(mobile)
            .collection("chat_logs")
            .order_by("ts", direction=firestore.Query.DESCENDING)
            .limit(limit)
            .stream()
        )
        msgs = []
        for d in docs:
            data = d.to_dict() or {}
            msgs.append(
                f"User: {data.get('message', '')}\nPricelyst AI: {data.get('response', '')}"
            )
        # The query returns newest first; the prompt wants chronological order.
        return "\n".join(reversed(msgs))
    except Exception as e:
        logger.error("Chat history error: %s", e)
        return ""


def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None:
    """
    Append one exchange to the user's ``chat_logs`` sub-collection with a UTC
    ISO-8601 timestamp. Best-effort: failures are logged, never raised.
    """
    if not db:
        return
    try:
        db.collection("pricelyst_profiles").document(mobile).collection("chat_logs").add({
            "message": message,
            "response": response,
            "intent": intent,
            "ts": datetime.now(timezone.utc).isoformat(),
        })
    except Exception as e:
        logger.error("Chat log save error: %s", e)
def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]:
    """
    Persist a shopping plan under the user's profile.

    Note: ``plan`` is updated in place — its ``"id"`` key is set to the new
    Firestore document id before the write.

    Returns:
        The new document id, or ``None`` when persistence is disabled or the
        write fails (the error is logged).
    """
    if not db:
        return None
    try:
        ref = (db.collection("pricelyst_profiles").document(mobile)
               .collection("shopping_plans").document())
        plan["id"] = ref.id
        ref.set(plan)
        return ref.id
    except Exception as e:
        logger.error("Plan save error: %s", e)
        return None


def save_feedback(mobile: str, feedback_text: str, feedback_type: str = "general") -> bool:
    """
    Save user feedback to a top-level Firestore collection for easy review,
    and mirror it under the user's profile for context.

    feedback_type: "product_request" | "price_issue" | "suggestion" | "general"

    Returns:
        ``True`` once the primary ``pricelyst_feedback`` record is stored.
        The per-profile copy is best-effort: if only that second write fails,
        the feedback is still saved, so ``True`` is returned (previously this
        reported failure and invited a duplicate submission).
    """
    if not db:
        return False

    feedback_text = feedback_text or ""
    # One timestamp so both records can be correlated exactly.
    ts = datetime.now(timezone.utc).isoformat()
    try:
        db.collection("pricelyst_feedback").add({
            "mobile": mobile,
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": ts,
            "status": "new",  # new | reviewed | actioned
        })
    except Exception as e:
        logger.error("Feedback save error: %s", e)
        return False

    try:
        # Also log against the user's profile for context
        db.collection("pricelyst_profiles").document(mobile).collection("feedback").add({
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": ts,
        })
    except Exception as e:
        logger.warning("Feedback profile copy failed for %s: %s", mobile, e)

    logger.info("Feedback saved from %s: [%s] %s", mobile, feedback_type, feedback_text[:80])
    return True
# Firebase Storage & Media Helpers
# ─────────────────────────────────────────────


def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]:
    """Upload a local file to Firebase Storage and return a signed URL.

    Args:
        file_path: Path of the local file; its basename becomes the object name.
        folder: Object-name prefix inside the bucket.

    Returns:
        A signed URL that expires after one hour, or ``None`` when
        ``FIREBASE_STORAGE_BUCKET`` is not configured or the upload fails.
    """
    if not FIREBASE_STORAGE_BUCKET:
        return None
    try:
        bucket = fb_storage.bucket()
        blob = bucket.blob(f"{folder}/{os.path.basename(file_path)}")
        blob.upload_from_filename(file_path)
        return blob.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error("Firebase Storage upload failed: %s", e)
        return None


def upload_to_imgur(file_path: str) -> Optional[str]:
    """Upload an image file to Imgur and return its public link.

    Returns:
        The link reported by the API, or ``None`` when ``IMGUR_CLIENT_ID`` is
        not configured, the API reports failure, or the request raises.
    """
    if not IMGUR_CLIENT_ID:
        return None
    try:
        with open(file_path, "rb") as image_file:
            # requests has no default timeout; without one a stalled upload
            # would block the calling worker indefinitely.
            resp = requests.post(
                IMGUR_URL,
                headers=IMGUR_HEADERS,
                files={"image": image_file},
                timeout=HTTP_TIMEOUT,
            )
        resp.raise_for_status()
        data = resp.json()
        return data["data"]["link"] if data.get("success") else None
    except Exception as e:
        logger.error("Imgur upload failed: %s", e)
        return None


def deepgram_tts(text: str) -> Optional[str]:
    """Synthesize *text* to speech via Deepgram and save it as an MP3 file.

    Args:
        text: Text to speak. Empty or whitespace-only text is rejected
            locally, before any network call.

    Returns:
        Path of the ``tts_<hex>.mp3`` file written to the current working
        directory, or ``None`` when there is nothing to speak,
        ``DEEPGRAM_API_KEY`` is not configured, or the request fails.
    """
    if not text or not text.strip() or not DEEPGRAM_API_KEY:
        return None
    try:
        resp = requests.post(
            DEEPGRAM_TTS_URL,
            headers={
                "Authorization": f"Token {DEEPGRAM_API_KEY}",
                "Content-Type": "application/json",
            },
            json={"text": text},
            timeout=30,
        )
        resp.raise_for_status()
        fp = os.path.join(os.getcwd(), f"tts_{uuid.uuid4().hex}.mp3")
        with open(fp, "wb") as audio_file:
            audio_file.write(resp.content)
        return fp
    except Exception as e:
        logger.error("DeepGram TTS failed: %s", e)
        return None


# ─────────────────────────────────────────────
# 13.
# Deals & Discovery Helpers
# ─────────────────────────────────────────────


def get_todays_deals(limit: int = 8) -> List[Dict]:
    """Return the products on offer with the widest price spread.

    Only rows flagged ``is_offer`` are considered. For each ``product_name``
    the spread is ``max(price) - min(price)``; products with a spread above
    0.05 are ranked by spread, largest first.

    Args:
        limit: Maximum number of deals to return. Non-positive values yield
            an empty list without fetching the catalogue.

    Returns:
        Dicts with keys ``product_name``, ``cheapest_price``, ``retailer``,
        ``savings`` and ``category`` (``""`` when unknown). ``[]`` when there
        is no data or the computation fails (the error is logged).
    """
    # A negative limit would reach DataFrame.head(-n), which means
    # "everything except the last n rows" — never what a caller wants.
    if limit <= 0:
        return []
    df = get_market_index()
    if df.empty:
        return []
    try:
        offers = df[df["is_offer"].eq(True)]
        if offers.empty:
            return []
        spread = offers.groupby("product_name")["price"].agg(["min", "max"]).reset_index()
        spread["savings"] = spread["max"] - spread["min"]
        top = (
            spread[spread["savings"] > 0.05]
            .sort_values("savings", ascending=False)
            .head(limit)
        )
        # Cheapest row per product, computed once rather than re-filtering
        # and re-sorting the whole offers frame for every deal.
        cheapest_rows = (
            offers.sort_values("price", kind="stable")
            .drop_duplicates("product_name")
            .set_index("product_name")
        )
        deals = []
        for name, savings in zip(top["product_name"], top["savings"]):
            cheapest = cheapest_rows.loc[name]
            category = cheapest.get("category", "")
            if pd.isna(category):
                category = ""  # keep NaN out of the payload
            deals.append({
                "product_name": name,
                "cheapest_price": float(cheapest["price"]),
                "retailer": cheapest["retailer"],
                "savings": float(savings),
                "category": category,
            })
        return deals
    except Exception as e:
        logger.error("Deals fetch error: %s", e)
        return []


def get_category_list() -> List[str]:
    """Return the distinct, non-null catalogue categories in sorted order.

    Returns ``[]`` when the catalogue is empty or the lookup fails; failures
    are logged so a missing column does not go unnoticed.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        return sorted(df["category"].dropna().unique().tolist())
    except Exception as e:
        logger.error("Category list error: %s", e)
        return []


def format_deals_message(deals: List[Dict]) -> str:
    """Render *deals* as a WhatsApp-formatted message.

    Args:
        deals: Dicts providing ``product_name``, ``cheapest_price``,
            ``retailer`` and ``savings``.

    Returns:
        A numbered list of deals followed by a call to action, or a short
        apology when *deals* is empty.
    """
    if not deals:
        return "No deals data right now. Please try again shortly."
    lines = ["🏷️ *Today's Best Deals* 🇿🇼\n"]
    for position, deal in enumerate(deals, 1):
        lines.extend([
            f"*{position}. {deal['product_name']}*",
            f" 💰 ${deal['cheapest_price']:.2f} @ {deal['retailer']}",
            f" 🔥 Save up to ${deal['savings']:.2f}",
            "",
        ])
    lines.extend([
        "_Type any product name to compare prices!_",
        "",
        "*For comprehensive basket comparisons and retailer Pre-Orders visit*",
        "www.pricelyst.co.zw",
    ])
    return "\n".join(lines)