Spaces:
Running
Running
| """ | |
| utility.py — Pricelyst WhatsApp Bot (v2) | |
| Core AI & Data layer. | |
| Upgrades in this version: | |
| 1. Smart message formatting — per-store breakdown for ≤3 items, | |
| cheapest-basket summary with highlights for larger baskets. | |
| 2. Budget Engine — real $ calculations against live catalogue prices. | |
| Handles: fixed budget shopping, monthly household budgeting, | |
| party/event planning with headcount, meal prep with recipes, | |
| image-to-recipe-to-shopping-list pipeline. | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import math | |
| import uuid | |
| import logging | |
| import base64 | |
| import io | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
| # ───────────────────────────────────────────── | |
| # 1. Gemini (new google-genai SDK) | |
| # ───────────────────────────────────────────── | |
# The SDK import is optional: when google-genai is missing the module still
# loads and every Gemini-backed feature sees `_gemini_client is None`.
try:
    from google import genai
    from google.genai import types as genai_types
except ImportError:
    genai = genai_types = None
    logger.error("google-genai not installed. Run: pip install google-genai")

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite")

# Remains None unless the SDK is importable, a key is configured and the
# client constructor succeeds.
_gemini_client = None
_gemini_usable = bool(genai) and bool(GOOGLE_API_KEY)
if _gemini_usable:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
    else:
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
| # ───────────────────────────────────────────── | |
| # 2. Firebase | |
| # ───────────────────────────────────────────── | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore, storage as fb_storage | |
| FIREBASE_ENV = os.environ.get("FIREBASE", "") | |
| FIREBASE_STORAGE_BUCKET = os.environ.get("FIREBASE_STORAGE_BUCKET", "") | |
| db: Optional[Any] = None | |
def _get_firestore_client() -> Optional[Any]:
    """Return a Firestore client bound to the configured database.

    The database id is read from the FIRESTORE_DB_NAME environment variable
    and falls back to "(default)" when the variable is unset.
    """
    database = os.environ.get("FIRESTORE_DB_NAME", "(default)")
    client = firestore.client(database_id=database)
    return client
def init_firestore_from_env() -> Optional[Any]:
    """Initialise Firebase from environment config and return a Firestore client.

    Behaviour:
      * If a Firebase app already exists in this process, only the Firestore
        client is (re)created.
      * If the FIREBASE env var is empty, a warning is logged and None is
        returned (persistence disabled).
      * Otherwise FIREBASE is parsed as service-account JSON and the app is
        initialised, passing the storage bucket when one is configured.

    Any exception is logged at CRITICAL level and results in None.
    The module-level ``db`` is updated on success.
    """
    global db
    try:
        already_initialised = bool(firebase_admin._apps)
        if already_initialised:
            db = _get_firestore_client()
            return db

        if not FIREBASE_ENV:
            logger.warning("FIREBASE env var missing. Persistence disabled.")
            return None

        certificate = credentials.Certificate(json.loads(FIREBASE_ENV))
        app_options = {}
        if FIREBASE_STORAGE_BUCKET:
            app_options["storageBucket"] = FIREBASE_STORAGE_BUCKET
        firebase_admin.initialize_app(certificate, app_options)

        db = _get_firestore_client()
        logger.info("Firebase initialized.")
        return db
    except Exception as e:
        logger.critical("Failed to initialize Firebase: %s", e, exc_info=True)
        return None
| db = init_firestore_from_env() | |
| # ───────────────────────────────────────────── | |
| # 3. Static Config | |
| # ───────────────────────────────────────────── | |
| PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/") | |
| HTTP_TIMEOUT = 30 | |
| PRODUCT_CACHE_TTL = 60 * 20 # 20 minutes | |
| # ZESA/ZETDC tariffs — ZERA approved, USD equivalent incl. 6% REA levy | |
| # Source: zimpricecheck.com, last updated 20 May 2026 | |
| # Billing is in ZiG; USD equivalents at prevailing interbank rate. | |
| # Domestic users billed in ZiG — USD estimates for user convenience. | |
| ZIM_CONTEXT = { | |
| "zesa_bands": [ | |
| {"limit": 50, "rate_usd": 0.08}, # first 50 units | |
| {"limit": 100, "rate_usd": 0.09}, # 51-100 | |
| {"limit": 200, "rate_usd": 0.16}, # 101-200 | |
| {"limit": 300, "rate_usd": 0.23}, # 201-300 | |
| {"limit": 400, "rate_usd": 0.25}, # 301-400 | |
| {"limit": 9999, "rate_usd": 0.26}, # 401+ | |
| ], | |
| "zesa_note": "Tariffs billed in ZiG; USD estimates at prevailing rate. Includes 6% REA levy.", | |
| } | |
| IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID", "") | |
| IMGUR_URL = "https://api.imgur.com/3/image" | |
| IMGUR_HEADERS = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {} | |
| DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "") | |
| DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en" | |
| CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues") | |
| os.makedirs(CATALOGUE_DIR, exist_ok=True) | |
| # ── Logo cache ───────────────────────────────────────────────────────────── | |
| # Downloaded once at startup and reused for all PDFs. | |
| # Uses requests (not urllib) so it goes through the same session that works | |
| # on HuggingFace — urllib hits the same egress block as Meta/Cloudflare. | |
| LOGO_URL = os.environ.get("PRICELYST_LOGO_URL", "https://i.imgur.com/4bVNlBs.jpeg") | |
| _logo_path: Optional[str] = None # path to cached local file | |
def _get_logo_path() -> Optional[str]:
    """Return the path of a locally cached logo file, downloading it if needed.

    The path is memoised in the module-level ``_logo_path`` and reused while
    the file still exists on disk. On a cache miss the logo is fetched from
    LOGO_URL with a custom ``PricelystBot`` User-Agent and an image-oriented
    Accept header, then:

      * the response is rejected (None returned) unless the Content-Type
        mentions "image" or the body starts with JPEG/PNG/GIF/RIFF magic
        bytes;
      * with Pillow available the image is re-encoded as PNG;
      * if the Pillow step fails for any reason (including Pillow not being
        installed) the raw bytes are cached instead, with a ``.png`` or
        ``.jpg`` extension chosen from the Content-Type.

    Returns None when the download or the fallback write fails.
    """
    global _logo_path
    if _logo_path and os.path.exists(_logo_path):
        return _logo_path

    request_headers = {
        "User-Agent": "PricelystBot/1.0 (+https://pricelyst.co.zw)",
        "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    }
    image_signatures = (b"\xff\xd8", b"\x89PNG", b"GIF", b"RIFF")

    try:
        resp = requests.get(LOGO_URL, timeout=20, headers=request_headers, allow_redirects=True)
        resp.raise_for_status()

        content_type = (resp.headers.get("Content-Type") or "").lower()
        looks_like_image = (
            "image" in content_type
            or resp.content[:16].startswith(image_signatures)
        )
        if not looks_like_image:
            logger.warning(
                "Logo download did not return an image. content_type=%s bytes=%s",
                content_type, len(resp.content)
            )
            return None

        png_file = os.path.join(CATALOGUE_DIR, "pricelyst_logo.png")
        try:
            from PIL import Image

            picture = Image.open(io.BytesIO(resp.content))
            if picture.mode not in ("RGB", "RGBA"):
                picture = picture.convert("RGBA")
            picture.save(png_file, format="PNG", optimize=True)
        except Exception as pil_err:
            # Pillow missing or unable to decode: keep the original bytes and
            # let the PDF renderer attempt to read them.
            suffix = ".png" if "png" in content_type else ".jpg"
            raw_file = os.path.join(CATALOGUE_DIR, f"pricelyst_logo_raw{suffix}")
            with open(raw_file, "wb") as fh:
                fh.write(resp.content)
            _logo_path = raw_file
            logger.warning("Pillow logo conversion failed (%s). Cached raw logo at %s", pil_err, raw_file)
            return _logo_path
        else:
            _logo_path = png_file
            logger.info("Logo cached as PNG at %s (%s bytes)", png_file, os.path.getsize(png_file))
            return _logo_path
    except Exception as e:
        logger.warning("Logo download failed from %s: %s", LOGO_URL, e)
        return None
# ─────────────────────────────────────────────
# 4. Market Index (ETL)
# ─────────────────────────────────────────────
| _data_cache: Dict[str, Any] = {"ts": 0, "df": pd.DataFrame(), "raw_count": 0} | |
| def _norm(s: Any) -> str: | |
| return str(s).strip().lower() if s else "" | |
| def _coerce_price(v: Any) -> float: | |
| try: | |
| return float(v) if v is not None else 0.0 | |
| except Exception: | |
| return 0.0 | |
| # Alcohol/age-restricted terms are not priced in WhatsApp chat. | |
| # We keep the rest of the user's basket alive and point restricted items to the site. | |
ALCOHOL_TERMS = {
    "alcohol", "beer", "lager", "stout", "wine", "whisky", "whiskey",
    "vodka", "gin", "rum", "brandy", "cider", "spirit", "spirits",
    "booze", "liquor", "champagne", "tequila", "amarula", "castle",
    "zambezi", "lion lager", "carling", "black label", "hunters",
    "savanna", "heineken", "stella", "chibuku", "scud", "super",
}

# Single alternation over all terms, matched on whole words only.
# Multi-word terms tolerate any run of whitespace between their words, and a
# trailing "s"/"es" is accepted so simple plurals ("beers", "wines") still hit.
_ALCOHOL_PATTERN = re.compile(
    r"\b(?:"
    + "|".join(
        r"\s+".join(re.escape(word) for word in term.split())
        for term in sorted(ALCOHOL_TERMS, key=len, reverse=True)
    )
    + r")(?:e?s)?\b",
    re.IGNORECASE,
)


def contains_alcohol_reference(text: Any) -> bool:
    """Return True when *text* mentions an alcohol/age-restricted term.

    Matching is case-insensitive and anchored on word boundaries. A plain
    substring test would flag ordinary groceries such as "ginger" (gin),
    "chicken drumsticks" (rum) or "supermarket" (super) as restricted.

    Falsy input (None, "", 0) is never considered a match.
    """
    if not text:
        return False
    return _ALCOHOL_PATTERN.search(str(text)) is not None
def split_restricted_items(item_names: List[str]) -> Tuple[List[str], List[str]]:
    """Partition item names into (allowed, restricted) lists.

    An item is restricted when ``contains_alcohol_reference`` flags it.
    Input order is preserved within each list, every entry is converted to
    ``str``, and a falsy *item_names* yields two empty lists.
    """
    allowed: List[str] = []
    restricted: List[str] = []
    for raw in item_names or []:
        bucket = restricted if contains_alcohol_reference(raw) else allowed
        bucket.append(str(raw))
    return allowed, restricted
def format_restricted_items_note(restricted_items: List[str]) -> str:
    """Build the user-facing note that redirects restricted items to the site.

    Returns "" when there are no restricted items.
    """
    if not restricted_items:
        return ""
    listed = ", ".join(restricted_items)
    return f"🔞 Restricted item(s): {listed} — please visit https://pricelyst.co.zw for more information."
def _fetch_page(url: str, page: int, per_page: int) -> Tuple[List[Dict], int]:
    """Fetch one page of the product listing API.

    Returns ``(products, total_pages)``. The page count is taken from the
    first truthy value among ``totalPages``, ``last_page`` and
    ``meta.last_page`` in the payload, defaulting to 999 when none is present.

    HTTP errors (and JSON decoding errors) propagate so the caller can decide
    whether to retry.
    """
    import urllib3

    # NOTE(review): verify=False turns off TLS certificate validation for the
    # price API, and the warning about it is silenced here. Confirm this is
    # still required by the hosting environment.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    query = {"page": page, "perPage": per_page}
    response = requests.get(url, params=query, timeout=HTTP_TIMEOUT, verify=False)
    response.raise_for_status()

    payload = response.json()
    products = payload.get("data") or []

    # NOTE(review): with no pagination metadata the caller will walk up to
    # 999 pages.
    page_total = payload.get("totalPages") or payload.get("last_page")
    if not page_total:
        page_total = (payload.get("meta") or {}).get("last_page") or 999
    return products, int(page_total)
def _flatten_product(p: Dict, p_id: int) -> List[Dict]:
    """Convert one raw API product into flat index rows.

    Returns one row per offer with a positive price, or a single
    ``retailer="Listing"`` / ``price=0.0`` / ``is_offer=False`` row when the
    product carries no prices at all. A product whose prices are all
    non-positive yields no rows.
    """
    p_name = str(p.get("name") or "Unknown").strip()
    brand_name = str((p.get("brand") or {}).get("brand_name") or "").strip()
    cat_names = [
        str(c.get("name") or "").strip()
        for c in (p.get("categories") or [])
        if c.get("name")
    ]
    primary_cat = cat_names[0] if cat_names else "General"

    # Rich search vector: name + brand + all categories + the individual
    # meaningful words of the name, so a product is findable by brand,
    # by category and by any significant name token.
    name_tokens = [
        w for w in p_name.lower().split()
        if len(w) > 2 and w not in {"and", "the", "for", "with"}
    ]
    search_vector = _norm(" ".join([p_name, brand_name, *cat_names, *name_tokens]))

    views = int(p.get("view_count") or 0)
    image = str(p.get("thumbnail") or p.get("image") or "")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict:
        # Key order is kept stable because it determines DataFrame column order.
        return {
            "product_id": p_id, "product_name": p_name,
            "search_vector": search_vector, "brand": brand_name,
            "category": primary_cat, "retailer": retailer,
            "price": price, "views": views, "image": image, "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        return [make_row("Listing", 0.0, False)]

    rows: List[Dict] = []
    for offer in prices:
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            r_name = str((offer.get("retailer") or {}).get("name") or "Unknown Store").strip()
            rows.append(make_row(r_name, price_val, True))
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """Fetch the full catalogue and flatten it to one row per (product, offer).

    Strategy:
      - Fetch page 1 to discover ``total_pages``; if that fails an empty
        DataFrame is returned.
      - Fetch pages 2..N concurrently (max 8 workers). Each page is retried
        once, then logged and skipped, so one bad page never kills the index.
      - Results are merged in page order, which keeps row order (and therefore
        duplicate resolution and search tie-breaking) deterministic.
      - Products are de-duplicated by id, first occurrence wins. Products
        without an id are never treated as duplicates of each other.
      - A product that fails to flatten is logged and skipped as a whole.

    Returns:
        DataFrame with columns product_id, product_name, search_vector, brand,
        category, retailer, price, views, image, is_offer (empty on failure).
    """
    import concurrent.futures

    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    per_page = 100  # fewer round-trips than smaller pages
    all_raw: List[Dict] = []

    # ── Page 1: discover total ───────────────────────────────────────────
    try:
        page1_data, total_pages = _fetch_page(url, 1, per_page)
        all_raw.extend(page1_data)
        logger.info("ETL: Page 1/%s fetched (%s products)", total_pages, len(page1_data))
    except Exception as e:
        logger.error("ETL: Page 1 failed — cannot continue: %s", e)
        return pd.DataFrame()

    if total_pages <= 1:
        logger.info("ETL: Single-page catalogue.")
    else:
        # ── Pages 2..N: concurrent fetch ────────────────────────────────
        remaining_pages = range(2, total_pages + 1)

        def fetch_with_retry(pg: int) -> List[Dict]:
            for attempt in range(2):  # one retry
                try:
                    data, _ = _fetch_page(url, pg, per_page)
                    logger.info("ETL: Page %s/%s — %s products", pg, total_pages, len(data))
                    return data
                except Exception as e:
                    if attempt == 0:
                        logger.warning("ETL: Page %s attempt 1 failed (%s), retrying...", pg, e)
                        time.sleep(0.5)
                    else:
                        logger.error("ETL: Page %s skipped after retry: %s", pg, e)
            return []

        # Max 8 workers — enough concurrency without hammering the API.
        # executor.map yields results in submission (page) order.
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            for page_products in executor.map(fetch_with_retry, remaining_pages):
                all_raw.extend(page_products)

    logger.info("ETL: Total raw products fetched: %s", len(all_raw))

    # ── Flatten to rows ──────────────────────────────────────────────────
    rows: List[Dict] = []
    seen_ids: set = set()
    for p in all_raw:
        try:
            p_id = int(p.get("id") or 0)
            # Only real ids take part in de-duplication; id-less products
            # would otherwise all collapse onto id 0 and be dropped.
            if p_id:
                if p_id in seen_ids:
                    continue
                seen_ids.add(p_id)
            rows.extend(_flatten_product(p, p_id))
        except Exception as row_err:
            # Guard the lookup: a non-dict entry must not crash the handler.
            bad_id = p.get("id", "?") if isinstance(p, dict) else "?"
            logger.warning("ETL: Skipped product %s: %s", bad_id, row_err)

    df = pd.DataFrame(rows)
    if not df.empty:
        logger.info(
            "ETL: Flattened into %s rows | %s unique products | %s brands | %s categories",
            len(df),
            df["product_id"].nunique(),
            df["brand"].nunique(),
            df["category"].nunique(),
        )
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh happens when *force_refresh* is set, when nothing is cached yet,
    or when the cached data is older than PRODUCT_CACHE_TTL seconds.

    If the refresh comes back empty (API down) while a previous index is
    still cached, the stale index is kept and returned instead of being
    overwritten with an empty frame. The cache timestamp is left untouched in
    that case, so the next call tries again.
    """
    cached = _data_cache["df"]
    expired = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if not (force_refresh or cached.empty or expired):
        return cached

    logger.info("ETL: Refreshing Market Index...")
    fresh = fetch_and_flatten_data()
    if fresh.empty and not cached.empty:
        logger.warning(
            "ETL: Refresh returned no data; serving stale index (%s rows).", len(cached)
        )
        return cached

    _data_cache.update({"df": fresh, "ts": time.time(), "raw_count": len(fresh)})
    return _data_cache["df"]
| # ───────────────────────────────────────────── | |
| # 5. Precision Search & Basket Optimisation | |
| # ───────────────────────────────────────────── | |
| # Store alias map — common shorthand -> canonical retailer name substring | |
| STORE_ALIASES: Dict[str, str] = { | |
| "pnp": "pick n pay", | |
| "p&p": "pick n pay", | |
| "pick n pay": "pick n pay", | |
| "picknpay": "pick n pay", | |
| "tmpnpay": "pick n pay", | |
| "tm": "pick n pay", | |
| "ok": "ok", | |
| "ok mart": "ok", | |
| "okmart": "ok", | |
| "spar": "spar", | |
| "food lovers": "food lover", | |
| "food lover": "food lover", | |
| "foodlovers": "food lover", | |
| "fl": "food lover", | |
| "choppies": "choppies", | |
| "bon marche": "bon marche", | |
| "bon": "bon marche", | |
| "checkers": "checkers", | |
| "game": "game", | |
| } | |
| # Generic category words — trigger category-aware search rather than literal match | |
| GENERIC_QUERIES = { | |
| "cooking oil", "oil", "maize meal", "meal", "bread", "milk", "sugar", | |
| "rice", "flour", "salt", "eggs", "coffee", "tea", "juice", "water", | |
| "soap", "washing powder", "detergent", "chicken", "beef", "fish", | |
| "butter", "margarine", "cheese", "yoghurt", "cereal", "nappies", | |
| "diapers", "toilet paper", "tissue", | |
| } | |
def normalise_store_query(store_name: str) -> Optional[str]:
    """Map a store name or shorthand to its canonical retailer substring.

    The name is normalised (trimmed, lower-cased) and looked up in
    STORE_ALIASES; None is returned when there is no alias for it.
    """
    alias_key = _norm(store_name)
    return STORE_ALIASES.get(alias_key)
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 50) -> pd.DataFrame:
    """Rank catalogue rows against *query* with several additive signals.

    Scoring per row (as implemented):
      2000  exact match of the normalised product name (returned immediately)
      +800  every query token (length > 1) is a token of the search vector
      +700  the query is a substring of the product name
      +400  the product name starts with the query
      +350  brand appears in the query AND in the product name
      +150  brand appears in the query only
      +90   per query token that is also a search-vector token
      +300  generic query and the product name starts with it
      -200  (floored at 0) generic query, score below 150 and a product name
            longer than four words — keeps e.g. "oil" from surfacing long
            speciality product names

    A query is "generic" when it, or each of its tokens, is in GENERIC_QUERIES.

    Returns rows with score > 0 plus a ``match_score`` column, sorted by
    score (desc), views (desc), price (asc), truncated to *limit*. When *df*
    is empty or *query* is falsy, *df* is returned unchanged (no
    ``match_score`` column).
    """
    if df.empty or not query:
        return df

    needle = _norm(query)
    needle_tokens = {tok for tok in needle.split() if len(tok) > 1}
    generic = needle in GENERIC_QUERIES or (
        bool(needle_tokens) and needle_tokens.issubset(GENERIC_QUERIES)
    )

    def rank(row) -> int:
        name = _norm(row["product_name"])
        vocab = set(row["search_vector"].split())
        brand = _norm(row.get("brand", ""))

        if name == needle:
            return 2000

        name_starts_with_query = name.startswith(needle)
        points = 0
        if needle_tokens and needle_tokens <= vocab:
            points += 800
        if needle in name:
            points += 700
        if name_starts_with_query:
            points += 400
        if brand and brand in needle:
            # Full brand-fidelity bonus only when the name carries the brand too.
            points += 350 if brand in name else 150
        points += 90 * len(needle_tokens & vocab)
        if generic and name_starts_with_query:
            points += 300
        if points < 150 and generic and len(name.split()) > 4:
            points = max(0, points - 200)
        return points

    scored = df.assign(match_score=df.apply(rank, axis=1))
    hits = scored[scored["match_score"] > 0]
    if hits.empty:
        return hits

    ordered = hits.sort_values(
        ["match_score", "views", "price"], ascending=[False, False, True]
    )
    return ordered.head(limit)
def calculate_basket_optimization(item_names: List[str],
                                  preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """Price a basket across retailers and rank the stores.

    Alcohol/age-restricted items are split off first and only reported back
    (``restricted_items``); the remaining items are searched in the market
    index. Blank item names are ignored.

    Args:
        item_names: free-text item names requested by the user.
        preferred_retailer: optional store name or alias; matching stores are
            pinned to the front of the comparison.

    Returns:
        On success a dict with ``actionable=True`` and the keys
        ``is_basket``, ``found_items``, ``global_missing``,
        ``restricted_items``, ``market_matrix`` (top 5 stores),
        ``best_store``, ``preferred_retailer`` and ``store_filter``.
        When market data is unavailable: ``{"actionable": False, "error": ...}``.
    """
    clean_items, restricted_items = split_restricted_items(item_names or [])

    # Normalise store alias (e.g. "TmPnPay" -> "pick n pay"); fall back to the
    # raw normalised name when it is not in the alias map.
    store_filter: Optional[str] = None
    if preferred_retailer:
        store_filter = normalise_store_query(preferred_retailer) or _norm(preferred_retailer)

    def empty_result(missing: List[str]) -> Dict[str, Any]:
        # Same key set as the full result so consumers see one shape.
        return {
            "actionable": True,
            "is_basket": False,
            "found_items": [],
            "global_missing": missing,
            "restricted_items": restricted_items,
            "market_matrix": [],
            "best_store": None,
            "preferred_retailer": preferred_retailer,
            "store_filter": store_filter,
        }

    # A basket made only of restricted items needs no price data, so answer
    # before touching the (possibly slow or unavailable) market index.
    if restricted_items and not clean_items:
        return empty_result([])

    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "Market data unavailable. Please try again shortly."}

    # Only rows that are real priced offers; computed once for all items.
    offers_df = df[df["is_offer"].astype(bool)]

    found_items = []
    missing_global = []
    for item in clean_items:
        q_norm = _norm(item)
        if not q_norm:
            # An empty query would make the search return every row and an
            # arbitrary product would be picked.
            continue

        hits = search_products_deep(offers_df, item, limit=50)
        if hits.empty:
            missing_global.append(item)
            continue

        best_match = hits.iloc[0]
        res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}")
        q_tokens = q_norm.split()
        # Multi-word query where some word is absent from the match:
        # flag the result as a nearest-match substitute.
        is_sub = (len(q_tokens) > 1
                  and sum(1 for t in q_tokens if t in res_norm) < len(q_tokens))

        product_offers = (hits[hits["product_name"] == best_match["product_name"]]
                          .sort_values("price"))
        offers_list = [{"retailer": r["retailer"], "price": float(r["price"])}
                       for _, r in product_offers.iterrows()]
        best_price = offers_list[0]["price"]
        max_price = offers_list[-1]["price"]

        found_items.append({
            "query": item,
            "product_name": str(best_match["product_name"]),
            "brand": str(best_match["brand"]),
            "category": str(best_match["category"]),
            "image": str(best_match["image"]),
            "is_substitute": is_sub,
            "offers": offers_list,
            "best_price": best_price,
            "potential_savings": max_price - best_price,
        })

    if not found_items:
        return empty_result(missing_global)

    # Market matrix: total basket cost per retailer. Retailers are visited in
    # sorted order so ties are resolved deterministically.
    all_retailers = sorted({o["retailer"] for f in found_items for o in f["offers"]})
    store_comparison = []
    for retailer in all_retailers:
        total_price = 0.0
        found_count = 0
        missing_list = []
        for item in found_items:
            # Offers are sorted by price, so the first hit is the cheapest.
            price = next((o["price"] for o in item["offers"] if o["retailer"] == retailer), None)
            if price is not None:
                total_price += price
                found_count += 1
            else:
                missing_list.append(item["product_name"])
        store_comparison.append({
            "retailer": retailer,
            "total_price": total_price,
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_list,
        })

    # Best coverage first, then cheapest total.
    store_comparison.sort(key=lambda x: (-x["found_count"], x["total_price"]))

    if len(store_comparison) > 1:
        # Savings are only comparable among stores with the top coverage.
        top_coverage = store_comparison[0]["found_count"]
        max_total = max(s["total_price"] for s in store_comparison
                        if s["found_count"] == top_coverage)
        for s in store_comparison:
            s["basket_savings"] = (
                max_total - s["total_price"]
                if s["found_count"] == top_coverage else 0.0
            )
    else:
        for s in store_comparison:
            s["basket_savings"] = 0.0

    # If a specific store was requested, pin it to the front.
    if store_filter:
        pinned = [s for s in store_comparison if store_filter in _norm(s["retailer"])]
        rest = [s for s in store_comparison if store_filter not in _norm(s["retailer"])]
        store_comparison = pinned + rest

    return {
        "actionable": True,
        "is_basket": len(found_items) > 1,
        "found_items": found_items,
        "global_missing": missing_global,
        "restricted_items": restricted_items,
        "market_matrix": store_comparison[:5],
        "best_store": store_comparison[0] if store_comparison else None,
        "preferred_retailer": preferred_retailer,
        "store_filter": store_filter,
    }
| # ───────────────────────────────────────────── | |
| # 6. ZESA Calculator | |
| # ───────────────────────────────────────────── | |
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate how many ZESA units a USD amount buys under stepped tariffs.

    The bands in ``ZIM_CONTEXT["zesa_bands"]`` are walked cheapest-first.
    Each band is bought out completely while the money lasts; whatever is
    left buys a fraction of the next band.

    Band structure per the tariff table (USD per unit):
        0-50: $0.08 · 51-100: $0.09 · 101-200: $0.16
        201-300: $0.23 · 301-400: $0.25 · 401+: $0.26

    Returns a dict with ``amount_usd``, ``est_units_kwh`` (rounded to 0.1),
    ``band_breakdown`` (one entry per band touched) and ``note``.
    """
    budget_left = float(amount_usd)
    total_units = 0.0
    floor = 0
    breakdown = []

    for tier in ZIM_CONTEXT["zesa_bands"]:
        if budget_left <= 0:
            break

        ceiling = tier["limit"]
        rate = tier["rate_usd"]
        span = ceiling - floor
        full_cost = span * rate

        if budget_left >= full_cost:
            # The whole band is affordable.
            bought = span
            reported = span
            budget_left -= full_cost
        else:
            # Money runs out inside this band.
            bought = budget_left / rate
            reported = round(bought, 1)
            budget_left = 0

        total_units += bought
        breakdown.append({
            "band": f"{floor+1}-{ceiling}",
            "units": reported,
            "rate": rate,
        })
        floor = ceiling

    return {
        "amount_usd": float(amount_usd),
        "est_units_kwh": float(round(total_units, 1)),
        "band_breakdown": breakdown,
        "note": ZIM_CONTEXT["zesa_note"],
    }
| # ───────────────────────────────────────────── | |
| # 7. Smart WhatsApp Formatter (Upgrade 1) | |
| # ───────────────────────────────────────────── | |
def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str:
    """Render a basket analysis as a WhatsApp-friendly text message.

    Layout depends on how many items were found:
      - up to 3 items: per-item price list (max 4 stores each), plus the best
        basket line when there is more than one item;
      - more than 3 items: store totals (max 4), basket saving, the three
        biggest per-item savings and any nearest-match substitutions.

    Not-found and restricted items are appended, followed by the site footer.
    A non-actionable analysis yields its error text. *language* is accepted
    for interface compatibility and is not used here.
    """
    if not analyst.get("actionable"):
        return analyst.get("error", "Sorry, I couldn't fetch price data right now.")

    found = analyst.get("found_items", [])
    missing = analyst.get("global_missing", [])
    restricted = analyst.get("restricted_items", [])
    matrix = analyst.get("market_matrix", [])

    # Nothing priced: report what was missing and/or restricted.
    if not found:
        note = format_restricted_items_note(restricted)
        if missing:
            text = f"⚠️ Couldn't find: {', '.join(missing)}\n_Try a shorter product name._"
            if note:
                text += "\n\n" + note
            return text
        return note or "No results found. Try a different search term."

    count = len(found)
    out = []

    if count <= 3:
        # ── ≤ 3 items: full per-store breakdown ──────────────────────────
        for entry in found:
            suffix = " _(nearest match)_" if entry.get("is_substitute") else ""
            out.append(f"🏷️ *{entry['product_name']}*{suffix}")
            for offer in entry["offers"][:4]:
                bullet = "✅" if offer["price"] == entry["best_price"] else "▪️"
                out.append(f" {bullet} {offer['retailer']}: *${offer['price']:.2f}*")
            item_saving = entry.get("potential_savings", 0)
            if item_saving > 0.10:
                out.append(f" 💡 Save *${item_saving:.2f}* at cheapest store")
            out.append("")

        if count > 1 and matrix:
            top_store = matrix[0]
            out.append(f"🏪 Best basket: *{top_store['retailer']}* — *${top_store['total_price']:.2f}*")
            basket_saving = top_store.get("basket_savings", 0)
            if basket_saving > 0.10:
                out.append(f"💰 Saves *${basket_saving:.2f}* vs most expensive option")
    else:
        # ── > 3 items: basket summary + highlights ───────────────────────
        out.append(f"🛒 *{count}-item basket*\n")

        if matrix:
            for store in matrix[:4]:
                coverage = f"{store['found_count']}/{store['total_items']} items"
                bullet = "✅" if store == matrix[0] else "▪️"
                out.append(f" {bullet} *{store['retailer']}*: *${store['total_price']:.2f}* ({coverage})")
            out.append("")
            basket_saving = matrix[0].get("basket_savings", 0)
            if basket_saving > 0.10:
                out.append(f"💰 *{matrix[0]['retailer']}* saves you *${basket_saving:.2f}* on this basket")
                out.append("")

        by_saving = sorted(found, key=lambda x: x.get("potential_savings", 0), reverse=True)
        highlights = [x for x in by_saving if x.get("potential_savings", 0) > 0.20][:3]
        if highlights:
            out.append("🔥 *Biggest savings:*")
            out.extend(
                f" • *{entry['product_name']}* "
                f"${entry['best_price']:.2f} @ {entry['offers'][0]['retailer']} "
                f"(save ${entry['potential_savings']:.2f})"
                for entry in highlights
            )
            out.append("")

        substitutes = [entry for entry in found if entry.get("is_substitute")]
        if substitutes:
            out.append("⚠️ *Nearest matches:*")
            out.extend(
                f" _{entry['query']}_ → {entry['product_name']} (${entry['best_price']:.2f})"
                for entry in substitutes
            )
            out.append("")

    if missing:
        out.append(f"❓ Not found: {', '.join(missing)}")
        out.append("_Try a simpler name or check in-store._")
    if restricted:
        out.append(format_restricted_items_note(restricted))

    out.append("")
    out.append("*For comprehensive basket comparisons and retailer Pre-Orders visit*")
    out.append("www.pricelyst.co.zw")
    return "\n".join(out)
| # ───────────────────────────────────────────── | |
| # 8. Budget & Meal Planning Engine (Upgrade 2) | |
| # ───────────────────────────────────────────── | |
def gemini_plan_budget(transcript: str, budget: float, context: str,
                       catalogue_snapshot: str, language: str = "English") -> str:
    """Ask Gemini for a budget / meal / event plan grounded in catalogue prices.

    Handles fixed-amount shopping, monthly household budgeting, party/event
    planning and meal prep; the request type is left to the model to infer
    from *transcript*.

    Args:
        transcript: the user's request, quoted verbatim in the prompt.
        budget: detected budget in USD; a missing or non-numeric value is
            reported to the model as "not specified" instead of raising.
        context: extra context clues extracted upstream.
        catalogue_snapshot: text dump of catalogue products and prices.
        language: language the model is asked to answer in.

    Returns:
        The sanitised model reply, or a fixed fallback message when the
        Gemini client is unavailable or the call fails.
    """
    if not _gemini_client:
        return "Budget planning is temporarily unavailable. Please try again."

    # Formatting None / junk with ":.2f" would raise before the try block.
    try:
        budget_label = f"${float(budget):.2f} USD"
    except (TypeError, ValueError):
        budget_label = "not specified"

    # Derive the ZESA examples from the tariff table used by the calculator so
    # the prompt can never disagree with calculate_zesa_units.
    zesa_10 = calculate_zesa_units(10)["est_units_kwh"]
    zesa_20 = calculate_zesa_units(20)["est_units_kwh"]

    prompt = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's AI Shopping & Budget Advisor.
You have access to REAL current prices from Zimbabwe's top supermarkets.
USER REQUEST: "{transcript}"
DETECTED BUDGET: {budget_label}
CONTEXT CLUES: {context}
PRICELYST CATALOGUE — FULL PRODUCT INDEX (use these first, estimate only if absent):
{catalogue_snapshot}
CRITICAL: Walk the entire catalogue above before making any price estimate.
For every item you include, check if it exists in the catalogue first.
Only use "(est. ~$X)" for items with ZERO catalogue matches.
Do not price alcohol or age-restricted products in WhatsApp; for those items say: please visit https://pricelyst.co.zw for more information.
Keep the response concise and practical.
ZIMBABWE CONTEXT:
- Average family of 4 monthly food spend: $150-$250
- ZESA: $10 ≈ {zesa_10:.0f} units, $20 ≈ {zesa_20:.0f} units (2026 tariff)
- ZESA: bands $0.08-$0.26/unit depending on consumption level
YOUR TASK — respond based on what the user asked:
1. FIXED BUDGET SHOPPING ("I have $50"):
- Build an optimal shopping list that maximises value within budget
- Show exact items, quantities, store, and prices
- Show running total as you add items
- Show remaining balance after each category
- Recommend the single best store for this basket
2. MONTHLY HOUSEHOLD BUDGET ("$2000 for a month, family of 4"):
- Break down the budget into categories: Food, Household, Transport, ZESA, Contingency
- For food: build a full monthly shopping list with real prices
- Calculate cost per person per day
- Show weekly vs monthly shopping rhythm recommendation
- Flag where they can save most
3. EVENT/PARTY PLANNING ("dinner for 10", "party for 100 people"):
- Scale ingredients appropriately for the headcount
- Build a full shopping list with quantities scaled to guests
- Show per-head cost
- Recommend best store combination for the event
- Include drinks, condiments, serviettes etc. from catalogue
4. MEAL PREP PLANNING ("weekly meal prep", "5 days of lunches"):
- Suggest 5-7 practical Zimbabwean meals using real catalogue products
- Show shopping list with prices for the full week
- Calculate cost per meal and cost per serving
- Give simple prep tips
FORMAT RULES (WhatsApp — no markdown headers ##):
- Keep it concise: maximum 6 short sections, avoid long explanations
- Use *bold* for totals, store names, key numbers
- Use emojis naturally 💰 🛒 📊 ✅ 🍽️
- Structure with clear sections separated by blank lines
- Be specific with numbers — show your working
- Always end with: total cost, per-person cost, and best store recommendation
- If budget is tight, flag it honestly and suggest stretching strategies
Respond in {language}. Be a genuinely useful financial advisor, not just a list generator.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=prompt)
        return sanitise_response(resp.text)
    except Exception as e:
        logger.error("Budget planner error: %s", e)
        return "I had trouble calculating your budget plan. Please try again!"
def gemini_generate_recipe(
        meal_name: str,
        found_items: List[Dict],
        servings: int = 4,
        language: str = "English") -> str:
    """
    Produce a WhatsApp-formatted recipe for ``meal_name`` via Gemini.

    Each entry of ``found_items`` is listed in the prompt as
    "name ($best_price @ first offer's retailer)"; when the list is empty the
    model is told to estimate every price. The reply is cleaned with
    ``sanitise_response``. A fixed message is returned when the Gemini client
    is missing or the call raises.
    """
    if not _gemini_client:
        return "Recipe generation is temporarily unavailable."

    if found_items:
        catalogue_lines = [
            f"- {entry['product_name']} (${entry['best_price']:.2f} @ {entry['offers'][0]['retailer']})"
            for entry in found_items
        ]
        products_str = "\n".join(catalogue_lines)
    else:
        products_str = "No catalogue matches — use estimated prices for all ingredients."

    prompt = f"""
You are Pricelyst AI, Zimbabwe\'s smart shopping and meal planning assistant.
Generate a complete, practical recipe for: *{meal_name}*
Servings: {servings} people
PRICELYST CATALOGUE PRODUCTS (walk these FIRST — real prices, real stores):
{products_str}
OUTPUT FORMAT (WhatsApp plain text — follow this order exactly):
*🍽️ {meal_name}* | Serves {servings}
*🛒 Ingredients & Prices:*
• [Ingredient] [qty] — *$[price]* @ [store]
(For each item: use catalogue price if found, else "(est. ~$X)")
*💰 Total:* $X.XX | Per serving: $X.XX
Best store: [store name covering most items]
*⏱️ Steps:*
1. [step]
2. [step]
(Max 8 steps, practical for a Zimbabwean home kitchen)
*💡 Tips:* [1 storage or substitution tip using catalogue alternatives]
*For more recipes & prices: pricelyst.co.zw*
RULES:
- Walk the ENTIRE catalogue before estimating any price
- Only mark "(est. ~$X)" if genuinely absent from catalogue
- Realistic Zimbabwean home quantities, not restaurant portions
- *bold* prices and totals only, plain text elsewhere
- NO image URLs, NO external links except pricelyst.co.zw
Respond in {language}.
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=prompt)
        return sanitise_response(reply.text)
    except Exception as err:
        logger.error(f"Recipe generation error: {err}")
        return "I had trouble generating the recipe. Please try again!"
def gemini_analyze_meal_image_to_recipe(
        image_bytes: bytes,
        caption: str,
        found_items: List[Dict],
        language: str = "English",
        mime_type: str = "image/jpeg") -> str:
    """
    Full pipeline: image of a meal → identify dish → recipe → priced shopping list.

    Args:
        image_bytes: Raw image payload sent to Gemini alongside the prompt.
        caption: User caption, embedded verbatim in the prompt.
        found_items: Catalogue matches; each is listed as
            "name ($best_price @ first offer's retailer)".
        language: Language the model is asked to respond in.
        mime_type: MIME type declared for ``image_bytes``. Defaults to
            "image/jpeg" (the previously hard-coded value), so existing
            callers are unaffected; pass e.g. "image/png" for other uploads.

    Returns:
        The sanitised model reply, or a fixed apology string when the client
        is missing or anything fails while building or sending the request.
    """
    if not _gemini_client:
        return "Image recipe analysis is temporarily unavailable."
    try:
        # Built inside the try so a malformed catalogue entry (missing
        # price/offers) degrades to the apology instead of raising.
        products_str = "\n".join(
            f"- {item['product_name']} (${item['best_price']:.2f} @ {item['offers'][0]['retailer']})"
            for item in found_items
        ) if found_items else "No catalogue matches found — use estimated prices."
        PROMPT = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's AI Meal & Shopping Advisor.
A user sent a photo of a meal/dish. Your job: identify it, then immediately provide
a full recipe with a Zimbabwe-priced shopping list.
USER CAPTION: "{caption}"
AVAILABLE CATALOGUE PRODUCTS:
{products_str}
DO THIS IN ONE RESPONSE:
1. 🍽️ *Identify the dish* — name it confidently
2. 📝 *Full recipe* — ingredients with quantities for 4 servings
3. 🛒 *Shopping list* — match ingredients to catalogue products with prices
4. 💰 *Total cost* — exact calculation, cost per serving, best store
5. ⏱️ *Quick cook guide* — 5-6 simple steps
Keep it warm, practical, Zimbabwean. Format for WhatsApp (*bold*, emojis).
Respond in {language}.
"""
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
        )
        return sanitise_response(resp.text)
    except Exception as e:
        logger.error(f"Meal image recipe error: {e}")
        return "I had trouble analyzing that meal photo. Please try again!"
def build_catalogue_snapshot() -> str:
    """
    Build a category-organised price snapshot of the whole index for Gemini prompts.

    Every offer row (``is_offer == True``) from ``get_market_index()`` is
    reduced to one line per (product, category, brand) showing the lowest
    price and the retailer that offers that price. No row limit is applied.
    Categories are emitted alphabetically; within a category products are
    ordered by views (descending) then price (ascending).

    Returns:
        The snapshot text, or a short explanatory string when the index is
        empty, holds no offers, or building the snapshot fails.
    """
    df = get_market_index()
    if df.empty:
        return "Catalogue unavailable."
    try:
        offers = df[df["is_offer"] == True].copy()
        if offers.empty:
            return "No priced products available."

        # groupby drops rows whose key is NaN; normalise a missing brand to ""
        # so brandless products still appear in the snapshot.
        offers["brand"] = offers["brand"].astype(object).fillna("")

        # Sort by price first so the "first" retailer in each group is the one
        # actually offering the minimum price (previously it was whichever
        # retailer happened to come first in the index).
        offers = offers.sort_values("price", kind="mergesort")

        best_prices = (
            offers.groupby(["product_name", "category", "brand"])
            .agg(min_price=("price", "min"),
                 retailer=("retailer", "first"),
                 views=("views", "max"))
            .reset_index()
        )

        lines = []
        total_products = 0
        # All categories, sorted alphabetically for consistency
        categories = sorted(best_prices["category"].unique())
        for cat in categories:
            cat_df = (best_prices[best_prices["category"] == cat]
                      .sort_values(["views", "min_price"],
                                   ascending=[False, True]))
            if cat_df.empty:
                continue
            lines.append(f"\n[{cat}]")
            for row in cat_df.itertuples(index=False):
                brand_str = f" ({row.brand})" if row.brand else ""
                lines.append(
                    f" {row.product_name}{brand_str}: "
                    f"${row.min_price:.2f} @ {row.retailer}"
                )
                total_products += 1
        logger.info(
            f"Catalogue snapshot: {total_products} products "
            f"across {len(categories)} categories (no limits applied)"
        )
        return "\n".join(lines)
    except Exception as e:
        logger.error(f"Catalogue snapshot error: {e}")
        return "Catalogue snapshot unavailable."
def get_products_by_category(category_keyword: str, limit: int = 20) -> List[Dict]:
    """
    Return the cheapest products whose category or search vector contains a keyword.

    Args:
        category_keyword: Free-text keyword; normalised with ``_norm`` and
            matched as a literal substring (not a regular expression).
        limit: Maximum number of products returned (cheapest first).

    Returns:
        Up to ``limit`` records with keys ``product_name``, ``best_price``,
        ``retailer``, ``brand`` and ``category``. ``retailer``/``brand``/
        ``category`` come from the cheapest offer of each product. An empty
        list is returned when nothing matches or on any error.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        kw = _norm(category_keyword)
        offers = df[df["is_offer"] == True]
        # regex=False: the keyword is user-derived text, so characters such as
        # "(" or "+" must not be interpreted as a pattern.
        in_category = offers["category"].str.lower().str.contains(kw, na=False, regex=False)
        in_search = offers["search_vector"].str.contains(kw, na=False, regex=False)
        matches = offers[in_category | in_search]
        if matches.empty:
            return []
        # Sort by price before grouping so that "first" picks the retailer
        # (and brand/category) of the cheapest offer, matching best_price.
        best = (matches.sort_values("price", kind="mergesort")
                .groupby("product_name")
                .agg(best_price=("price", "min"),
                     retailer=("retailer", "first"),
                     brand=("brand", "first"),
                     category=("category", "first"))
                .reset_index()
                .sort_values("best_price"))
        return best.head(limit).to_dict("records")
    except Exception as e:
        logger.error(f"get_products_by_category error: {e}")
        return []
def extract_budget_from_text(transcript: str) -> Tuple[Optional[float], str]:
    """
    Pull a dollar amount and a planning context from free text.

    The amount is taken from "$N" first, then from "N dollars"/"N usd";
    thousands separators are accepted. The context is chosen by keyword, in
    this priority order: monthly → event → weekly meal prep → recipe →
    fixed budget (an amount was found) → general planning.

    Returns:
        ``(amount, context)`` where ``amount`` is ``None`` when no dollar
        amount is present.

    Examples:
        "I have $50"            → (50.0, "fixed_budget")
        "$2000 for the month"   → (2000.0, "monthly_household")
        "party for 100 people"  → (None, "event_100_people")
        "dinner for 5"          → (None, "event_5_people")
    """
    # Find dollar amount
    amount_match = re.search(r'\$\s*([\d,]+(?:\.\d{1,2})?)', transcript)
    if not amount_match:
        amount_match = re.search(r'([\d,]+(?:\.\d{1,2})?)\s*(?:dollars?|usd)', transcript, re.I)
    amount = None
    if amount_match:
        try:
            amount = float(amount_match.group(1).replace(",", ""))
        except ValueError:
            # e.g. "$," — digits-or-commas matched but nothing numeric remains
            amount = None

    # Determine context
    t = transcript.lower()

    # "<meal> for N" is a headcount even without the word "people"; the
    # lookahead keeps "dinner for 5 days" / "lunch for 5 dollars" out.
    meal_headcount = re.search(
        r'\b(?:dinner|lunch|breakfast|supper|braai)\s+for\s+(\d+)\b'
        r'(?!\s*(?:days?|weeks?|nights?|dollars?|usd))',
        t,
    )
    event_words = ("party", "event", "wedding", "function", "people", "guests", "pax")

    if "month" in t:  # also covers "monthly" and "per month"
        context = "monthly_household"
    elif meal_headcount or any(w in t for w in event_words):
        people_match = (re.search(r'(\d+)\s*(?:people|guests|pax|persons?)', t)
                        or meal_headcount)
        guests = people_match.group(1) if people_match else "unknown"
        context = f"event_{guests}_people"
    elif any(w in t for w in ("week", "weekly", "meal prep", "prep")):
        context = "weekly_meal_prep"
    elif any(w in t for w in ("recipe", "cook", "make", "prepare", "how to")):
        context = "recipe_request"
    elif amount:
        context = "fixed_budget"
    else:
        context = "general_planning"
    return amount, context
| # ───────────────────────────────────────────── | |
| # 8b. Conversation Context Engine | |
| # ───────────────────────────────────────────── | |
def apply_context_mutation(
        current_basket: List[str],
        current_found: List[Dict],
        mutation: Dict) -> Tuple[List[str], str]:
    """
    Apply a context mutation to an active basket without modifying the inputs.

    Args:
        current_basket: The user's item queries as originally entered.
        current_found: Resolved products for the basket; entries are read for
            ``product_name`` and ``best_price``.
        mutation: Dict with an ``action`` and, for add/remove, ``items``.

    Actions understood:
        add_items             : append ``items`` to the basket
        what_if_add           : same result as add_items; nothing is stored
                                here, so persisting is left to the caller
        remove_items          : drop entries fuzzily matching ``items``
        remove_most_expensive : drop the entry matching the priciest product
        remove_cheapest       : drop the entry matching the cheapest product

    Returns:
        ``(new_item_list, description_of_change)``. Unknown actions return
        the basket unchanged with "No change".
    """
    action = mutation.get("action", "")
    requested = mutation.get("items") or []

    if action in ("add_items", "what_if_add"):
        return list(current_basket) + list(requested), f"Added: {', '.join(requested)}"

    if action == "remove_items":
        # An empty normalised string is a substring of everything, so blank
        # targets/entries are excluded from matching.
        targets = [t for t in (_norm(x) for x in requested) if t]
        removed, new_basket = [], []
        for orig in current_basket:
            orig_norm = _norm(orig)
            hit = bool(orig_norm) and any(
                t in orig_norm or orig_norm in t for t in targets
            )
            (removed if hit else new_basket).append(orig)
        desc = f"Removed: {', '.join(removed)}" if removed else "Item not found in current basket"
        return new_basket, desc

    if action in ("remove_most_expensive", "remove_cheapest"):
        if not current_found:
            return current_basket, "No active basket to modify"
        pick = max if action == "remove_most_expensive" else min
        label = "most expensive" if action == "remove_most_expensive" else "cheapest"
        chosen = pick(current_found, key=lambda x: x.get("best_price", 0))
        new_basket = _drop_entries_matching_product(current_basket, chosen["product_name"])
        desc = f"Removed {label}: {chosen['product_name']} (${chosen.get('best_price', 0):.2f})"
        return new_basket, desc

    return current_basket, "No change"


def _drop_entries_matching_product(basket: List[str], product_name: str) -> List[str]:
    """Return ``basket`` minus entries whose normalised text overlaps ``product_name``."""
    needle = _norm(product_name)
    if not needle:
        return list(basket)
    kept = []
    for entry in basket:
        entry_norm = _norm(entry)
        if entry_norm and (entry_norm in needle or needle in entry_norm):
            continue
        kept.append(entry)
    return kept
def gemini_resolve_context_intent(message: str, active_basket: List[str],
                                  active_found: List[Dict]) -> Dict[str, Any]:
    """
    Decide whether a message modifies the active basket or starts a fresh query.

    Args:
        message: The new user message, embedded verbatim in the prompt.
        active_basket: Current basket queries; when empty the function
            short-circuits to a fresh query.
        active_found: Resolved products (``product_name``/``best_price``)
            shown to the model as context; may be ``None``.

    Returns:
        A dict with the keys ``is_context`` (bool), ``action`` (one of
        add_items, remove_items, remove_most_expensive, remove_cheapest,
        what_if_add, show_total, fresh_query) and ``items`` (list), plus
        ``explanation`` when the model supplies one. A fresh-query dict is
        returned when the client is missing, the basket is empty, the call
        fails, or the model's JSON is not an object.
    """
    fallback = {"is_context": False, "action": "fresh_query", "items": []}
    if not _gemini_client or not active_basket:
        return fallback
    try:
        # Prompt assembly is inside the try so a malformed found-item entry
        # degrades to a fresh query instead of raising to the caller.
        basket_str = ", ".join(active_basket)
        found_str = "\n".join(
            f"- {f['product_name']}: ${f['best_price']:.2f}"
            for f in (active_found or [])
        )
        PROMPT = f"""
You are a shopping assistant context resolver.
The user has an ACTIVE BASKET: [{basket_str}]
Resolved products and prices:
{found_str}
New user message: "{message}"
Decide: is this message a MODIFICATION of the active basket, or a FRESH NEW QUERY?
Context modifications include:
- "remove coffee" / "take out the bread" / "remove the most expensive" → remove_items / remove_most_expensive
- "add milk" / "what if I add eggs" / "include sugar" → add_items / what_if_add
- "remove the cheapest" / "drop the cheapest item" → remove_cheapest
- "what's the total now" / "show total" → show_total
- "let's see" / "ok" / "sounds good" → show_total (treat as confirmation request)
Fresh new queries include:
- Completely new products unrelated to the basket ("price of diapers")
- New categories / topics ("deals today", "ZESA units")
- Greetings or topic changes
Return STRICT JSON:
{{
"is_context": boolean,
"action": "add_items|remove_items|remove_most_expensive|remove_cheapest|what_if_add|show_total|fresh_query",
"items": ["item to add or remove"],
"explanation": "one line reason"
}}
"""
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, fallback)
        if not isinstance(result, dict):
            # Valid JSON but not an object (e.g. a list) — callers use .get().
            logger.error(f"Context resolution error: unexpected JSON type {type(result).__name__}")
            return fallback
        # Guarantee the keys callers read, without overriding model output.
        for key, default in fallback.items():
            result.setdefault(key, default)
        logger.info(f"Context resolution: {result}")
        return result
    except Exception as e:
        logger.error(f"Context resolution error: {e}")
        return fallback
def format_context_result(
        action: str,
        change_desc: str,
        analyst: Dict,
        what_if: bool = False) -> str:
    """
    Render the outcome of a basket mutation as WhatsApp text.

    The heading depends on the action: a plain basket view for "show_total"
    or an empty description, a what-if banner when ``what_if`` is set, and a
    "basket updated" banner otherwise. It is followed by the found items,
    the best store total with up to two alternatives, any items not found,
    and the closing call-to-action.
    """
    if action == "show_total" or not change_desc:
        heading = "🛒 *Current Basket*\n"
    elif what_if:
        heading = f"🔮 *What if scenario*\n_{change_desc}_\n"
    else:
        heading = f"✅ *Basket updated*\n_{change_desc}_\n"
    out = [heading]

    found_items = analyst.get("found_items", [])
    store_ranking = analyst.get("market_matrix", [])
    not_found = analyst.get("global_missing", [])

    if found_items:
        for entry in found_items:
            tag = " _(nearest match)_" if entry.get("is_substitute") else ""
            out.append(f"• *{entry['product_name']}*{tag} — ${entry['best_price']:.2f}")
        out.append("")

    if store_ranking:
        top = store_ranking[0]
        out.append(f"💰 *Best basket total: ${top['total_price']:.2f}* @ {top['retailer']}")
        saved = top.get("basket_savings", 0)
        if saved > 0.10:
            out.append(f" Saves *${saved:.2f}* vs most expensive option")
        out.append("")
        # Show at most two runner-up stores.
        runners_up = store_ranking[1:3]
        if len(store_ranking) > 1:
            out.append("Other stores:")
            out.extend(
                f" {store['retailer']}: ${store['total_price']:.2f}"
                for store in runners_up
            )

    if not_found:
        out.append(f"\n⚠️ Not found: {', '.join(not_found)}")

    out += [
        "",
        "*For comprehensive basket comparisons and retailer Pre-Orders visit*",
        "www.pricelyst.co.zw",
    ]
    return "\n".join(out)
| # ───────────────────────────────────────────── | |
| # 9. Gemini Helpers | |
| # ───────────────────────────────────────────── | |
def _safe_json_loads(s: str, fallback: Any) -> Any:
    """
    Parse JSON from a model reply, tolerating Markdown code fences.

    A "```json … ```" fence is unwrapped first; otherwise the content after
    the first generic "```" fence is used. Anything that cannot be parsed
    (including a ``None`` or other non-string input) is logged and yields
    ``fallback``.
    """
    if not isinstance(s, str):
        # Previously None raised a second TypeError inside the error handler
        # (s[:300]) and the fallback was never returned.
        logger.error(f"JSON parse error: expected str, got {type(s).__name__}")
        return fallback
    try:
        cleaned = s
        if "```json" in cleaned:
            cleaned = cleaned.split("```json")[1].split("```")[0]
        elif "```" in cleaned:
            cleaned = cleaned.split("```")[1]
        return json.loads(cleaned.strip())
    except Exception as e:
        logger.error(f"JSON parse error: {e} | raw: {s[:300]}")
        return fallback
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """
    Classify a user transcript into one of the bot's intents via Gemini.

    The model is asked for strict JSON following the schema in the prompt
    (intent, items, budget_amount, headcount, meal_name, language, …).

    Returns:
        The parsed JSON object, or a CASUAL_CHAT default dict when the client
        is missing, the call fails, or the reply is not a JSON object.
    """
    default = {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []}
    if not _gemini_client:
        return default
    # NOTE: the FEEDBACK bullet used to sit in the middle of the
    # RECIPE_REQUEST description, attaching the recipe examples to FEEDBACK.
    PROMPT = """
Analyze the transcript below and return STRICT JSON.
Intents:
- CASUAL_CHAT : Greetings, "hi", off-topic
- SHOPPING_BASKET : Searching for prices / cheapest X
- UTILITY_CALC : Electricity / ZESA / fuel cost questions
- STORE_DECISION : "Which store is cheapest?", "Where should I shop?"
- EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5"
- CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet
- DEALS_EXPLORE : "Today's deals", "promotions", "what's on special"
- DISCOVER : "What products do you have?", "show me your categories"
- BUDGET_PLANNER : User mentions a budget amount + shopping/planning goal.
e.g. "I have $50", "$2000 for the month", "party for 100 people",
"what can I buy with $30", "monthly groceries for family of 4"
- RECIPE_REQUEST : User wants a recipe, meal idea, how to cook something,
meal prep plan, or sends a meal image.
e.g. "recipe for sadza", "how do I cook bream", "weekly meal plan"
- FEEDBACK : User wants to give feedback, suggest a product, report a missing item,
report a wrong price, or make a general suggestion about the service.
Rules:
- Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal').
- If only a concept is given (e.g. "plan a braai"), set is_event_planning=true, items=[].
- Detect user language accurately (Shona, Ndebele, English).
- budget_amount: extract numeric USD amount if mentioned (e.g. 50 from "$50").
- headcount: extract number of people if mentioned (e.g. 100 from "party for 100").
- meal_name: extract dish name if recipe is requested.
- store_preference: store name if explicitly mentioned.
- utility_amount: numeric value for ZESA queries.
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"budget_amount": number | null,
"headcount": number | null,
"meal_name": "string | null",
"store_preference": "string | null",
"is_event_planning": boolean,
"language": "string",
"catalogue_scope": "string | null"
}
Transcript: """ + transcript
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, default)
        # Callers treat the result as a dict; reject lists/scalars.
        return result if isinstance(result, dict) else default
    except Exception as e:
        logger.error(f"Intent detect error: {e}")
        return default
def gemini_explode_concept(transcript: str) -> List[str]:
    """
    Convert an event/meal concept into a concrete grocery list via Gemini.

    Returns:
        The non-empty string entries of the model's JSON list, stripped of
        surrounding whitespace. An empty list is returned when the client is
        missing, the call fails, or the reply is not a JSON list.
    """
    if not _gemini_client:
        return []
    PROMPT = f"""
User wants to plan: "{transcript}"
Generate 10-15 essential Zimbabwean grocery items for this.
Use English terms for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef').
Return ONLY a JSON list of strings.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL, contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        items = _safe_json_loads(resp.text, [])
        if not isinstance(items, list):
            # e.g. {"items": [...]} — honour the declared List[str] contract.
            return []
        return [entry.strip() for entry in items
                if isinstance(entry, str) and entry.strip()]
    except Exception as e:
        logger.error(f"Concept explode error: {e}")
        return []
def gemini_analyze_image(image_bytes: bytes, caption: str = "",
                         mime_type: str = "image/jpeg") -> Dict[str, Any]:
    """
    Analyse a WhatsApp image — grocery list, product, or meal dish.

    Args:
        image_bytes: Raw image payload sent to Gemini.
        caption: Optional caption, embedded verbatim in the prompt.
        mime_type: MIME type declared for ``image_bytes``; defaults to the
            previously hard-coded "image/jpeg".

    Returns:
        A dict with ``type`` (SHOPPING_LIST, SINGLE_PRODUCT, MEAL_DISH or
        IRRELEVANT), ``items`` and ``description``. An IRRELEVANT default is
        returned when the client is missing, the call fails, or the reply is
        not a JSON object.
    """
    default = {"type": "IRRELEVANT", "items": [], "description": ""}
    if not _gemini_client:
        return default
    PROMPT = f"""
Analyze this image. Context caption: "{caption}"
Classify:
1. SHOPPING_LIST → Extract each item (translate to English).
2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml").
3. MEAL_DISH → Identify dish name + core ingredients.
4. IRRELEVANT → Not shopping related.
Return STRICT JSON:
{{
"type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT",
"items": ["item1", "item2"],
"description": "short description"
}}
"""
    try:
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, default)
        # Callers treat the result as a dict; reject lists/scalars.
        return result if isinstance(result, dict) else default
    except Exception as e:
        logger.error(f"Vision error: {e}")
        return default
# Any http(s) URL whose host does not start with (www.)pricelyst.co.zw.
_NON_PRICELYST_URL_RE = re.compile(r'https?://(?!(?:www\.)?pricelyst\.co\.zw)\S+')


def sanitise_response(text: str) -> str:
    """
    Post-process a Gemini response before it is sent to WhatsApp.

    - Drops lines that consist solely of a non-pricelyst URL (image links).
      A line holding only a pricelyst.co.zw / www.pricelyst.co.zw link is
      kept, since that is the call-to-action the prompts ask for.
    - Strips non-pricelyst URLs (including api.pricelyst.co.zw/...) that
      appear inline; the surrounding text is left as-is.
    - Collapses runs of blank lines to at most two and trims the result.
    """
    clean = []
    for line in text.split("\n"):
        # Drop lines that are just a foreign URL (image references, API links)
        if _NON_PRICELYST_URL_RE.fullmatch(line.strip()):
            continue
        # Remove foreign URLs embedded in a sentence
        line = _NON_PRICELYST_URL_RE.sub('', line)
        # Whitespace-only leftovers are normalised to a true blank line
        clean.append(line if line.strip() else "")

    # Collapse runs of more than 2 blank lines
    result = []
    blank_count = 0
    for line in clean:
        if line == "":
            blank_count += 1
            if blank_count <= 2:
                result.append(line)
        else:
            blank_count = 0
            result.append(line)
    return "\n".join(result).strip()
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict,
                         chat_history: str = "", language: str = "English") -> str:
    """
    Compose the WhatsApp reply for the standard (non-budget, non-recipe) intents.

    The prompt context is assembled from, in order: recent chat history (if
    any), ZESA figures from ``calculate_zesa_units`` and ``ZIM_CONTEXT``, and
    the analyst data serialised as JSON (if any). The model reply is cleaned
    with ``sanitise_response``; fixed fallback strings are returned when the
    client is missing or the call raises.
    """
    if not _gemini_client:
        return "Hi! I'm Pricelyst AI from Pricelyst. Having a bit of trouble — please try again shortly."

    sections = []
    if chat_history:
        sections.append(f"RECENT CHAT:\n{chat_history}\n\n")

    units_for_10 = calculate_zesa_units(10)
    units_for_20 = calculate_zesa_units(20)
    sections.append(
        f"ZIMBABWE CONTEXT (ZESA 2026 tariffs, incl 6% levy):\n"
        f" $10 = {units_for_10['est_units_kwh']} units | "
        f"$20 = {units_for_20['est_units_kwh']} units\n"
        f" Bands: 0-50u=$0.08/u, 51-100=$0.09/u, 101-200=$0.16/u, "
        f"201-300=$0.23/u, 301-400=$0.25/u, 401+=$0.26/u\n"
        f" Note: {ZIM_CONTEXT['zesa_note']}\n"
    )

    if analyst_data:
        sections.append(f"\nANALYST DATA:\n{json.dumps(analyst_data, default=str)}\n")

    context_str = "".join(sections)
    intent_label = intent.get('intent', 'CASUAL_CHAT')

    prompt = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒.
Mission: shortest path to value + complete price transparency for Zimbabwean shoppers.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent_label}
CONTEXT:
{context_str}
FORMATTING (WhatsApp plain text — NO ## headers):
- *bold* for store names, prices, key figures
- Emojis naturally (✅ 🛒 💰 📍 ⚠️ 🔥 🍽️ 📸 ⚡)
- Blank lines between sections — breathe, don't cram
- Mobile-first: short lines, scannable
- NEVER include image URLs, product image links, or any URLs except pricelyst.co.zw
- NO product images — WhatsApp does not render inline images from URLs in chat
LOGIC:
1. BASKET (≤3 items): The formatter already structured it — just add a warm intro line.
2. BASKET (>3 items): Highlight the best store deal and 2-3 standout savings.
3. SINGLE ITEM: Best price first, then 2-3 alternatives. State exact savings.
4. ZESA: Show units calculation clearly with tier breakdown.
5. CASUAL/GREETING: You are a smart shopping advisor, not a generic chatbot.
Do NOT just say "Hi how can I help". Instead, acknowledge warmly AND show
one specific insight from the analyst data if available (a deal, a fact,
a tip). End with a concrete invitation to search.
6. DEALS_EXPLORE: List 5-8 deals with price and store. Make it feel like
a real market bulletin — exciting, specific, local.
7. EVENT_PLANNING: Acknowledge warmly, then present the basket clearly.
8. CATALOGUE_REQUEST: Confirm PDF is being prepared.
9. DISCOVER: List available categories with examples of what to search in each.
10. OFF_TOPIC: Gently steer back to shopping — you are a shopping advisor,
not a general assistant.
Always end with a specific, useful follow-up question or CTA — not a generic one.
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=prompt)
        return sanitise_response(reply.text)
    except Exception as err:
        logger.error(f"Chat response error: {err}")
        return "I checked the prices but I'm having trouble displaying them right now. Please try again!"
def gemini_translate(text: str, target_lang: str) -> str:
    """
    Translate an English reply into the user's language via Gemini.

    No call is made (and ``text`` is returned unchanged) when the client is
    missing, ``target_lang`` is empty, or the target is English.

    Returns:
        The stripped translation. The original ``text`` is returned when the
        call fails or the model returns nothing, so the user never receives
        an empty message.
    """
    if not _gemini_client or not target_lang or target_lang.lower() == "english":
        return text
    PROMPT = f"""
Translate this WhatsApp shopping assistant reply from English to {target_lang}.
Rules:
- Keep prices ($X.XX), store names, product names UNCHANGED.
- Keep WhatsApp formatting (*bold*, emojis) UNCHANGED.
- Natural, conversational tone.
Text: "{text}"
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        translated = (resp.text or "").strip()
        # An empty/None reply is treated like a failed translation.
        return translated or text
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
| # ───────────────────────────────────────────── | |
| # 10. Catalogue PDF Generator | |
| # ───────────────────────────────────────────── | |
def _build_pdf_header(story, styles, scope_label: str, title: str) -> None:
    """
    Append the shared Pricelyst branded header to ``story``.

    With a usable logo the header is a two-column table (logo beside the
    wordmark and tagline); otherwise a centred text-only wordmark and tagline
    are appended. ``scope_label`` and ``title`` are accepted but not rendered
    by this function.
    """
    from reportlab.lib import colors
    from reportlab.lib.enums import TA_CENTER
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import (Paragraph, Spacer, HRFlowable,
                                    Table as RLTable, TableStyle as RLTS)
    from reportlab.platypus import Image as RLImage

    brand_navy = colors.HexColor("#003087")
    brand_red = colors.HexColor("#E63329")
    muted_grey = colors.HexColor("#555555")

    tagline_style = ParagraphStyle("SubH", parent=styles["Normal"], fontSize=10,
                                   textColor=muted_grey, alignment=TA_CENTER, spaceAfter=2)
    wordmark_style = ParagraphStyle("WM", parent=styles["Title"], fontSize=24,
                                    textColor=brand_navy, fontName="Helvetica-Bold")

    # The logo path comes from the module-level helper; a falsy path or a
    # failed image load both lead to the text-only header.
    logo = None
    cached_logo = _get_logo_path()
    if cached_logo:
        try:
            logo = RLImage(cached_logo, width=3*cm, height=3*cm, kind="proportional")
        except Exception as err:
            logger.warning(f"Logo RLImage init failed: {err}")

    if not logo:
        # Text-only fallback — still branded with red tagline
        fallback_wordmark = ParagraphStyle(
            "WMFb", parent=styles["Title"], fontSize=28, textColor=brand_navy,
            fontName="Helvetica-Bold", alignment=TA_CENTER)
        story.append(Paragraph("PRICELYST.", fallback_wordmark))
        fallback_tagline = ParagraphStyle(
            "TagFb", parent=styles["Normal"], fontSize=11,
            textColor=brand_red, alignment=TA_CENTER)
        story.append(Paragraph("Zimbabwe\'s #1 Price Comparison", fallback_tagline))
        return

    text_cell = [
        Paragraph("PRICELYST.", wordmark_style),
        Paragraph("Zimbabwe\'s #1 Price Comparison", tagline_style),
    ]
    header_table = RLTable([[logo, text_cell]], colWidths=[3.5*cm, None])
    header_table.setStyle(RLTS([
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
        ("LEFTPADDING", (0, 0), (-1, -1), 0),
        ("RIGHTPADDING", (0, 0), (-1, -1), 0),
        ("BOTTOMPADDING", (0, 0), (-1, -1), 0),
    ]))
    story.append(header_table)
def _build_pdf_footer(story, styles) -> None:
    """
    Append the shared branded footer to ``story``.

    The footer is a red rule, a small grey disclaimer line and a bold navy
    call-to-action pointing at www.pricelyst.co.zw, separated by spacers.
    """
    from reportlab.lib import colors
    from reportlab.lib.enums import TA_CENTER
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import Spacer, HRFlowable, Paragraph

    normal = styles["Normal"]

    # Rule separating the body from the footer
    story.append(Spacer(1, 0.4*cm))
    rule = HRFlowable(width="100%", thickness=2,
                      color=colors.HexColor("#E63329"))
    story.append(rule)
    story.append(Spacer(1, 0.15*cm))

    # Disclaimer
    disclaimer_style = ParagraphStyle("Footer", parent=normal, fontSize=7,
                                      textColor=colors.HexColor("#888888"),
                                      alignment=TA_CENTER)
    story.append(Paragraph(
        "Prices subject to change without notice. Data sourced live from participating retailers.",
        disclaimer_style,
    ))
    story.append(Spacer(1, 0.08*cm))

    # Call-to-action
    cta_style = ParagraphStyle("FooterCTA", parent=normal, fontSize=8,
                               textColor=colors.HexColor("#003087"),
                               alignment=TA_CENTER, fontName="Helvetica-Bold")
    story.append(Paragraph(
        "<b>For comprehensive basket comparisons and retailer Pre-Orders visit "
        "<u>www.pricelyst.co.zw</u></b>",
        cta_style,
    ))
def generate_catalogue_pdf(title: str, items: List[Dict],
                           scope_label: str = "Price Comparison") -> Optional[str]:
    """Generate a price-comparison PDF and return its file path.

    Layout: Product, Category, Store, Price and Savings columns (no Brand
    column). Each offer of an item gets its own row; the product name,
    category and savings are printed only on the item's first row. Every
    offer whose price equals the item's ``best_price`` is marked with a
    check mark inside the Price column. Items without offers get a single
    "Not listed" row.

    Args:
        title: Passed through to ``_build_pdf_header``.
        items: Item dicts; the keys read here are ``offers`` (list of dicts
            with ``retailer`` and ``price``), ``best_price``,
            ``potential_savings``, ``is_substitute``, ``query``,
            ``product_name`` and ``category``. Missing or ``None`` values
            are tolerated.
        scope_label: Passed through to ``_build_pdf_header``.

    Returns:
        Path of the written PDF inside ``CATALOGUE_DIR``, or ``None`` if
        anything failed (the error is logged with a traceback).
    """
    try:
        from xml.sax.saxutils import escape

        from reportlab.lib import colors
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
        from reportlab.lib.units import cm
        from reportlab.platypus import (Paragraph, SimpleDocTemplate, Spacer,
                                        Table, TableStyle)

        filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.4*cm, leftMargin=1.4*cm,
                                topMargin=1.4*cm, bottomMargin=1.4*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")
        head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                    textColor=colors.white, fontName="Helvetica-Bold")
        cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=colors.HexColor("#222222"))
        best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=NAVY, fontName="Helvetica-Bold")
        money_style = ParagraphStyle("Money", parent=styles["Normal"], fontSize=8,
                                     leading=10, textColor=colors.HexColor("#222222"))

        def text_cell(value, style):
            # Paragraph interprets its text as XML-like markup, so catalogue
            # strings (which may contain "&" or "<") must be escaped or a
            # single odd product name can abort the whole document.
            return Paragraph(escape("" if value is None else str(value)), style)

        story = []
        _build_pdf_header(story, styles, scope_label, title)
        story.append(Spacer(1, 0.2*cm))

        # Product, Category, Store, Price, Savings
        col_widths = [6.4*cm, 3.0*cm, 3.3*cm, 2.4*cm, 2.3*cm]
        table_data = [[
            Paragraph("Product", head_style),
            Paragraph("Category", head_style),
            Paragraph("Store", head_style),
            Paragraph("Price", head_style),
            Paragraph("Savings", head_style),
        ]]

        for item in items or []:
            offers = item.get("offers") or []
            best_p = float(item.get("best_price", 0) or 0)
            savings = float(item.get("potential_savings", 0) or 0)
            sub_note = " ⚠ nearest" if item.get("is_substitute") else ""
            # Savings of five cents or less are not worth advertising.
            savings_text = f"${savings:.2f}" if savings > 0.05 else "—"

            if not offers:
                # `or` (rather than a .get default) also covers keys that
                # are present but None, which would break the concatenation.
                table_data.append([
                    text_cell(str(item.get("query") or "Unknown") + sub_note, cell_style),
                    Paragraph("—", cell_style),
                    Paragraph("Not listed", cell_style),
                    Paragraph("N/A", cell_style),
                    Paragraph("—", cell_style),
                ])
                continue

            product_label = str(item.get("product_name") or "") + sub_note
            category_label = item.get("category") or ""
            for idx_o, offer in enumerate(offers):
                price = float(offer.get("price", 0) or 0)
                is_best = abs(price - best_p) < 0.001
                price_label = f"✓ ${price:.2f}" if is_best else f"${price:.2f}"
                row_style = best_style if is_best else money_style
                first_row = idx_o == 0
                table_data.append([
                    text_cell(product_label if first_row else "", cell_style),
                    text_cell(category_label if first_row else "", cell_style),
                    text_cell(offer.get("retailer"), row_style),
                    Paragraph(price_label, row_style),
                    Paragraph(savings_text if first_row else "", cell_style),
                ])

        tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), NAVY),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
            ("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#CCCCCC")),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("LEFTPADDING", (0, 0), (-1, -1), 4),
            ("RIGHTPADDING", (0, 0), (-1, -1), 4),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        story.append(tbl)
        _build_pdf_footer(story, styles)
        doc.build(story)
        logger.info(f"Catalogue PDF generated: {filepath}")
        return filepath
    except Exception as e:
        # Best-effort: callers treat None as "no PDF available".
        logger.error(f"PDF generation failed: {e}", exc_info=True)
        return None
def generate_rich_pdf(title: str, scope_label: str, body_markdown: str,
                      items: Optional[List[Dict]] = None) -> Optional[str]:
    """Generate a branded PDF for budget plans, meal plans and recipes.

    The body text is stripped of WhatsApp emphasis markers (``*`` and ``_``)
    and rendered line by line: short lines ending in a colon or written in
    all caps become headings, lines starting with a bullet/emoji marker or
    a digit followed by ". " become bullets, lines mentioning totals or
    budgets are emphasised, and everything else is body text. Blank lines
    become small vertical gaps.

    Args:
        title: Passed through to ``_build_pdf_header``.
        scope_label: Passed through to ``_build_pdf_header``.
        body_markdown: Generated response text; ``None`` is treated as empty.
        items: Optional item dicts for an appended price table. The keys
            read here are ``offers`` (list of dicts with ``retailer`` and
            ``price``), ``best_price``, ``potential_savings``,
            ``product_name`` and ``query``. Missing or ``None`` values are
            tolerated.

    Returns:
        Path of the written PDF inside ``CATALOGUE_DIR``, or ``None`` if
        anything failed (the error is logged with a traceback).
    """
    try:
        from xml.sax.saxutils import escape

        from reportlab.lib import colors
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
        from reportlab.lib.units import cm
        from reportlab.platypus import (HRFlowable, Paragraph, SimpleDocTemplate,
                                        Spacer, Table, TableStyle)

        filename = f"pricelyst_rich_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.5*cm, leftMargin=1.5*cm,
                                topMargin=1.5*cm, bottomMargin=1.5*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")
        body_style = ParagraphStyle("Body", parent=styles["Normal"], fontSize=10,
                                    leading=14, spaceAfter=4,
                                    textColor=colors.HexColor("#222222"))
        h1_style = ParagraphStyle("H1", parent=styles["Heading2"], fontSize=13,
                                  textColor=NAVY, fontName="Helvetica-Bold",
                                  spaceAfter=6, spaceBefore=10)
        bullet_style = ParagraphStyle("Bullet", parent=styles["Normal"], fontSize=10,
                                      leading=14, leftIndent=12, spaceAfter=3,
                                      textColor=colors.HexColor("#333333"))
        bold_style = ParagraphStyle("Bold", parent=styles["Normal"], fontSize=10,
                                    leading=14, fontName="Helvetica-Bold",
                                    textColor=NAVY, spaceAfter=4)

        story = []
        _build_pdf_header(story, styles, scope_label, title)

        # ── Parse body_markdown into PDF paragraphs ────────────────────────
        bullet_prefixes = ("•", "-", "✅", "▪️", "🛒", "💰", "🔥", "⚡", "🍽️")
        bullet_strip_chars = "•-✅▪️🛒💰🔥⚡🍽️ "
        numbered_starts = tuple("123456789")
        total_keywords = ("total:", "grand total", "cost per", "budget:", "subtotal")

        # Strip WhatsApp formatting markers before splitting into lines.
        plain_text = (body_markdown or "").replace("*", "").replace("_", "")
        for raw_line in plain_text.split("\n"):
            stripped = raw_line.strip()
            if not stripped:
                story.append(Spacer(1, 0.15*cm))
                continue

            # Paragraph interprets its text as XML-like markup; the generated
            # text is free-form and may contain "&" or "<", so escape it
            # rather than let one character abort the whole document.
            safe = escape(stripped)

            # Section headers: short lines ending with a colon, or short all-caps lines.
            if (stripped.endswith(":") and len(stripped) < 60) or \
                    (stripped.isupper() and len(stripped) < 50):
                story.append(Paragraph(safe, h1_style))
            elif stripped.startswith(bullet_prefixes):
                clean = stripped.lstrip(bullet_strip_chars)
                story.append(Paragraph(f"• {escape(clean)}", bullet_style))
            elif stripped.startswith(numbered_starts) and ". " in stripped:
                story.append(Paragraph(safe, bullet_style))
            elif any(kw in stripped.lower() for kw in total_keywords):
                story.append(Paragraph(safe, bold_style))
            else:
                story.append(Paragraph(safe, body_style))

        # ── Optional price table ───────────────────────────────────────────
        if items:
            story.append(Spacer(1, 0.4*cm))
            story.append(HRFlowable(width="100%", thickness=1,
                                    color=colors.HexColor("#CCCCCC")))
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Shopping List — Price Comparison",
                                   ParagraphStyle("TblTitle", parent=styles["Heading3"],
                                                  fontSize=11, textColor=NAVY,
                                                  fontName="Helvetica-Bold")))
            story.append(Spacer(1, 0.15*cm))
            head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                        textColor=colors.white, fontName="Helvetica-Bold")
            cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                        textColor=colors.HexColor("#222222"))
            best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                        textColor=NAVY, fontName="Helvetica-Bold")

            def text_cell(value, style):
                return Paragraph(escape("" if value is None else str(value)), style)

            col_w = [5.5*cm, 3*cm, 3*cm, 2.5*cm, 3*cm]
            tdata = [[
                Paragraph("Ingredient", head_style),
                Paragraph("Best Price", head_style),
                Paragraph("Store", head_style),
                Paragraph("Save", head_style),
                Paragraph("Alternatives", head_style),
            ]]
            for item in items:
                offers = item.get("offers") or []
                # Coerce like generate_catalogue_pdf does: a None price must
                # not blow up the ":.2f" formatting below.
                best_p = float(item.get("best_price", 0) or 0)
                savings = float(item.get("potential_savings", 0) or 0)

                if len(offers) > 1:
                    # The first offer is shown as the best store; list up to
                    # two runners-up as alternatives.
                    alts = ", ".join(
                        f"{o.get('retailer') or ''} ${float(o.get('price', 0) or 0):.2f}"
                        for o in offers[1:3]
                    )
                else:
                    alts = "—"
                best_store = (offers[0].get("retailer") or "—") if offers else "—"
                # An unmatched item has no meaningful price; "$0.00" would
                # read as "free", so show N/A as the catalogue PDF does.
                price_text = f"${best_p:.2f}" if (offers or best_p > 0) else "N/A"
                label = item.get("product_name") or item.get("query") or ""

                tdata.append([
                    text_cell(label, cell_style),
                    Paragraph(price_text, best_style),
                    text_cell(best_store, cell_style),
                    Paragraph(f"${savings:.2f}" if savings > 0.05 else "—", cell_style),
                    text_cell(alts, cell_style),
                ])

            tbl = Table(tdata, colWidths=col_w, repeatRows=1)
            tbl.setStyle(TableStyle([
                ("BACKGROUND", (0, 0), (-1, 0), NAVY),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
                ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")),
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("LEFTPADDING", (0, 0), (-1, -1), 4),
                ("RIGHTPADDING", (0, 0), (-1, -1), 4),
                ("TOPPADDING", (0, 0), (-1, -1), 3),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
            ]))
            story.append(tbl)

        _build_pdf_footer(story, styles)
        doc.build(story)
        logger.info(f"Rich PDF generated: {filepath}")
        return filepath
    except Exception as e:
        # Best-effort: callers treat None as "no PDF available".
        logger.error(f"Rich PDF generation failed: {e}", exc_info=True)
        return None
| # ───────────────────────────────────────────── | |
| # 11. Firebase Profile Helpers | |
| # ───────────────────────────────────────────── | |
def get_or_create_profile(mobile: str) -> Dict[str, Any]:
    """Return the Firestore profile for ``mobile``, creating it if absent.

    A newly created profile holds only the mobile number and a UTC
    ``created_at`` ISO timestamp. Returns an empty dict when Firestore is
    not configured or the lookup fails (the failure is logged).
    """
    if not db:
        return {}
    try:
        profile_ref = db.collection("pricelyst_profiles").document(mobile)
        snapshot = profile_ref.get()
        if not snapshot.exists:
            # First contact from this number: persist a minimal profile.
            fresh_profile = {
                "mobile": mobile,
                "created_at": datetime.now(timezone.utc).isoformat(),
            }
            profile_ref.set(fresh_profile)
            return fresh_profile
        return snapshot.to_dict()
    except Exception as e:
        logger.error(f"Profile fetch error for {mobile}: {e}")
        return {}
def get_chat_history(mobile: str, limit: int = 6) -> str:
    """Return the most recent chat exchanges for ``mobile`` as a transcript.

    The newest ``limit`` entries of the user's ``chat_logs`` sub-collection
    are fetched (ordered by ``ts`` descending) and then emitted oldest
    first, each as a "User: … / April: …" pair. Returns an empty string
    when Firestore is not configured or the query fails.
    """
    if not db:
        return ""
    try:
        chat_logs = (db.collection("pricelyst_profiles").document(mobile)
                     .collection("chat_logs"))
        newest_first = (chat_logs
                        .order_by("ts", direction=firestore.Query.DESCENDING)
                        .limit(limit)
                        .stream())
        exchanges = []
        for snapshot in newest_first:
            entry = snapshot.to_dict()
            exchanges.append(
                f"User: {entry.get('message', '')}\nApril: {entry.get('response', '')}")
        # The query returned newest first; flip to chronological order.
        exchanges.reverse()
        return "\n".join(exchanges)
    except Exception as e:
        logger.error(f"Chat history error: {e}")
        return ""
def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None:
    """Append one exchange to the user's ``chat_logs`` sub-collection.

    The record stores the user message, the bot response, the intent dict
    and a UTC ISO timestamp. Does nothing when Firestore is not configured;
    write failures are logged and swallowed.
    """
    if not db:
        return
    try:
        record = {
            "message": message,
            "response": response,
            "intent": intent,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        user_doc = db.collection("pricelyst_profiles").document(mobile)
        user_doc.collection("chat_logs").add(record)
    except Exception as e:
        logger.error(f"Chat log save error: {e}")
def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]:
    """Store ``plan`` under the user's ``shopping_plans`` sub-collection.

    The generated document id is written into ``plan["id"]`` (the caller's
    dict is mutated) before saving. Returns that id, or ``None`` when
    Firestore is not configured or the write fails.
    """
    if not db:
        return None
    try:
        plans = (db.collection("pricelyst_profiles").document(mobile)
                 .collection("shopping_plans"))
        # document() with no argument reserves an auto-generated id.
        plan_ref = plans.document()
        plan_id = plan_ref.id
        plan["id"] = plan_id
        plan_ref.set(plan)
        return plan_id
    except Exception as e:
        logger.error(f"Plan save error: {e}")
        return None
def save_feedback(mobile: str, feedback_text: str,
                  feedback_type: str = "general") -> bool:
    """Persist user feedback in two places and report success.

    The feedback is written to the top-level ``pricelyst_feedback``
    collection (with status "new") for easy review, and a copy without the
    mobile number and status is logged under the user's own profile.

    feedback_type: "product_request" | "price_issue" | "suggestion" | "general"

    Returns ``True`` when both writes succeed, ``False`` when Firestore is
    not configured or either write raises.
    """
    if not db:
        return False
    try:
        review_entry = {
            "mobile": mobile,
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": datetime.now(timezone.utc).isoformat(),
            "status": "new",  # new | reviewed | actioned
        }
        db.collection("pricelyst_feedback").add(review_entry)

        # Also log against the user's profile for context
        profile_entry = {
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        user_doc = db.collection("pricelyst_profiles").document(mobile)
        user_doc.collection("feedback").add(profile_entry)

        logger.info(f"Feedback saved from {mobile}: [{feedback_type}] {feedback_text[:80]}")
        return True
    except Exception as e:
        logger.error(f"Feedback save error: {e}")
        return False
| # ───────────────────────────────────────────── | |
| # 12. Firebase Storage & Media Helpers | |
| # ───────────────────────────────────────────── | |
def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]:
    """Upload a local file to Firebase Storage and return a signed URL.

    The object is stored as ``<folder>/<basename of file_path>`` and the
    returned URL expires after one hour. Returns ``None`` when no storage
    bucket is configured or the upload fails (the failure is logged).
    """
    if not FIREBASE_STORAGE_BUCKET:
        return None
    try:
        object_name = f"{folder}/{os.path.basename(file_path)}"
        target = fb_storage.bucket().blob(object_name)
        target.upload_from_filename(file_path)
        return target.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}")
        return None
def upload_to_imgur(file_path: str) -> Optional[str]:
    """Upload an image file to Imgur and return its public link.

    Args:
        file_path: Path of the local image to upload.

    Returns:
        The ``data.link`` value from Imgur's JSON response when the response
        reports success; ``None`` when no client id is configured, the
        response reports failure, or any error occurs (logged).
    """
    if not IMGUR_CLIENT_ID:
        return None
    try:
        with open(file_path, "rb") as f:
            # requests has no default timeout; without one a stalled
            # connection would block the caller indefinitely. 30s matches
            # the timeout used for the Deepgram request in this module.
            resp = requests.post(IMGUR_URL, headers=IMGUR_HEADERS,
                                 files={"image": f}, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        return data["data"]["link"] if data.get("success") else None
    except Exception as e:
        logger.error(f"Imgur upload failed: {e}")
        return None
def deepgram_tts(text: str) -> Optional[str]:
    """Synthesize ``text`` to speech via Deepgram and save it as an MP3.

    The audio bytes are written to a uniquely named ``tts_<hex>.mp3`` file
    in the current working directory. Returns that path, or ``None`` when
    no API key is configured or the request/write fails (logged).
    """
    if not DEEPGRAM_API_KEY:
        return None
    try:
        request_headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json",
        }
        reply = requests.post(DEEPGRAM_TTS_URL, headers=request_headers,
                              json={"text": text}, timeout=30)
        reply.raise_for_status()
        audio_path = os.path.join(os.getcwd(), f"tts_{uuid.uuid4().hex}.mp3")
        with open(audio_path, "wb") as audio_file:
            audio_file.write(reply.content)
        return audio_path
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}")
        return None
| # ───────────────────────────────────────────── | |
| # 13. Deals & Discovery Helpers | |
| # ───────────────────────────────────────────── | |
def get_todays_deals(limit: int = 8) -> List[Dict]:
    """Return the products on offer with the widest price spread.

    Rows of the market index flagged ``is_offer`` are grouped by product
    name; products whose max-min price gap exceeds $0.05 are ranked by
    that gap and the top ``limit`` are returned. Each deal dict carries
    ``product_name``, ``cheapest_price``, ``retailer``, ``savings`` and
    ``category`` taken from the product's cheapest row.

    Returns an empty list when the index is empty, nothing is on offer,
    or processing fails (logged).
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        # Elementwise comparison with True (not identity) is intentional here.
        on_offer = market[market["is_offer"] == True].copy()
        if on_offer.empty:
            return []

        spread = on_offer.groupby("product_name")["price"].agg(["min", "max"]).reset_index()
        spread["savings"] = spread["max"] - spread["min"]
        worthwhile = spread[spread["savings"] > 0.05]
        ranked = worthwhile.sort_values("savings", ascending=False).head(limit)

        deals = []
        for _, entry in ranked.iterrows():
            name = entry["product_name"]
            product_rows = on_offer[on_offer["product_name"] == name]
            cheapest = product_rows.sort_values("price").iloc[0]
            deals.append({
                "product_name": name,
                "cheapest_price": float(cheapest["price"]),
                "retailer": cheapest["retailer"],
                "savings": float(entry["savings"]),
                "category": cheapest.get("category", ""),
            })
        return deals
    except Exception as e:
        logger.error(f"Deals fetch error: {e}")
        return []
def get_category_list() -> List[str]:
    """Return the distinct, non-null categories of the market index, sorted.

    Returns an empty list when the index is empty or the lookup fails.
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        categories = market["category"].dropna().unique().tolist()
        categories.sort()
        return categories
    except Exception:
        return []
def format_deals_message(deals: List[Dict]) -> str:
    """Format deal dicts as a WhatsApp-style message.

    Each deal contributes a numbered bold title, a price/retailer line, a
    savings line and a blank separator. The message ends with a usage hint
    and the website call-to-action. An empty ``deals`` list yields a short
    "no data" notice instead.

    Each deal must provide ``product_name``, ``cheapest_price``,
    ``retailer`` and ``savings``.
    """
    if not deals:
        return "No deals data right now. Please try again shortly."

    parts = ["🏷️ *Today's Best Deals* 🇿🇼\n"]
    for rank, deal in enumerate(deals, start=1):
        parts.extend([
            f"*{rank}. {deal['product_name']}*",
            f" 💰 ${deal['cheapest_price']:.2f} @ {deal['retailer']}",
            f" 🔥 Save up to ${deal['savings']:.2f}",
            "",
        ])
    parts.extend([
        "_Type any product name to compare prices!_",
        "",
        "*For comprehensive basket comparisons and retailer Pre-Orders visit*",
        "www.pricelyst.co.zw",
    ])
    return "\n".join(parts)