# PriceLystAI-TW / utility.py
# rairo's picture
# Update utility.py
# 346a36a verified
"""
utility.py — Pricelyst WhatsApp Bot (v2)
Core AI & Data layer.
Upgrades in this version:
1. Smart message formatting — per-store breakdown for ≤3 items,
cheapest-basket summary with highlights for larger baskets.
2. Budget Engine — real $ calculations against live catalogue prices.
Handles: fixed budget shopping, monthly household budgeting,
party/event planning with headcount, meal prep with recipes,
image-to-recipe-to-shopping-list pipeline.
"""
import os
import re
import json
import time
import math
import uuid
import logging
import base64
import io
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Tuple
import requests
import pandas as pd
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# 1. Gemini (new google-genai SDK)
# ─────────────────────────────────────────────
# google-genai is treated as optional: when the import fails both names are
# set to None and the problem is logged instead of aborting the module import.
try:
    from google import genai
    from google.genai import types as genai_types
except ImportError:
    genai = None
    genai_types = None
    logger.error("google-genai not installed. Run: pip install google-genai")
# API key and model name are read from the environment once, at import time.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite")
# Stays None when the SDK is missing, the key is empty, or the constructor
# raises; the gemini_* helpers in this file test it before calling the API.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ─────────────────────────────────────────────
# 2. Firebase
# ─────────────────────────────────────────────
import firebase_admin
from firebase_admin import credentials, firestore, storage as fb_storage
# Service-account credentials as a JSON string; parsed with json.loads()
# inside init_firestore_from_env().
FIREBASE_ENV = os.environ.get("FIREBASE", "")
# Optional bucket name; forwarded to initialize_app as "storageBucket" when set.
FIREBASE_STORAGE_BUCKET = os.environ.get("FIREBASE_STORAGE_BUCKET", "")
# Module-wide Firestore handle; None until init_firestore_from_env() succeeds.
db: Optional[Any] = None
def _get_firestore_client() -> Optional[Any]:
    """Return a Firestore client for the database named by FIRESTORE_DB_NAME.

    The "(default)" database is used when the environment variable is unset.
    """
    database = os.environ.get("FIRESTORE_DB_NAME", "(default)")
    client = firestore.client(database_id=database)
    return client
def init_firestore_from_env() -> Optional[Any]:
    """Initialise Firebase from the FIREBASE env var and return a Firestore client.

    Reuses the existing app when one is already registered. Returns None (and
    leaves the global ``db`` untouched) when the credentials are missing or
    any step raises; failures are logged rather than propagated.
    """
    global db
    try:
        if not firebase_admin._apps:
            # No app registered yet: credentials are required from here on.
            if not FIREBASE_ENV:
                logger.warning("FIREBASE env var missing. Persistence disabled.")
                return None
            certificate = credentials.Certificate(json.loads(FIREBASE_ENV))
            options = {}
            if FIREBASE_STORAGE_BUCKET:
                options["storageBucket"] = FIREBASE_STORAGE_BUCKET
            firebase_admin.initialize_app(certificate, options)
            db = _get_firestore_client()
            logger.info("Firebase initialized.")
            return db
        # An app already exists; just (re)bind the Firestore client.
        db = _get_firestore_client()
        return db
    except Exception as exc:
        logger.critical("Failed to initialize Firebase: %s", exc, exc_info=True)
        return None
# Runs at import time; db stays None when Firebase is unconfigured or init fails.
db = init_firestore_from_env()
# ─────────────────────────────────────────────
# 3. Static Config
# ─────────────────────────────────────────────
# Base URL of the price API; any trailing "/" is stripped so paths can be appended.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
# Timeout (seconds) passed to requests for catalogue page fetches.
HTTP_TIMEOUT = 30
# Maximum age (seconds) of the cached market index before it is refetched.
PRODUCT_CACHE_TTL = 60 * 20 # 20 minutes
# ZESA/ZETDC tariffs — ZERA approved, USD equivalent incl. 6% REA levy
# Source: zimpricecheck.com, last updated 20 May 2026
# Billing is in ZiG; USD equivalents at prevailing interbank rate.
# Domestic users billed in ZiG — USD estimates for user convenience.
# Each band's "limit" is its cumulative upper unit bound and "rate_usd" the
# price per unit inside the band (this is how calculate_zesa_units reads them).
ZIM_CONTEXT = {
    "zesa_bands": [
        {"limit": 50, "rate_usd": 0.08}, # first 50 units
        {"limit": 100, "rate_usd": 0.09}, # 51-100
        {"limit": 200, "rate_usd": 0.16}, # 101-200
        {"limit": 300, "rate_usd": 0.23}, # 201-300
        {"limit": 400, "rate_usd": 0.25}, # 301-400
        {"limit": 9999, "rate_usd": 0.26}, # 401+
    ],
    "zesa_note": "Tariffs billed in ZiG; USD estimates at prevailing rate. Includes 6% REA levy.",
}
# Imgur upload settings; the auth header is empty when no client id is configured.
IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID", "")
IMGUR_URL = "https://api.imgur.com/3/image"
IMGUR_HEADERS = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}
# Deepgram text-to-speech endpoint and key (consumers are outside this section).
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
# Working directory for generated/cached files; created at import time,
# relative to the process's current working directory.
CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues")
os.makedirs(CATALOGUE_DIR, exist_ok=True)
# ── Logo cache ─────────────────────────────────────────────────────────────
# Downloaded by _get_logo_path() on its first successful call and reused afterwards.
# Uses requests (not urllib) so it goes through the same session that works
# on HuggingFace — urllib hits the same egress block as Meta/Cloudflare.
LOGO_URL = os.environ.get("PRICELYST_LOGO_URL", "https://i.imgur.com/4bVNlBs.jpeg")
_logo_path: Optional[str] = None # path to cached local file
def _get_logo_path() -> Optional[str]:
    """
    Return the path of a locally cached copy of the logo, downloading it once.

    Behaviour:
    - A previously cached path is reused as long as the file still exists.
    - The download sends an identifying User-Agent and an image Accept header,
      follows redirects, and is rejected (None) when neither the Content-Type
      nor the leading bytes look like an image.
    - When Pillow can open the bytes they are re-saved as PNG in CATALOGUE_DIR.
    - When that conversion fails for any reason (including Pillow being
      absent) the raw bytes are cached instead and that path is returned.
    - Any other failure is logged and yields None.
    """
    global _logo_path
    if _logo_path and os.path.exists(_logo_path):
        return _logo_path
    try:
        request_headers = {
            "User-Agent": "PricelystBot/1.0 (+https://pricelyst.co.zw)",
            "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
        }
        resp = requests.get(LOGO_URL, timeout=20, headers=request_headers, allow_redirects=True)
        resp.raise_for_status()
        content_type = (resp.headers.get("Content-Type") or "").lower()
        # Accept on either signal: declared image type, or JPEG/PNG/GIF/RIFF magic bytes.
        if not ("image" in content_type
                or resp.content[:16].startswith((b"\xff\xd8", b"\x89PNG", b"GIF", b"RIFF"))):
            logger.warning(
                "Logo download did not return an image. content_type=%s bytes=%s",
                content_type, len(resp.content)
            )
            return None
        png_target = os.path.join(CATALOGUE_DIR, "pricelyst_logo.png")
        try:
            from PIL import Image
            picture = Image.open(io.BytesIO(resp.content))
            if picture.mode not in ("RGB", "RGBA"):
                picture = picture.convert("RGBA")
            picture.save(png_target, format="PNG", optimize=True)
            _logo_path = png_target
            logger.info("Logo cached as PNG at %s (%s bytes)", png_target, os.path.getsize(png_target))
            return _logo_path
        except Exception as pil_err:
            conversion_error = pil_err
        # Fallback: keep the bytes exactly as downloaded.
        if "png" in content_type:
            suffix = ".png"
        else:
            suffix = ".jpg"
        raw_target = os.path.join(CATALOGUE_DIR, f"pricelyst_logo_raw{suffix}")
        with open(raw_target, "wb") as handle:
            handle.write(resp.content)
        _logo_path = raw_target
        logger.warning("Pillow logo conversion failed (%s). Cached raw logo at %s", conversion_error, raw_target)
        return _logo_path
    except Exception as exc:
        logger.warning("Logo download failed from %s: %s", LOGO_URL, exc)
        return None
# ─────────────────────────────────────────────
# 4. Market Index (ETL)
# ─────────────────────────────────────────────
# In-process cache for the flattened catalogue, written by get_market_index():
# "ts" is the time.time() of the last refresh, "df" the cached DataFrame and
# "raw_count" the number of rows in "df".
_data_cache: Dict[str, Any] = {"ts": 0, "df": pd.DataFrame(), "raw_count": 0}
def _norm(s: Any) -> str:
return str(s).strip().lower() if s else ""
def _coerce_price(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except Exception:
return 0.0
# Alcohol/age-restricted terms are not priced in WhatsApp chat.
# We keep the rest of the user's basket alive and point restricted items to the site.
# "alcoholic" is listed explicitly because matching is now whole-word (see below).
ALCOHOL_TERMS = {
    "alcohol", "alcoholic", "beer", "lager", "stout", "wine", "whisky", "whiskey",
    "vodka", "gin", "rum", "brandy", "cider", "spirit", "spirits",
    "booze", "liquor", "champagne", "tequila", "amarula", "castle",
    "zambezi", "lion lager", "carling", "black label", "hunters",
    "savanna", "heineken", "stella", "chibuku", "scud", "super",
}
# One pre-compiled alternation over all terms. The look-arounds require the
# term to stand alone as a word (optionally pluralised with "s"/"es"), so
# "gin" no longer fires on "ginger"/"original" nor "super" on "supermarket".
# Longest terms come first so multi-word terms are tried before their parts.
_ALCOHOL_PATTERN = re.compile(
    r"(?<![a-z0-9])(?:"
    + "|".join(
        re.escape(term)
        for term in sorted(ALCOHOL_TERMS, key=lambda term: (-len(term), term))
    )
    + r")(?:s|es)?(?![a-z0-9])"
)


def contains_alcohol_reference(text: Any) -> bool:
    """Return True when *text* mentions an alcohol/age-restricted term as a word.

    Matching is case-insensitive and tolerant of surrounding punctuation.
    Falsy input (None, "", 0) is never a match.
    """
    if not text:
        return False
    return _ALCOHOL_PATTERN.search(str(text).lower()) is not None
def split_restricted_items(item_names: List[str]) -> Tuple[List[str], List[str]]:
    """Partition *item_names* into (allowed, restricted) lists of strings.

    An item is restricted when contains_alcohol_reference() says so. Input
    order is preserved inside each list; None or an empty list gives two
    empty lists.
    """
    allowed: List[str] = []
    restricted: List[str] = []
    for raw in (item_names or []):
        bucket = restricted if contains_alcohol_reference(raw) else allowed
        bucket.append(str(raw))
    return allowed, restricted
def format_restricted_items_note(restricted_items: List[str]) -> str:
    """Build the one-line notice that points restricted items to the website.

    Returns an empty string when there is nothing to report.
    """
    if restricted_items:
        listed = ", ".join(restricted_items)
        return "".join([
            "🔞 Restricted item(s): ",
            listed,
            " — please visit https://pricelyst.co.zw for more information.",
        ])
    return ""
def _fetch_page(url: str, page: int, per_page: int) -> Tuple[List[Dict], int]:
    """
    Download one page of the product listing.

    Returns ``(products, total_pages)``. The page count is taken from the
    first truthy value among ``totalPages``, ``last_page`` and
    ``meta.last_page``; when none is present it falls back to 999.
    HTTP errors propagate so the caller can decide whether to retry.

    NOTE(review): the request is made with ``verify=False`` and the matching
    urllib3 warning is silenced — TLS certificates are not validated here.
    """
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    query = {"page": page, "perPage": per_page}
    response = requests.get(url, params=query, timeout=HTTP_TIMEOUT, verify=False)
    response.raise_for_status()
    body = response.json()
    products = body.get("data") or []
    page_count = body.get("totalPages") or body.get("last_page")
    if not page_count:
        page_count = (body.get("meta") or {}).get("last_page") or 999
    return products, int(page_count)
# Column order of every row produced by the ETL; also applied to empty frames
# so callers can always select these columns.
_INDEX_COLUMNS = [
    "product_id", "product_name", "search_vector", "brand", "category",
    "retailer", "price", "views", "image", "is_offer",
]


def _flatten_product(p: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Expand one raw API product into index rows (one per positively-priced offer).

    A product without any ``prices`` entries yields a single placeholder row
    (retailer "Listing", price 0.0, is_offer False). Offers whose price does
    not coerce to a positive number are dropped.
    """
    p_id = int(p.get("id") or 0)
    p_name = str(p.get("name") or "Unknown").strip()
    brand_obj = p.get("brand") or {}
    brand_name = str(brand_obj.get("brand_name") or "").strip()
    cats = p.get("categories") or []
    cat_names = [str(c.get("name") or "").strip() for c in cats if c.get("name")]
    primary_cat = cat_names[0] if cat_names else "General"
    # Rich search vector: name + brand + all categories + individual tokens
    # This makes "Tanganda Tips Tea Bags 100s Pouch" findable by:
    # "tanganda", "tips", "tea bags", "tea", "coffee", "milk" (category)
    vector_parts = [p_name, brand_name] + cat_names
    # Also add individual meaningful words from the product name
    vector_parts.extend(
        w for w in p_name.lower().split()
        if len(w) > 2 and w not in {"and", "the", "for", "with"}
    )
    search_vector = _norm(" ".join(vector_parts))
    views = int(p.get("view_count") or 0)
    image = str(p.get("thumbnail") or p.get("image") or "")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict[str, Any]:
        return {
            "product_id": p_id, "product_name": p_name,
            "search_vector": search_vector, "brand": brand_name,
            "category": primary_cat, "retailer": retailer,
            "price": price, "views": views, "image": image, "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        return [make_row("Listing", 0.0, False)]
    rows = []
    for offer in prices:
        retailer_obj = offer.get("retailer") or {}
        r_name = str(retailer_obj.get("name") or "Unknown Store").strip()
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            rows.append(make_row(r_name, price_val, True))
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """
    Full catalogue fetch — walks EVERY page, retries transient failures,
    never stops early on a single bad page.

    Strategy:
    - Fetch page 1 to discover total_pages; if that fails an empty frame is
      returned immediately.
    - Fetch pages 2..N concurrently (ThreadPoolExecutor, 8 workers).
    - Each page is retried once, then logged and skipped, so one bad page
      never kills the entire index.
    - perPage=100 halves the number of round-trips vs perPage=50.
    - Products are de-duplicated by id. Products that carry no id are kept
      as-is instead of all collapsing onto id 0.

    Returns a DataFrame with the columns in ``_INDEX_COLUMNS`` (one row per
    product/retailer offer); it is empty when nothing could be fetched.
    """
    import urllib3, concurrent.futures
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    per_page = 100
    all_raw: List[Dict] = []
    # ── Page 1: discover total ───────────────────────────────────────────
    try:
        page1_data, total_pages = _fetch_page(url, 1, per_page)
        all_raw.extend(page1_data)
        logger.info("ETL: Page 1/%s fetched (%s products)", total_pages, len(page1_data))
    except Exception as e:
        logger.error("ETL: Page 1 failed — cannot continue: %s", e)
        return pd.DataFrame(columns=_INDEX_COLUMNS)
    if total_pages <= 1:
        logger.info("ETL: Single-page catalogue.")
    else:
        # ── Pages 2..N: concurrent fetch ────────────────────────────────
        remaining_pages = range(2, total_pages + 1)

        def fetch_with_retry(pg: int) -> List[Dict]:
            for attempt in range(2):  # one retry
                try:
                    data, _ = _fetch_page(url, pg, per_page)
                    logger.info("ETL: Page %s/%s — %s products", pg, total_pages, len(data))
                    return data
                except Exception as e:
                    if attempt == 0:
                        logger.warning("ETL: Page %s attempt 1 failed (%s), retrying...", pg, e)
                        time.sleep(0.5)
                    else:
                        logger.error("ETL: Page %s skipped after retry: %s", pg, e)
            return []

        # Max 8 workers — enough concurrency without hammering the API
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(fetch_with_retry, pg) for pg in remaining_pages]
            for future in concurrent.futures.as_completed(futures):
                all_raw.extend(future.result())
    logger.info("ETL: Total raw products fetched: %s", len(all_raw))
    # ── Flatten to rows ──────────────────────────────────────────────────
    rows: List[Dict[str, Any]] = []
    seen_ids: set = set()
    for p in all_raw:
        try:
            p_id = int(p.get("id") or 0)
            if p_id:
                # Only real ids take part in de-duplication.
                if p_id in seen_ids:
                    continue
                seen_ids.add(p_id)
            rows.extend(_flatten_product(p))
        except Exception as row_err:
            bad_id = p.get("id", "?") if isinstance(p, dict) else "?"
            logger.warning("ETL: Skipped product %s: %s", bad_id, row_err)
    df = pd.DataFrame(rows, columns=_INDEX_COLUMNS)
    if not df.empty:
        logger.info(
            "ETL: Flattened into %s rows | %s unique products | %s brands | %s categories",
            len(df),
            df["product_id"].nunique(),
            df["brand"].nunique(),
            df["category"].nunique(),
        )
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh happens when *force_refresh* is set, when the cache is empty,
    or when it is older than PRODUCT_CACHE_TTL seconds. If the refresh comes
    back empty while a non-empty index is already cached, the cached index is
    kept (and its timestamp left alone, so the next call retries) instead of
    being replaced with an empty frame.
    """
    cached = _data_cache["df"]
    expired = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if force_refresh or cached.empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        if fresh.empty and not cached.empty:
            logger.warning("ETL: Refresh returned no rows; serving stale Market Index.")
        else:
            _data_cache.update({"df": fresh, "ts": time.time(), "raw_count": len(fresh)})
    return _data_cache["df"]
# ─────────────────────────────────────────────
# 5. Precision Search & Basket Optimisation
# ─────────────────────────────────────────────
# Store alias map — common shorthand -> canonical retailer name substring
# Keys are looked up after _norm() (trimmed, lower-case); values are lower-case
# fragments that calculate_basket_optimization matches with `in` against the
# normalised retailer name.
STORE_ALIASES: Dict[str, str] = {
    "pnp": "pick n pay",
    "p&p": "pick n pay",
    "pick n pay": "pick n pay",
    "picknpay": "pick n pay",
    "tmpnpay": "pick n pay",
    "tm": "pick n pay",
    "ok": "ok",
    "ok mart": "ok",
    "okmart": "ok",
    "spar": "spar",
    "food lovers": "food lover",
    "food lover": "food lover",
    "foodlovers": "food lover",
    "fl": "food lover",
    "choppies": "choppies",
    "bon marche": "bon marche",
    "bon": "bon marche",
    "checkers": "checkers",
    "game": "game",
}
# Generic category words — trigger category-aware search rather than literal match
# search_products_deep treats a query as generic when the whole query, or every
# one of its tokens, appears in this set.
GENERIC_QUERIES = {
    "cooking oil", "oil", "maize meal", "meal", "bread", "milk", "sugar",
    "rice", "flour", "salt", "eggs", "coffee", "tea", "juice", "water",
    "soap", "washing powder", "detergent", "chicken", "beef", "fish",
    "butter", "margarine", "cheese", "yoghurt", "cereal", "nappies",
    "diapers", "toilet paper", "tissue",
}
def normalise_store_query(store_name: str) -> Optional[str]:
    """Map a user-typed store name to its canonical retailer substring.

    The name is normalised and looked up in STORE_ALIASES; None is returned
    when it is not a known alias.
    """
    alias_key = _norm(store_name)
    canonical = STORE_ALIASES.get(alias_key)
    return canonical
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 50) -> pd.DataFrame:
    """
    Balanced multi-signal product search.

    Every row gets a ``match_score``; rows scoring above zero are returned
    sorted by score (desc), views (desc) and price (asc), capped at *limit*.
    An empty frame or an empty query returns *df* unchanged.

    Signals, as implemented:
      2000 - normalised query equals the normalised product name (final score)
      +800 - every query token (length > 1) is a token of the search vector
      +700 - query is a substring of the product name
      +400 - product name starts with the query
      +350 - brand appears in the query and in the product name
      +150 - brand appears in the query only
      +90  - per query token found in the search vector
      +300 - generic query and the product name starts with it
      reset to 0 - generic query, score still below 150 and the product name
                   has more than four words (weak hit buried in a long name)

    Designed so that "cooking oil" prefers "Zimgold Cooking Oil 2L" over
    "Olive Pride Extra Virgin Olive Oil".
    """
    if df.empty or not query:
        return df
    q_norm = _norm(query)
    q_tokens = [tok for tok in q_norm.split() if len(tok) > 1]
    q_set = set(q_tokens)
    if q_norm in GENERIC_QUERIES:
        is_generic = True
    else:
        is_generic = bool(q_tokens) and all(tok in GENERIC_QUERIES for tok in q_tokens)

    def rate(row) -> int:
        name = _norm(row["product_name"])
        vector_tokens = set(row["search_vector"].split())
        brand = _norm(row.get("brand", ""))
        # Exact name match short-circuits everything else.
        if name == q_norm:
            return 2000
        name_starts_with_query = name.startswith(q_norm)
        # Partial credit for each shared token.
        points = 90 * len(q_set & vector_tokens)
        # Full token coverage.
        if q_set and q_set <= vector_tokens:
            points += 800
        # Contiguous phrase inside the name, and the tighter prefix case.
        if q_norm in name:
            points += 700
        if name_starts_with_query:
            points += 400
        # Brand fidelity: stronger when the brand is also part of the name.
        if brand and brand in q_norm:
            points += 350 if brand in name else 150
        # Generic queries favour names that lead with the category words.
        if name_starts_with_query and is_generic:
            points += 300
        # Anti-inflation for weak generic hits in long names.
        if is_generic and points < 150 and len(name.split()) > 4:
            points = max(0, points - 200)
        return points

    scored = df.assign(match_score=df.apply(rate, axis=1))
    matches = scored[scored["match_score"] > 0]
    if not matches.empty:
        matches = matches.sort_values(
            ["match_score", "views", "price"], ascending=[False, False, True]
        ).head(limit)
    return matches
def _match_basket_item(offers_df: pd.DataFrame, item: str) -> Optional[Dict[str, Any]]:
    """Return the best catalogue match for one basket item, or None if nothing matches."""
    hits = search_products_deep(offers_df, item, limit=50)
    if hits.empty:
        return None
    best_match = hits.iloc[0]
    q_tokens = _norm(item).split()
    res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}")
    # A multi-word query whose words are not all present in the matched
    # name/brand is reported as a "nearest match" rather than an exact hit.
    is_sub = len(q_tokens) > 1 and any(t not in res_norm for t in q_tokens)
    product_offers = (hits[hits["product_name"] == best_match["product_name"]]
                      .sort_values("price"))
    offers_list = [
        {"retailer": retailer, "price": float(price)}
        for retailer, price in zip(product_offers["retailer"], product_offers["price"])
    ]
    best_price = offers_list[0]["price"]
    max_price = offers_list[-1]["price"]
    return {
        "query": item,
        "product_name": str(best_match["product_name"]),
        "brand": str(best_match["brand"]),
        "category": str(best_match["category"]),
        "image": str(best_match["image"]),
        "is_substitute": is_sub,
        "offers": offers_list,
        "best_price": best_price,
        "potential_savings": max_price - best_price,
    }


def _build_store_comparison(found_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Total the basket per retailer, ranked by coverage (desc) then total price (asc)."""
    # Sorted so that ties are ordered the same way on every run.
    retailers = sorted({o["retailer"] for f in found_items for o in f["offers"]})
    comparison = []
    for retailer in retailers:
        total_price = 0.0
        found_count = 0
        missing_list = []
        for item in found_items:
            price = next((o["price"] for o in item["offers"] if o["retailer"] == retailer), None)
            if price is not None:
                total_price += price
                found_count += 1
            else:
                missing_list.append(item["product_name"])
        comparison.append({
            "retailer": retailer,
            "total_price": total_price,
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_list,
        })
    comparison.sort(key=lambda x: (-x["found_count"], x["total_price"]))
    if comparison:
        # Savings are only comparable between stores with the same (best) coverage.
        top_cover = comparison[0]["found_count"]
        max_total = max(s["total_price"] for s in comparison if s["found_count"] == top_cover)
        for s in comparison:
            s["basket_savings"] = (
                max_total - s["total_price"] if s["found_count"] == top_cover else 0.0
            )
    return comparison


def calculate_basket_optimization(item_names: List[str],
                                  preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """Price a basket across retailers and rank the stores.

    Alcohol/age-restricted items are split off first and reported under
    ``restricted_items`` without being priced (e.g. milk + Zambezi + sugar →
    price milk/sugar, point Zambezi to the site). Each remaining item is
    matched to its best catalogue product; the basket is then totalled per
    retailer.

    Args:
        item_names: Free-text item names; blank entries are ignored.
        preferred_retailer: Optional store name or alias. Matching stores are
            moved to the front of ``market_matrix`` (and so become
            ``best_store``).

    Returns:
        ``{"actionable": False, "error": ...}`` when market data is
        unavailable. Otherwise a dict with ``actionable``, ``is_basket``,
        ``found_items``, ``global_missing``, ``restricted_items``,
        ``market_matrix`` (top 5 stores), ``best_store``,
        ``preferred_retailer`` and ``store_filter`` — the same keys on every
        successful path.
    """
    clean_items, restricted_items = split_restricted_items(item_names or [])
    # Normalise store alias (e.g. "TmPnPay" -> "pick n pay"); fall back to the
    # raw normalised name when it is not in the alias map.
    store_filter: Optional[str] = None
    if preferred_retailer:
        store_filter = normalise_store_query(preferred_retailer) or _norm(preferred_retailer)

    def result(found: List[Dict[str, Any]], missing: List[str],
               matrix: List[Dict[str, Any]]) -> Dict[str, Any]:
        return {
            "actionable": True,
            "is_basket": len(found) > 1,
            "found_items": found,
            "global_missing": missing,
            "restricted_items": restricted_items,
            "market_matrix": matrix[:5],
            "best_store": matrix[0] if matrix else None,
            "preferred_retailer": preferred_retailer,
            "store_filter": store_filter,
        }

    # Nothing to price: answer without needing the market index at all.
    if restricted_items and not clean_items:
        return result([], [], [])
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "Market data unavailable. Please try again shortly."}
    # Only rows that carry a real price take part in matching; filter once.
    offers_df = df[df["is_offer"].astype(bool)]
    found_items: List[Dict[str, Any]] = []
    missing_global: List[str] = []
    for item in clean_items:
        if not _norm(item):
            # A blank name would otherwise match the whole catalogue.
            continue
        match = _match_basket_item(offers_df, item)
        if match is None:
            missing_global.append(item)
        else:
            found_items.append(match)
    if not found_items:
        return result([], missing_global, [])
    store_comparison = _build_store_comparison(found_items)
    # If a specific store was requested, pin it to the front of the comparison.
    if store_filter:
        pinned = [s for s in store_comparison if store_filter in _norm(s["retailer"])]
        rest = [s for s in store_comparison if store_filter not in _norm(s["retailer"])]
        store_comparison = pinned + rest
    return result(found_items, missing_global, store_comparison)
# ─────────────────────────────────────────────
# 6. ZESA Calculator
# ─────────────────────────────────────────────
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """
    Estimate how many ZESA units a USD amount buys under the stepped tariff.

    The bands come from ``ZIM_CONTEXT["zesa_bands"]`` (USD per unit, ZiG-billed,
    incl. 6% REA levy; source: zimpricecheck.com / ZERA approved tariffs,
    May 2026):
        0-50 units: $0.08/unit
        51-100:     $0.09/unit
        101-200:    $0.16/unit
        201-300:    $0.23/unit
        301-400:    $0.25/unit
        401+:       $0.26/unit

    Bands are filled cheapest-first: each band is bought in full while the
    money lasts, and whatever is left buys a fraction of the next band.

    Returns a dict with ``amount_usd``, ``est_units_kwh`` (rounded to 0.1),
    ``band_breakdown`` (one entry per band touched) and ``note``.
    """
    budget_left = float(amount_usd)
    total_units = 0.0
    breakdown = []
    floor = 0
    for tier in ZIM_CONTEXT["zesa_bands"]:
        if budget_left <= 0:
            break
        ceiling = tier["limit"]
        rate = tier["rate_usd"]
        width = ceiling - floor
        full_cost = width * rate
        if budget_left >= full_cost:
            # Whole band affordable.
            total_units += width
            budget_left -= full_cost
            bought = width
        else:
            # Money runs out inside this band.
            fraction = budget_left / rate
            total_units += fraction
            budget_left = 0
            bought = round(fraction, 1)
        breakdown.append({
            "band": f"{floor+1}-{ceiling}",
            "units": bought,
            "rate": rate,
        })
        floor = ceiling
    return {
        "amount_usd": float(amount_usd),
        "est_units_kwh": float(round(total_units, 1)),
        "band_breakdown": breakdown,
        "note": ZIM_CONTEXT["zesa_note"],
    }
# ─────────────────────────────────────────────
# 7. Smart WhatsApp Formatter (Upgrade 1)
# ─────────────────────────────────────────────
def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str:
    """
    Render a basket analysis as a WhatsApp message.

    - Not actionable → the analysis' error text (or a generic apology).
    - Nothing found → a "couldn't find" / restricted-items / no-results reply.
    - ≤3 items → per-store prices for every item, plus the best basket store
      when there is more than one item.
    - >3 items → store totals, biggest per-item savings and nearest matches.

    Missing items, the restricted-items note and the website footer are
    appended to both layouts. *language* is accepted but not used here.
    """
    if not analyst.get("actionable"):
        return analyst.get("error", "Sorry, I couldn't fetch price data right now.")
    found = analyst.get("found_items", [])
    missing = analyst.get("global_missing", [])
    restricted = analyst.get("restricted_items", [])
    matrix = analyst.get("market_matrix", [])

    if not found:
        note = format_restricted_items_note(restricted)
        if not missing:
            return note if note else "No results found. Try a different search term."
        base = (f"⚠️ Couldn't find: {', '.join(missing)}\n"
                f"_Try a shorter product name._")
        return base + ("\n\n" + note if note else "")

    out = []
    n = len(found)
    if n > 3:
        # ── > 3 items: basket summary + highlights ────────────────────────
        out.append(f"🛒 *{n}-item basket*\n")
        if matrix:
            for store in matrix[:4]:
                cover = f"{store['found_count']}/{store['total_items']} items"
                if store == matrix[0]:
                    marker = "✅"
                else:
                    marker = "▪️"
                out.append(f" {marker} *{store['retailer']}*: *${store['total_price']:.2f}* ({cover})")
            out.append("")
            sv = matrix[0].get("basket_savings", 0)
            if sv > 0.10:
                out.append(f"💰 *{matrix[0]['retailer']}* saves you *${sv:.2f}* on this basket")
                out.append("")
        # Top three items by saving, ignoring savings of 20c or less.
        hot = sorted(
            (x for x in found if x.get("potential_savings", 0) > 0.20),
            key=lambda x: x.get("potential_savings", 0),
            reverse=True,
        )[:3]
        if hot:
            out.append("🔥 *Biggest savings:*")
            for entry in hot:
                out.append(
                    f" • *{entry['product_name']}* "
                    f"${entry['best_price']:.2f} @ {entry['offers'][0]['retailer']} "
                    f"(save ${entry['potential_savings']:.2f})"
                )
            out.append("")
        subs = [f for f in found if f.get("is_substitute")]
        if subs:
            out.append("⚠️ *Nearest matches:*")
            out.extend(
                f" _{s['query']}_ → {s['product_name']} (${s['best_price']:.2f})"
                for s in subs
            )
            out.append("")
    else:
        # ── ≤ 3 items: full per-store breakdown ───────────────────────────
        for entry in found:
            sub = " _(nearest match)_" if entry.get("is_substitute") else ""
            out.append(f"🏷️ *{entry['product_name']}*{sub}")
            for o in entry["offers"][:4]:
                if o["price"] == entry["best_price"]:
                    tick = "✅"
                else:
                    tick = "▪️"
                out.append(f" {tick} {o['retailer']}: *${o['price']:.2f}*")
            savings = entry.get("potential_savings", 0)
            if savings > 0.10:
                out.append(f" 💡 Save *${savings:.2f}* at cheapest store")
            out.append("")
        if n > 1 and matrix:
            best = matrix[0]
            out.append(f"🏪 Best basket: *{best['retailer']}* — *${best['total_price']:.2f}*")
            sv = best.get("basket_savings", 0)
            if sv > 0.10:
                out.append(f"💰 Saves *${sv:.2f}* vs most expensive option")

    if missing:
        out.extend([
            f"❓ Not found: {', '.join(missing)}",
            "_Try a simpler name or check in-store._",
        ])
    if restricted:
        out.append(format_restricted_items_note(restricted))
    out.extend([
        "",
        "*For comprehensive basket comparisons and retailer Pre-Orders visit*",
        "www.pricelyst.co.zw",
    ])
    return "\n".join(out)
# ─────────────────────────────────────────────
# 8. Budget & Meal Planning Engine (Upgrade 2)
# ─────────────────────────────────────────────
def gemini_plan_budget(transcript: str, budget: float, context: str,
                       catalogue_snapshot: str, language: str = "English") -> str:
    """
    Full AI budget planner using real catalogue prices.
    Handles: fixed-amount shopping, monthly household, party/event, meal prep.

    Args:
        transcript: The user's request, embedded verbatim in the prompt.
        budget: Detected budget; rendered in the prompt as USD with two decimals.
        context: Free-text context clues, embedded verbatim.
        catalogue_snapshot: Catalogue/price text the model is told to consult
            before estimating any price.
        language: Language the model is asked to respond in.

    Returns:
        The model's reply passed through sanitise_response(), or a fixed
        fallback sentence when the Gemini client is not configured or the
        call raises (the error is logged, never propagated).
    """
    if not _gemini_client:
        return "Budget planning is temporarily unavailable. Please try again."
    # NOTE(review): transcript/context are interpolated into the prompt without
    # any escaping — confirm that is acceptable for user-supplied text.
    PROMPT = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's AI Shopping & Budget Advisor.
You have access to REAL current prices from Zimbabwe's top supermarkets.
USER REQUEST: "{transcript}"
DETECTED BUDGET: ${budget:.2f} USD
CONTEXT CLUES: {context}
PRICELYST CATALOGUE — FULL PRODUCT INDEX (use these first, estimate only if absent):
{catalogue_snapshot}
CRITICAL: Walk the entire catalogue above before making any price estimate.
For every item you include, check if it exists in the catalogue first.
Only use "(est. ~$X)" for items with ZERO catalogue matches.
Do not price alcohol or age-restricted products in WhatsApp; for those items say: please visit https://pricelyst.co.zw for more information.
Keep the response concise and practical.
ZIMBABWE CONTEXT:
- Average family of 4 monthly food spend: $150-$250
- ZESA: $10 ≈ 108 units, $20 ≈ 210 units (2026 tariff)
- ZESA: bands $0.08-$0.26/unit depending on consumption level
YOUR TASK — respond based on what the user asked:
1. FIXED BUDGET SHOPPING ("I have $50"):
- Build an optimal shopping list that maximises value within budget
- Show exact items, quantities, store, and prices
- Show running total as you add items
- Show remaining balance after each category
- Recommend the single best store for this basket
2. MONTHLY HOUSEHOLD BUDGET ("$2000 for a month, family of 4"):
- Break down the budget into categories: Food, Household, Transport, ZESA, Contingency
- For food: build a full monthly shopping list with real prices
- Calculate cost per person per day
- Show weekly vs monthly shopping rhythm recommendation
- Flag where they can save most
3. EVENT/PARTY PLANNING ("dinner for 10", "party for 100 people"):
- Scale ingredients appropriately for the headcount
- Build a full shopping list with quantities scaled to guests
- Show per-head cost
- Recommend best store combination for the event
- Include drinks, condiments, serviettes etc. from catalogue
4. MEAL PREP PLANNING ("weekly meal prep", "5 days of lunches"):
- Suggest 5-7 practical Zimbabwean meals using real catalogue products
- Show shopping list with prices for the full week
- Calculate cost per meal and cost per serving
- Give simple prep tips
FORMAT RULES (WhatsApp — no markdown headers ##):
- Keep it concise: maximum 6 short sections, avoid long explanations
- Use *bold* for totals, store names, key numbers
- Use emojis naturally 💰 🛒 📊 ✅ 🍽️
- Structure with clear sections separated by blank lines
- Be specific with numbers — show your working
- Always end with: total cost, per-person cost, and best store recommendation
- If budget is tight, flag it honestly and suggest stretching strategies
Respond in {language}. Be a genuinely useful financial advisor, not just a list generator.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return sanitise_response(resp.text)
    except Exception as e:
        logger.error(f"Budget planner error: {e}")
        return "I had trouble calculating your budget plan. Please try again!"
def gemini_generate_recipe(
        meal_name: str,
        found_items: List[Dict],
        servings: int = 4,
        language: str = "English") -> str:
    """
    Generate a full recipe using real Pricelyst catalogue products.
    Format: ingredients+prices, total cost, steps, serving info.
    Estimates only for items absent from the catalogue.

    Args:
        meal_name: Dish to generate; embedded verbatim in the prompt.
        found_items: Matched catalogue items. Each entry is read for
            'product_name', 'best_price' and 'offers'[0]['retailer'], so
            every item must carry at least one offer.
        servings: Number of people the recipe should serve.
        language: Language the model is asked to respond in.

    Returns:
        The model's reply passed through sanitise_response(), or a fixed
        fallback sentence when the client is missing or the call raises.
    """
    if not _gemini_client:
        return "Recipe generation is temporarily unavailable."
    # One bullet per matched product; a fixed sentence when nothing matched.
    products_str = "\n".join(
        f"- {item['product_name']} (${item['best_price']:.2f} @ {item['offers'][0]['retailer']})"
        for item in found_items
    ) if found_items else "No catalogue matches — use estimated prices for all ingredients."
    PROMPT = f"""
You are Pricelyst AI, Zimbabwe\'s smart shopping and meal planning assistant.
Generate a complete, practical recipe for: *{meal_name}*
Servings: {servings} people
PRICELYST CATALOGUE PRODUCTS (walk these FIRST — real prices, real stores):
{products_str}
OUTPUT FORMAT (WhatsApp plain text — follow this order exactly):
*🍽️ {meal_name}* | Serves {servings}
*🛒 Ingredients & Prices:*
• [Ingredient] [qty] — *$[price]* @ [store]
(For each item: use catalogue price if found, else "(est. ~$X)")
*💰 Total:* $X.XX | Per serving: $X.XX
Best store: [store name covering most items]
*⏱️ Steps:*
1. [step]
2. [step]
(Max 8 steps, practical for a Zimbabwean home kitchen)
*💡 Tips:* [1 storage or substitution tip using catalogue alternatives]
*For more recipes & prices: pricelyst.co.zw*
RULES:
- Walk the ENTIRE catalogue before estimating any price
- Only mark "(est. ~$X)" if genuinely absent from catalogue
- Realistic Zimbabwean home quantities, not restaurant portions
- *bold* prices and totals only, plain text elsewhere
- NO image URLs, NO external links except pricelyst.co.zw
Respond in {language}.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return sanitise_response(resp.text)
    except Exception as e:
        logger.error(f"Recipe generation error: {e}")
        return "I had trouble generating the recipe. Please try again!"
def gemini_analyze_meal_image_to_recipe(
        image_bytes: bytes,
        caption: str,
        found_items: List[Dict],
        language: str = "English") -> str:
    """
    Full pipeline: image of a meal → identify dish → recipe → shopping list with prices.

    Args:
        image_bytes: Raw image payload; sent to the model as ``image/jpeg``.
        caption: Caption the user attached to the photo.
        found_items: Catalogue matches. Each entry is read for
            ``product_name``, ``best_price`` and the ``retailer`` of its
            first offer. Entries without a name or a numeric price are
            skipped instead of aborting the whole request.
        language: Language the model is asked to respond in.

    Returns:
        The sanitised model reply, or a fixed apology string when the
        Gemini client is missing or the call fails.
    """
    if not _gemini_client:
        return "Image recipe analysis is temporarily unavailable."

    # Build the catalogue listing defensively: items can arrive without
    # offers or with a null price, and this runs outside the try block.
    product_lines = []
    for item in found_items or []:
        name = item.get("product_name")
        try:
            price = float(item.get("best_price"))
        except (TypeError, ValueError):
            continue
        if not name:
            continue
        offers = item.get("offers") or []
        retailer = (offers[0].get("retailer") if offers else None) or "unknown store"
        product_lines.append(f"- {name} (${price:.2f} @ {retailer})")
    products_str = (
        "\n".join(product_lines)
        if product_lines
        else "No catalogue matches found — use estimated prices."
    )

    PROMPT = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's AI Meal & Shopping Advisor.
A user sent a photo of a meal/dish. Your job: identify it, then immediately provide
a full recipe with a Zimbabwe-priced shopping list.
USER CAPTION: "{caption}"
AVAILABLE CATALOGUE PRODUCTS:
{products_str}
DO THIS IN ONE RESPONSE:
1. 🍽️ *Identify the dish* — name it confidently
2. 📝 *Full recipe* — ingredients with quantities for 4 servings
3. 🛒 *Shopping list* — match ingredients to catalogue products with prices
4. 💰 *Total cost* — exact calculation, cost per serving, best store
5. ⏱️ *Quick cook guide* — 5-6 simple steps
Keep it warm, practical, Zimbabwean. Format for WhatsApp (*bold*, emojis).
Respond in {language}.
"""
    try:
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg")
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
        )
        # A reply without text makes sanitise_response raise, which lands in
        # the handler below and yields the apology message.
        return sanitise_response(resp.text)
    except Exception as e:
        logger.error("Meal image recipe error: %s", e)
        return "I had trouble analyzing that meal photo. Please try again!"
def build_catalogue_snapshot() -> str:
    """
    Build a category-organised price snapshot for Gemini prompts.

    Every (product_name, category, brand) group among the offer rows of the
    market index is listed with its lowest price and the retailer selling
    at that price. No row limit is applied.
    Within each category rows are sorted by views (descending) and then by
    price (ascending).

    Returns:
        The snapshot text, or a short "unavailable" message when the index
        is empty, has no offers, or processing fails.
    """
    df = get_market_index()
    if df.empty:
        return "Catalogue unavailable."
    try:
        offers = df[df["is_offer"] == True].copy()  # noqa: E712 (pandas mask)
        if offers.empty:
            return "No priced products available."

        # Order by price first so that "first" below picks the retailer that
        # actually sells at min_price, rather than whichever row came first.
        offers = offers.sort_values("price", kind="mergesort")

        # Best price per product across all stores — keep brand for context
        best_prices = (
            offers.groupby(["product_name", "category", "brand"])
            .agg(min_price=("price", "min"),
                 retailer=("retailer", "first"),
                 views=("views", "max"))
            .reset_index()
        )

        lines = []
        total_products = 0
        # All categories, sorted alphabetically for consistency
        categories = sorted(best_prices["category"].unique())
        for cat in categories:
            cat_df = (best_prices[best_prices["category"] == cat]
                      .sort_values(["views", "min_price"],
                                   ascending=[False, True]))
            if cat_df.empty:
                continue
            lines.append(f"\n[{cat}]")
            for _, row in cat_df.iterrows():
                brand_str = f" ({row['brand']})" if row["brand"] else ""
                lines.append(
                    f" {row['product_name']}{brand_str}: "
                    f"${row['min_price']:.2f} @ {row['retailer']}"
                )
                total_products += 1

        logger.info(
            "Catalogue snapshot: %s products across %s categories (no limits applied)",
            total_products, len(categories),
        )
        return "\n".join(lines)
    except Exception as e:
        logger.error("Catalogue snapshot error: %s", e)
        return "Catalogue snapshot unavailable."
def get_products_by_category(category_keyword: str, limit: int = 20) -> List[Dict]:
    """
    Return products whose category or search vector contains a keyword —
    used by recipe/budget engines to surface the options in a category.

    Args:
        category_keyword: Free-text keyword; normalised with ``_norm`` and
            matched as a literal substring (not as a regular expression).
        limit: Maximum number of products returned.

    Returns:
        Up to ``limit`` records sorted by ascending ``best_price``; each has
        ``product_name``, ``best_price``, ``retailer`` (the store selling at
        that best price), ``brand`` and ``category``. Empty list when the
        index is empty, nothing matches, or processing fails.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        kw = _norm(category_keyword)
        offers = df[df["is_offer"] == True].copy()  # noqa: E712 (pandas mask)
        # regex=False: the keyword is user text, so characters such as "+"
        # or "(" must be matched literally instead of raising re.error.
        in_category = offers["category"].str.lower().str.contains(kw, na=False, regex=False)
        in_vector = offers["search_vector"].str.contains(kw, na=False, regex=False)
        matches = offers[in_category | in_vector]
        if matches.empty:
            return []

        # Order by price first so "first" picks the retailer (and brand /
        # category) of the cheapest offer, matching best_price.
        matches = matches.sort_values("price", kind="mergesort")

        # Best price per product in this category
        best = (matches.groupby("product_name")
                .agg(best_price=("price", "min"),
                     retailer=("retailer", "first"),
                     brand=("brand", "first"),
                     category=("category", "first"))
                .reset_index()
                .sort_values("best_price"))
        return best.head(limit).to_dict("records")
    except Exception as e:
        logger.error("get_products_by_category error: %s", e)
        return []
# Context keyword patterns. Word boundaries stop accidental substring hits
# such as "prep" inside "prepare" or "event" inside "prevent".
_BUDGET_AMOUNT_SYMBOL_RE = re.compile(r'\$\s*([\d,]+(?:\.\d{1,2})?)')
_BUDGET_AMOUNT_WORD_RE = re.compile(r'([\d,]+(?:\.\d{1,2})?)\s*(?:dollars?|usd)', re.I)
_BUDGET_MONTHLY_RE = re.compile(r'\bmonth(?:ly|s)?\b')
_BUDGET_EVENT_RE = re.compile(
    r'\b(?:part(?:y|ies)|events?|weddings?|functions?|people|guests?|pax)\b')
_BUDGET_HEADCOUNT_RE = re.compile(r'(\d+)\s*(?:people|guests?|pax|persons?)')
_BUDGET_WEEKLY_RE = re.compile(r'\b(?:week(?:ly|s)?|meal\s*prep|prep)\b')
_BUDGET_RECIPE_RE = re.compile(
    r'\b(?:recipes?|cook(?:s|ing|ed)?|mak(?:e|es|ing)|prepar(?:e|es|ed|ing)|how to)\b')


def extract_budget_from_text(transcript: str) -> Tuple[Optional[float], str]:
    """
    Pull a dollar amount and a planning context from free text.

    The amount is taken from "$<number>" first, otherwise from
    "<number> dollars" / "<number> usd". The context is the first match in
    this order: monthly, event, weekly meal prep, recipe, fixed budget
    (an amount was found), general planning.

    Returns:
        (amount, context_description)

    Examples:
        "I have $50"              → (50.0, "fixed_budget")
        "$2000 for the month"     → (2000.0, "monthly_household")
        "party for 100 people"    → (None, "event_100_people")
        "braai with friends, party" → (None, "event_unknown_people")
        "how do I prepare sadza"  → (None, "recipe_request")

    Note: a headcount is only recognised when followed by people / guests /
    pax / person(s); "dinner for 5" alone yields "general_planning".
    """
    # Find dollar amount
    amount_match = (_BUDGET_AMOUNT_SYMBOL_RE.search(transcript)
                    or _BUDGET_AMOUNT_WORD_RE.search(transcript))
    amount = None
    if amount_match:
        try:
            amount = float(amount_match.group(1).replace(",", ""))
        except ValueError:
            # e.g. the digit group was only commas; treat as no amount
            amount = None

    # Determine context
    t = transcript.lower()
    if _BUDGET_MONTHLY_RE.search(t):
        context = "monthly_household"
    elif _BUDGET_EVENT_RE.search(t):
        people_match = _BUDGET_HEADCOUNT_RE.search(t)
        guests = people_match.group(1) if people_match else "unknown"
        context = f"event_{guests}_people"
    elif _BUDGET_WEEKLY_RE.search(t):
        context = "weekly_meal_prep"
    elif _BUDGET_RECIPE_RE.search(t):
        context = "recipe_request"
    elif amount is not None:
        context = "fixed_budget"
    else:
        context = "general_planning"
    return amount, context
# ─────────────────────────────────────────────
# 8b. Conversation Context Engine
# ─────────────────────────────────────────────
def _context_item_price(item: Dict) -> float:
    """Return an item's best_price as a float, treating missing/invalid as 0."""
    try:
        return float(item.get("best_price") or 0)
    except (TypeError, ValueError):
        return 0.0


def _drop_basket_entries_matching(basket: List[str], product_name: str) -> List[str]:
    """Return basket entries that do not fuzzily match the given product name."""
    name = _norm(product_name)
    if not name:
        # Nothing meaningful to match against; never wipe the basket.
        return list(basket)
    kept = []
    for entry in basket:
        entry_norm = _norm(entry)
        if entry_norm and (entry_norm in name or name in entry_norm):
            continue
        kept.append(entry)
    return kept


def apply_context_mutation(
        current_basket: List[str],
        current_found: List[Dict],
        mutation: Dict) -> Tuple[List[str], str]:
    """
    Apply a context mutation to an active basket.

    Args:
        current_basket: The user's item queries.
        current_found: Resolved products for the basket; read for
            ``product_name`` and ``best_price``.
        mutation: Dict with ``action`` and, for add/remove, ``items``.
            ``items`` may be missing or null; blank entries are ignored.

    Returns:
        (new_item_list, description_of_change).

    Actions handled here:
        add_items             : append ``items`` to the basket
        remove_items          : drop entries fuzzily matching ``items``
        remove_most_expensive : drop entries matching the priciest product
        remove_cheapest       : drop entries matching the cheapest product
    Any other action (including ``what_if_add``) leaves the basket
    untouched and returns "No change".
    """
    action = mutation.get("action") or ""
    items = [x for x in (mutation.get("items") or [])
             if isinstance(x, str) and x.strip()]

    if action == "add_items":
        return list(current_basket) + items, f"Added: {', '.join(items)}"

    if action == "remove_items":
        # Empty normalised targets would be a substring of everything and
        # empty the whole basket, so they are discarded.
        targets = [t for t in (_norm(x) for x in items) if t]
        removed = []
        new_basket = []
        for orig in current_basket:
            orig_norm = _norm(orig)
            if orig_norm and any(t in orig_norm or orig_norm in t for t in targets):
                removed.append(orig)
            else:
                new_basket.append(orig)
        desc = f"Removed: {', '.join(removed)}" if removed else "Item not found in current basket"
        return new_basket, desc

    if action in ("remove_most_expensive", "remove_cheapest"):
        if not current_found:
            return current_basket, "No active basket to modify"
        if action == "remove_most_expensive":
            chosen = max(current_found, key=_context_item_price)
            label = "most expensive"
        else:
            chosen = min(current_found, key=_context_item_price)
            label = "cheapest"
        product_name = chosen.get("product_name") or ""
        new_basket = _drop_basket_entries_matching(current_basket, product_name)
        desc = f"Removed {label}: {product_name} (${_context_item_price(chosen):.2f})"
        return new_basket, desc

    return current_basket, "No change"
def gemini_resolve_context_intent(message: str, active_basket: List[str],
                                  active_found: List[Dict]) -> Dict[str, Any]:
    """
    Determine if a message is a context mutation on the active basket,
    or a fresh new query.

    Returns a dict that always contains ``is_context``, ``action`` and
    ``items`` (a list):
    {
        "is_context": bool,
        "action": "add_items|remove_items|remove_most_expensive|remove_cheapest|
                   what_if_add|show_total|fresh_query",
        "items": ["item1"],  # for add/remove/what_if
        "explanation": "str"  # present when the model supplies it
    }
    Without a Gemini client or an active basket, or when the call fails or
    the model returns something that is not a JSON object, the result is
    the "fresh_query" default.
    """
    def _fresh() -> Dict[str, Any]:
        # New dict per call so callers can mutate the result safely.
        return {"is_context": False, "action": "fresh_query", "items": []}

    if not _gemini_client or not active_basket:
        return _fresh()

    basket_str = ", ".join(active_basket)

    # Tolerate resolved items with missing/null fields; this runs outside
    # the try block, so a bad record must not crash the resolver.
    found_lines = []
    for f in (active_found or []):
        try:
            price = float(f.get("best_price") or 0)
        except (TypeError, ValueError):
            price = 0.0
        found_lines.append(f"- {f.get('product_name', 'Unknown')}: ${price:.2f}")
    found_str = "\n".join(found_lines)

    PROMPT = f"""
You are a shopping assistant context resolver.
The user has an ACTIVE BASKET: [{basket_str}]
Resolved products and prices:
{found_str}
New user message: "{message}"
Decide: is this message a MODIFICATION of the active basket, or a FRESH NEW QUERY?
Context modifications include:
- "remove coffee" / "take out the bread" / "remove the most expensive" → remove_items / remove_most_expensive
- "add milk" / "what if I add eggs" / "include sugar" → add_items / what_if_add
- "remove the cheapest" / "drop the cheapest item" → remove_cheapest
- "what's the total now" / "show total" → show_total
- "let's see" / "ok" / "sounds good" → show_total (treat as confirmation request)
Fresh new queries include:
- Completely new products unrelated to the basket ("price of diapers")
- New categories / topics ("deals today", "ZESA units")
- Greetings or topic changes
Return STRICT JSON:
{{
"is_context": boolean,
"action": "add_items|remove_items|remove_most_expensive|remove_cheapest|what_if_add|show_total|fresh_query",
"items": ["item to add or remove"],
"explanation": "one line reason"
}}
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, _fresh())
    except Exception as e:
        logger.error("Context resolution error: %s", e)
        return _fresh()

    # The model can return valid JSON of the wrong shape (a list, a string,
    # an object without the expected keys); normalise before handing back.
    if not isinstance(result, dict):
        logger.error("Context resolution returned non-object JSON: %r", result)
        return _fresh()
    result.setdefault("is_context", False)
    if not result.get("action"):
        result["action"] = "fresh_query"
    if not isinstance(result.get("items"), list):
        result["items"] = []
    logger.info("Context resolution: %s", result)
    return result
def format_context_result(
        action: str,
        change_desc: str,
        analyst: Dict,
        what_if: bool = False) -> str:
    """
    Format a context mutation result as a WhatsApp message.

    Args:
        action: Mutation action; "show_total" selects the plain basket header.
        change_desc: Human-readable change summary; an empty value also
            selects the plain basket header.
        analyst: Analysis dict read for ``found_items``, ``market_matrix``
            and ``global_missing``. Missing or null fields are tolerated.
        what_if: When true (and a change is described) the header marks the
            message as a what-if scenario rather than a persisted update.

    Returns:
        The message text, ending with the pricelyst.co.zw call to action.
    """
    def _money(value: Any) -> float:
        # Upstream values may be None or strings; never let that break formatting.
        try:
            return float(value or 0)
        except (TypeError, ValueError):
            return 0.0

    if action == "show_total" or not change_desc:
        lines = ["🛒 *Current Basket*\n"]
    elif what_if:
        lines = [f"🔮 *What if scenario*\n_{change_desc}_\n"]
    else:
        lines = [f"✅ *Basket updated*\n_{change_desc}_\n"]

    analyst = analyst or {}
    found = analyst.get("found_items") or []
    matrix = analyst.get("market_matrix") or []
    missing = analyst.get("global_missing") or []

    if found:
        for item in found:
            sub = " _(nearest match)_" if item.get("is_substitute") else ""
            name = item.get("product_name", "Unknown")
            lines.append(f"• *{name}*{sub} — ${_money(item.get('best_price')):.2f}")
        lines.append("")

    if matrix:
        # The first matrix entry is presented as the best basket.
        best = matrix[0]
        lines.append(
            f"💰 *Best basket total: ${_money(best.get('total_price')):.2f}* "
            f"@ {best.get('retailer', 'Unknown')}"
        )
        savings = _money(best.get("basket_savings"))
        if savings > 0.10:
            lines.append(f" Saves *${savings:.2f}* vs most expensive option")
        lines.append("")
        if len(matrix) > 1:
            lines.append("Other stores:")
            for s in matrix[1:3]:
                lines.append(f" {s.get('retailer', 'Unknown')}: ${_money(s.get('total_price')):.2f}")

    if missing:
        lines.append(f"\n⚠️ Not found: {', '.join(str(m) for m in missing)}")

    lines.append("")
    lines.append("*For comprehensive basket comparisons and retailer Pre-Orders visit*")
    lines.append("www.pricelyst.co.zw")
    return "\n".join(lines)
# ─────────────────────────────────────────────
# 9. Gemini Helpers
# ─────────────────────────────────────────────
def _safe_json_loads(s: str, fallback: Any) -> Any:
try:
cleaned = s
if "```json" in cleaned:
cleaned = cleaned.split("```json")[1].split("```")[0]
elif "```" in cleaned:
cleaned = cleaned.split("```")[1]
return json.loads(cleaned.strip())
except Exception as e:
logger.error(f"JSON parse error: {e} | raw: {s[:300]}")
return fallback
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """
    Classify a user transcript into one of the bot's intents.

    Covers the standard intents plus BUDGET_PLANNER, RECIPE_REQUEST and
    FEEDBACK, and asks the model to extract items, amounts, headcount,
    meal name, store preference and language.

    Returns:
        The parsed JSON object from the model. The result always has
        ``actionable``, ``intent``, ``language`` and ``items`` (a list);
        when the client is unavailable, the call fails, or the reply is not
        a JSON object, the CASUAL_CHAT default is returned.
    """
    def _default() -> Dict[str, Any]:
        # New dict per call so callers can mutate the result safely.
        return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []}

    if not _gemini_client:
        return _default()

    # Each intent's description must stay contiguous: the continuation and
    # example lines of RECIPE_REQUEST belong directly under its bullet.
    PROMPT = """
Analyze the transcript below and return STRICT JSON.
Intents:
- CASUAL_CHAT : Greetings, "hi", off-topic
- SHOPPING_BASKET : Searching for prices / cheapest X
- UTILITY_CALC : Electricity / ZESA / fuel cost questions
- STORE_DECISION : "Which store is cheapest?", "Where should I shop?"
- EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5"
- CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet
- DEALS_EXPLORE : "Today's deals", "promotions", "what's on special"
- DISCOVER : "What products do you have?", "show me your categories"
- BUDGET_PLANNER : User mentions a budget amount + shopping/planning goal.
e.g. "I have $50", "$2000 for the month", "party for 100 people",
"what can I buy with $30", "monthly groceries for family of 4"
- RECIPE_REQUEST : User wants a recipe, meal idea, how to cook something,
meal prep plan, or sends a meal image.
e.g. "recipe for sadza", "how do I cook bream", "weekly meal plan"
- FEEDBACK : User wants to give feedback, suggest a product, report a missing item,
report a wrong price, or make a general suggestion about the service.
Rules:
- Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal').
- If only a concept is given (e.g. "plan a braai"), set is_event_planning=true, items=[].
- Detect user language accurately (Shona, Ndebele, English).
- budget_amount: extract numeric USD amount if mentioned (e.g. 50 from "$50").
- headcount: extract number of people if mentioned (e.g. 100 from "party for 100").
- meal_name: extract dish name if recipe is requested.
- store_preference: store name if explicitly mentioned.
- utility_amount: numeric value for ZESA queries.
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"budget_amount": number | null,
"headcount": number | null,
"meal_name": "string | null",
"store_preference": "string | null",
"is_event_planning": boolean,
"language": "string",
"catalogue_scope": "string | null"
}
Transcript: """ + transcript
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, _default())
    except Exception as e:
        logger.error("Intent detect error: %s", e)
        return _default()

    # Valid JSON of the wrong shape (list, string, partial object) would
    # break callers that use .get(); normalise the required fields.
    if not isinstance(result, dict):
        logger.error("Intent detect returned non-object JSON: %r", result)
        return _default()
    for key, value in _default().items():
        if result.get(key) is None:
            result[key] = value
    if not isinstance(result["items"], list):
        result["items"] = []
    return result
def gemini_explode_concept(transcript: str) -> List[str]:
    """
    Convert an event/meal concept into a concrete grocery list.

    Returns:
        A list of non-empty item names. Empty when the client is missing,
        the call fails, or the reply contains no usable list. A JSON object
        wrapping the list (e.g. ``{"items": [...]}``) is unwrapped.
    """
    if not _gemini_client:
        return []
    PROMPT = f"""
User wants to plan: "{transcript}"
Generate 10-15 essential Zimbabwean grocery items for this.
Use English terms for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef').
Return ONLY a JSON list of strings.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL, contents=PROMPT,
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, [])
    except Exception as e:
        logger.error("Concept explode error: %s", e)
        return []

    # The declared return type is a list of strings, but the model may wrap
    # the list in an object or include non-string entries.
    if isinstance(parsed, dict):
        parsed = next((v for v in parsed.values() if isinstance(v, list)), [])
    if not isinstance(parsed, list):
        return []
    return [entry.strip() for entry in parsed
            if isinstance(entry, str) and entry.strip()]
def gemini_analyze_image(image_bytes: bytes, caption: str = "") -> Dict[str, Any]:
    """
    Analyse a WhatsApp image — grocery list, product, or meal dish.

    Args:
        image_bytes: Raw image payload; sent to the model as ``image/jpeg``.
        caption: Optional caption used as context in the prompt.

    Returns:
        A dict that always has ``type``, ``items`` (a list) and
        ``description``. The IRRELEVANT default is returned when the client
        is missing, the call fails, or the reply is not a JSON object.
    """
    def _irrelevant() -> Dict[str, Any]:
        # New dict per call so callers can mutate the result safely.
        return {"type": "IRRELEVANT", "items": [], "description": ""}

    if not _gemini_client:
        return _irrelevant()
    PROMPT = f"""
Analyze this image. Context caption: "{caption}"
Classify:
1. SHOPPING_LIST → Extract each item (translate to English).
2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml").
3. MEAL_DISH → Identify dish name + core ingredients.
4. IRRELEVANT → Not shopping related.
Return STRICT JSON:
{{
"type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT",
"items": ["item1", "item2"],
"description": "short description"
}}
"""
    try:
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg")
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, _irrelevant())
    except Exception as e:
        logger.error("Vision error: %s", e)
        return _irrelevant()

    # Guard against valid JSON of the wrong shape before callers index it.
    if not isinstance(result, dict):
        logger.error("Vision returned non-object JSON: %r", result)
        return _irrelevant()
    for key, value in _irrelevant().items():
        if result.get(key) is None:
            result[key] = value
    if not isinstance(result["items"], list):
        result["items"] = []
    return result
def sanitise_response(text: str) -> str:
    """
    Clean a Gemini reply before it is sent to WhatsApp.

    - A line consisting solely of an http(s) URL is removed entirely.
    - Inline http(s) URLs are deleted unless the host starts with
      pricelyst.co.zw or www.pricelyst.co.zw (so api.pricelyst.co.zw
      links are deleted too).
    - Lines left with only whitespace become empty lines.
    - At most two consecutive blank lines are kept.
    - The final text is stripped of leading/trailing whitespace.
    """
    import re

    bare_url = re.compile(r'^https?://\S+$')
    foreign_url = re.compile(r'https?://(?!(?:www\.)?pricelyst\.co\.zw)\S+')

    # Pass 1: drop URL-only lines and scrub non-Pricelyst links.
    scrubbed = []
    for raw in text.split("\n"):
        if bare_url.match(raw.strip()):
            continue
        without_links = foreign_url.sub('', raw)
        scrubbed.append(without_links if without_links.strip() else "")

    # Pass 2: cap runs of blank lines at two.
    output = []
    blanks_in_a_row = 0
    for entry in scrubbed:
        if entry.strip():
            blanks_in_a_row = 0
            output.append(entry)
            continue
        blanks_in_a_row += 1
        if blanks_in_a_row <= 2:
            output.append(entry)

    return "\n".join(output).strip()
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict,
                         chat_history: str = "", language: str = "English") -> str:
    """
    Produce the WhatsApp reply for the standard intents.

    The prompt context is assembled from (in order) the recent chat, a ZESA
    tariff summary computed for $10 and $20, and the JSON-serialised analyst
    data when present. The model reply is passed through
    ``sanitise_response``; a fixed fallback string is returned when the
    Gemini client is missing or the call fails.
    """
    if not _gemini_client:
        return "Hi! I'm Pricelyst AI from Pricelyst. Having a bit of trouble — please try again shortly."

    context_parts = []
    if chat_history:
        context_parts.append(f"RECENT CHAT:\n{chat_history}\n\n")

    units_for_10 = calculate_zesa_units(10)
    units_for_20 = calculate_zesa_units(20)
    context_parts.append(
        f"ZIMBABWE CONTEXT (ZESA 2026 tariffs, incl 6% levy):\n"
        f" $10 = {units_for_10['est_units_kwh']} units | "
        f"$20 = {units_for_20['est_units_kwh']} units\n"
        f" Bands: 0-50u=$0.08/u, 51-100=$0.09/u, 101-200=$0.16/u, "
        f"201-300=$0.23/u, 301-400=$0.25/u, 401+=$0.26/u\n"
        f" Note: {ZIM_CONTEXT['zesa_note']}\n"
    )

    if analyst_data:
        # default=str keeps non-JSON-native values serialisable
        analyst_json = json.dumps(analyst_data, default=str)
        context_parts.append(f"\nANALYST DATA:\n{analyst_json}\n")

    context_str = "".join(context_parts)

    PROMPT = f"""
You are Pricelyst AI, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒.
Mission: shortest path to value + complete price transparency for Zimbabwean shoppers.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent.get('intent', 'CASUAL_CHAT')}
CONTEXT:
{context_str}
FORMATTING (WhatsApp plain text — NO ## headers):
- *bold* for store names, prices, key figures
- Emojis naturally (✅ 🛒 💰 📍 ⚠️ 🔥 🍽️ 📸 ⚡)
- Blank lines between sections — breathe, don't cram
- Mobile-first: short lines, scannable
- NEVER include image URLs, product image links, or any URLs except pricelyst.co.zw
- NO product images — WhatsApp does not render inline images from URLs in chat
LOGIC:
1. BASKET (≤3 items): The formatter already structured it — just add a warm intro line.
2. BASKET (>3 items): Highlight the best store deal and 2-3 standout savings.
3. SINGLE ITEM: Best price first, then 2-3 alternatives. State exact savings.
4. ZESA: Show units calculation clearly with tier breakdown.
5. CASUAL/GREETING: You are a smart shopping advisor, not a generic chatbot.
Do NOT just say "Hi how can I help". Instead, acknowledge warmly AND show
one specific insight from the analyst data if available (a deal, a fact,
a tip). End with a concrete invitation to search.
6. DEALS_EXPLORE: List 5-8 deals with price and store. Make it feel like
a real market bulletin — exciting, specific, local.
7. EVENT_PLANNING: Acknowledge warmly, then present the basket clearly.
8. CATALOGUE_REQUEST: Confirm PDF is being prepared.
9. DISCOVER: List available categories with examples of what to search in each.
10. OFF_TOPIC: Gently steer back to shopping — you are a shopping advisor,
not a general assistant.
Always end with a specific, useful follow-up question or CTA — not a generic one.
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return sanitise_response(reply.text)
    except Exception as e:
        logger.error(f"Chat response error: {e}")
        return "I checked the prices but I'm having trouble displaying them right now. Please try again!"
def gemini_translate(text: str, target_lang: str) -> str:
    """
    Translate an English reply into ``target_lang`` via Gemini.

    The input is returned unchanged when there is no Gemini client, no
    target language, the target is English (case-insensitive), or the
    model call fails.
    """
    if not _gemini_client:
        return text
    if not target_lang:
        return text
    if target_lang.lower() == "english":
        return text

    PROMPT = f"""
Translate this WhatsApp shopping assistant reply from English to {target_lang}.
Rules:
- Keep prices ($X.XX), store names, product names UNCHANGED.
- Keep WhatsApp formatting (*bold*, emojis) UNCHANGED.
- Natural, conversational tone.
Text: "{text}"
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        translated = reply.text.strip()
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
    return translated
# ─────────────────────────────────────────────
# 10. Catalogue PDF Generator
# ─────────────────────────────────────────────
def _build_pdf_header(story, styles, scope_label: str, title: str) -> None:
    """
    Append the shared Pricelyst branded header to ``story``.

    With a usable logo the header is a two-column table (logo, then wordmark
    and tagline); otherwise a centred text-only wordmark and tagline are
    appended. ``scope_label`` and ``title`` are accepted for all PDF types
    but are not rendered by this function.
    """
    from reportlab.lib import colors
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import (Paragraph, Spacer, HRFlowable,
                                    Table as RLTable, TableStyle as RLTS)
    from reportlab.platypus import Image as RLImage
    from reportlab.lib.enums import TA_CENTER

    navy = colors.HexColor("#003087")
    red = colors.HexColor("#E63329")
    grey = colors.HexColor("#555555")

    tagline_style = ParagraphStyle("SubH", parent=styles["Normal"], fontSize=10,
                                   textColor=grey, alignment=TA_CENTER, spaceAfter=2)
    wordmark_style = ParagraphStyle("WM", parent=styles["Title"], fontSize=24,
                                    textColor=navy, fontName="Helvetica-Bold")

    # The logo file path comes from the module-level helper; a failure to
    # load it only downgrades the header to the text-only variant.
    logo_flowable = None
    logo_file = _get_logo_path()
    if logo_file:
        try:
            logo_flowable = RLImage(logo_file, width=3*cm, height=3*cm, kind="proportional")
        except Exception as e:
            logger.warning(f"Logo RLImage init failed: {e}")

    if not logo_flowable:
        # Text-only fallback — still branded with red accent on the tagline
        fallback_wordmark = ParagraphStyle(
            "WMFb", parent=styles["Title"], fontSize=28, textColor=navy,
            fontName="Helvetica-Bold", alignment=TA_CENTER)
        story.append(Paragraph("PRICELYST.", fallback_wordmark))
        fallback_tagline = ParagraphStyle(
            "TagFb", parent=styles["Normal"], fontSize=11,
            textColor=red, alignment=TA_CENTER)
        story.append(Paragraph("Zimbabwe\'s #1 Price Comparison", fallback_tagline))
        return

    text_column = [
        Paragraph("PRICELYST.", wordmark_style),
        Paragraph("Zimbabwe\'s #1 Price Comparison", tagline_style),
    ]
    header_table = RLTable([[logo_flowable, text_column]], colWidths=[3.5*cm, None])
    header_table.setStyle(RLTS([
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
        ("LEFTPADDING", (0, 0), (-1, -1), 0),
        ("RIGHTPADDING", (0, 0), (-1, -1), 0),
        ("BOTTOMPADDING", (0, 0), (-1, -1), 0),
    ]))
    story.append(header_table)
def _build_pdf_footer(story, styles) -> None:
    """
    Append the shared branded footer to ``story``: a spacer, a red rule,
    the price disclaimer and the pricelyst.co.zw call to action.
    """
    from reportlab.lib import colors
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import Spacer, HRFlowable, Paragraph
    from reportlab.lib.enums import TA_CENTER

    rule = HRFlowable(width="100%", thickness=2,
                      color=colors.HexColor("#E63329"))
    story.extend([Spacer(1, 0.4*cm), rule, Spacer(1, 0.15*cm)])

    disclaimer_style = ParagraphStyle(
        "Footer", parent=styles["Normal"], fontSize=7,
        textColor=colors.HexColor("#888888"), alignment=TA_CENTER)
    story.append(Paragraph(
        "Prices subject to change without notice. Data sourced live from participating retailers.",
        disclaimer_style,
    ))
    story.append(Spacer(1, 0.08*cm))

    cta_style = ParagraphStyle(
        "FooterCTA", parent=styles["Normal"], fontSize=8,
        textColor=colors.HexColor("#003087"), alignment=TA_CENTER,
        fontName="Helvetica-Bold")
    story.append(Paragraph(
        "<b>For comprehensive basket comparisons and retailer Pre-Orders visit "
        "<u>www.pricelyst.co.zw</u></b>",
        cta_style,
    ))
def generate_catalogue_pdf(title: str, items: List[Dict],
                           scope_label: str = "Price Comparison") -> Optional[str]:
    """
    Generate a price-comparison PDF.

    Layout:
    - Columns: Product, Category, Store, Price, Savings (no Brand column).
    - One row per offer; product, category and savings are shown only on
      the first row of each item.
    - The best offer is marked inside the Price column with a check mark.
    - Items without offers get a single "Not listed" row.

    Args:
        title: Passed to the shared header builder.
        items: Found items, read for ``offers``, ``best_price``,
            ``potential_savings``, ``is_substitute``, ``product_name``,
            ``category`` and ``query``. Missing or null fields are tolerated.
        scope_label: Passed to the shared header builder.

    Returns:
        Path of the generated file under ``CATALOGUE_DIR``, or ``None`` when
        generation fails.
    """
    try:
        from xml.sax.saxutils import escape
        from reportlab.lib.pagesizes import A4
        from reportlab.lib import colors
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle)

        def _txt(value: Any) -> str:
            # Paragraph text is parsed as XML-like markup, so catalogue text
            # containing "&", "<" or ">" must be escaped; None becomes "".
            return "" if value is None else escape(str(value))

        filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.4*cm, leftMargin=1.4*cm,
                                topMargin=1.4*cm, bottomMargin=1.4*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")
        head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                    textColor=colors.white, fontName="Helvetica-Bold")
        cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=colors.HexColor("#222222"))
        best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                    leading=10, textColor=NAVY, fontName="Helvetica-Bold")
        money_style = ParagraphStyle("Money", parent=styles["Normal"], fontSize=8,
                                     leading=10, textColor=colors.HexColor("#222222"))

        story = []
        _build_pdf_header(story, styles, scope_label, title)
        story.append(Spacer(1, 0.2*cm))

        # Product, Category, Store, Price, Savings
        col_widths = [6.4*cm, 3.0*cm, 3.3*cm, 2.4*cm, 2.3*cm]
        table_data = [[
            Paragraph("Product", head_style),
            Paragraph("Category", head_style),
            Paragraph("Store", head_style),
            Paragraph("Price", head_style),
            Paragraph("Savings", head_style),
        ]]

        # NOTE(review): "✓" and "⚠" may fall outside the glyph set of the
        # built-in Helvetica fonts — confirm they render in the output PDF.
        for item in items:
            offers = item.get("offers") or []
            best_p = float(item.get("best_price", 0) or 0)
            savings = float(item.get("potential_savings", 0) or 0)
            sub_note = " ⚠ nearest" if item.get("is_substitute") else ""
            savings_text = f"${savings:.2f}" if savings > 0.05 else "—"

            if not offers:
                table_data.append([
                    Paragraph(_txt(item.get("query") or "Unknown") + sub_note, cell_style),
                    Paragraph("—", cell_style),
                    Paragraph("Not listed", cell_style),
                    Paragraph("N/A", cell_style),
                    Paragraph("—", cell_style),
                ])
                continue

            product_cell = _txt(item.get("product_name")) + sub_note
            category_cell = _txt(item.get("category"))
            for idx_o, offer in enumerate(offers):
                first_row = idx_o == 0
                price = float(offer.get("price", 0) or 0)
                # Every offer priced at the best price gets the check mark.
                is_best = abs(price - best_p) < 0.001
                price_label = f"✓ ${price:.2f}" if is_best else f"${price:.2f}"
                style = best_style if is_best else money_style
                table_data.append([
                    Paragraph(product_cell if first_row else "", cell_style),
                    Paragraph(category_cell if first_row else "", cell_style),
                    Paragraph(_txt(offer.get("retailer", "")), style),
                    Paragraph(price_label, style),
                    Paragraph(savings_text if first_row else "", cell_style),
                ])

        tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), NAVY),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
            ("GRID", (0, 0), (-1, -1), 0.35, colors.HexColor("#CCCCCC")),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("LEFTPADDING", (0, 0), (-1, -1), 4),
            ("RIGHTPADDING", (0, 0), (-1, -1), 4),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        story.append(tbl)
        _build_pdf_footer(story, styles)
        doc.build(story)
        logger.info("Catalogue PDF generated: %s", filepath)
        return filepath
    except Exception as e:
        logger.error("PDF generation failed: %s", e, exc_info=True)
        return None
def generate_rich_pdf(title: str, scope_label: str, body_markdown: str,
                      items: Optional[List[Dict]] = None) -> Optional[str]:
    """
    Generate a rich branded PDF for budget plans, meal plans, recipes.

    Args:
        title: Passed to the shared header builder.
        scope_label: Passed to the shared header builder.
        body_markdown: The Gemini-generated text response. ``*`` and ``_``
            are removed and each line becomes a heading, bullet, emphasised
            total line or plain paragraph.
        items: Optional found_items list for an appended price table; read
            for ``offers``, ``best_price``, ``potential_savings``,
            ``product_name`` and ``query``. Missing or null fields are
            tolerated.

    Returns:
        Path of the generated file under ``CATALOGUE_DIR``, or ``None`` when
        generation fails.
    """
    try:
        from xml.sax.saxutils import escape
        from reportlab.lib.pagesizes import A4
        from reportlab.lib import colors
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer,
                                        Table, TableStyle, HRFlowable)

        def _txt(value: Any) -> str:
            # Paragraph text is parsed as XML-like markup, so model or
            # catalogue text containing "&", "<" or ">" must be escaped.
            return "" if value is None else escape(str(value))

        def _num(value: Any) -> float:
            # Same coercion as the catalogue PDF: None / "" count as 0.
            return float(value or 0)

        filename = f"pricelyst_rich_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.5*cm, leftMargin=1.5*cm,
                                topMargin=1.5*cm, bottomMargin=1.5*cm)
        styles = getSampleStyleSheet()
        NAVY = colors.HexColor("#003087")
        LIGHT = colors.HexColor("#EEF2FF")
        body_style = ParagraphStyle("Body", parent=styles["Normal"], fontSize=10,
                                    leading=14, spaceAfter=4, textColor=colors.HexColor("#222222"))
        h1_style = ParagraphStyle("H1", parent=styles["Heading2"], fontSize=13,
                                  textColor=NAVY, fontName="Helvetica-Bold",
                                  spaceAfter=6, spaceBefore=10)
        bullet_style = ParagraphStyle("Bullet", parent=styles["Normal"], fontSize=10,
                                      leading=14, leftIndent=12, spaceAfter=3,
                                      textColor=colors.HexColor("#333333"))
        bold_style = ParagraphStyle("Bold", parent=styles["Normal"], fontSize=10,
                                    leading=14, fontName="Helvetica-Bold",
                                    textColor=NAVY, spaceAfter=4)

        story = []
        _build_pdf_header(story, styles, scope_label, title)

        # ── Parse body_markdown into PDF paragraphs ────────────────────────
        # Strip WhatsApp formatting and convert to clean paragraphs
        lines = (body_markdown or "").replace("*", "").replace("_", "").split("\n")
        bullet_prefixes = ("•", "-", "✅", "▪️", "🛒", "💰", "🔥", "⚡", "🍽️")
        total_keywords = ("total:", "grand total", "cost per", "budget:", "subtotal")
        for line in lines:
            stripped = line.strip()
            if not stripped:
                story.append(Spacer(1, 0.15*cm))
                continue
            # Detect section headers (lines ending with colon or all caps short lines)
            if (stripped.endswith(":") and len(stripped) < 60) or (stripped.isupper() and len(stripped) < 50):
                story.append(Paragraph(_txt(stripped), h1_style))
            elif stripped.startswith(bullet_prefixes):
                clean = stripped.lstrip("•-✅▪️🛒💰🔥⚡🍽️ ")
                story.append(Paragraph(f"• {_txt(clean)}", bullet_style))
            elif stripped.startswith(tuple("123456789")) and ". " in stripped:
                # Numbered step such as "1. Boil the water"
                story.append(Paragraph(_txt(stripped), bullet_style))
            elif any(kw in stripped.lower() for kw in total_keywords):
                story.append(Paragraph(_txt(stripped), bold_style))
            else:
                story.append(Paragraph(_txt(stripped), body_style))

        # ── Optional price table ───────────────────────────────────────────
        if items:
            story.append(Spacer(1, 0.4*cm))
            story.append(HRFlowable(width="100%", thickness=1,
                                    color=colors.HexColor("#CCCCCC")))
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Shopping List — Price Comparison",
                                   ParagraphStyle("TblTitle", parent=styles["Heading3"],
                                                  fontSize=11, textColor=NAVY,
                                                  fontName="Helvetica-Bold")))
            story.append(Spacer(1, 0.15*cm))
            head_style = ParagraphStyle("ColH", parent=styles["Normal"], fontSize=9,
                                        textColor=colors.white, fontName="Helvetica-Bold")
            cell_style = ParagraphStyle("Cell", parent=styles["Normal"], fontSize=8,
                                        textColor=colors.HexColor("#222222"))
            best_style = ParagraphStyle("Best", parent=styles["Normal"], fontSize=8,
                                        textColor=NAVY, fontName="Helvetica-Bold")
            col_w = [5.5*cm, 3*cm, 3*cm, 2.5*cm, 3*cm]
            tdata = [[
                Paragraph("Ingredient", head_style),
                Paragraph("Best Price", head_style),
                Paragraph("Store", head_style),
                Paragraph("Save", head_style),
                Paragraph("Alternatives", head_style),
            ]]
            for item in items:
                offers = item.get("offers") or []
                best_p = _num(item.get("best_price"))
                savings = _num(item.get("potential_savings"))
                # The first offer is shown as the best store; the next two
                # are listed as alternatives.
                alts = ", ".join(
                    f"{o.get('retailer', '')} ${_num(o.get('price')):.2f}"
                    for o in offers[1:3]
                ) if len(offers) > 1 else "—"
                best_store = offers[0].get("retailer", "—") if offers else "—"
                tdata.append([
                    Paragraph(_txt(item.get("product_name") or item.get("query") or ""), cell_style),
                    Paragraph(f"${best_p:.2f}", best_style),
                    Paragraph(_txt(best_store), cell_style),
                    Paragraph(f"${savings:.2f}" if savings > 0.05 else "—", cell_style),
                    Paragraph(_txt(alts), cell_style),
                ])
            tbl = Table(tdata, colWidths=col_w, repeatRows=1)
            tbl.setStyle(TableStyle([
                ("BACKGROUND", (0, 0), (-1, 0), NAVY),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, LIGHT]),
                ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")),
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("LEFTPADDING", (0, 0), (-1, -1), 4),
                ("RIGHTPADDING", (0, 0), (-1, -1), 4),
                ("TOPPADDING", (0, 0), (-1, -1), 3),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
            ]))
            story.append(tbl)

        _build_pdf_footer(story, styles)
        doc.build(story)
        logger.info("Rich PDF generated: %s", filepath)
        return filepath
    except Exception as e:
        logger.error("Rich PDF generation failed: %s", e, exc_info=True)
        return None
# ─────────────────────────────────────────────
# 11. Firebase Profile Helpers
# ─────────────────────────────────────────────
def get_or_create_profile(mobile: str) -> Dict[str, Any]:
    """
    Fetch the profile document keyed by `mobile`, creating it if absent.

    A newly created profile holds only the mobile number and a UTC
    ISO-8601 `created_at` stamp. Returns an empty dict when Firestore is
    not initialised or when any read/write raises.
    """
    if not db:
        return {}
    try:
        profile_ref = db.collection("pricelyst_profiles").document(mobile)
        snapshot = profile_ref.get()
        if not snapshot.exists:
            # First contact from this number: persist a minimal profile.
            new_profile = {
                "mobile": mobile,
                "created_at": datetime.now(timezone.utc).isoformat(),
            }
            profile_ref.set(new_profile)
            return new_profile
        return snapshot.to_dict()
    except Exception as e:
        logger.error(f"Profile fetch error for {mobile}: {e}")
        return {}
def get_chat_history(mobile: str, limit: int = 6) -> str:
    """
    Return the most recent chat turns for `mobile` as a plain-text transcript.

    Up to `limit` entries are read newest-first from the user's
    `chat_logs` sub-collection and then emitted oldest-first, each as a
    "User: ... / April: ..." pair. Returns "" when Firestore is not
    initialised or when the query raises.
    """
    if not db:
        return ""
    try:
        chat_logs = (
            db.collection("pricelyst_profiles")
            .document(mobile)
            .collection("chat_logs")
        )
        newest_first = chat_logs.order_by(
            "ts", direction=firestore.Query.DESCENDING
        ).limit(limit)

        turns = []
        for snap in newest_first.stream():
            entry = snap.to_dict()
            turns.append(
                f"User: {entry.get('message', '')}\nApril: {entry.get('response', '')}"
            )
        # The query yields newest first; flip so the transcript reads chronologically.
        turns.reverse()
        return "\n".join(turns)
    except Exception as e:
        logger.error(f"Chat history error: {e}")
        return ""
def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None:
    """
    Append one user/bot exchange to the user's `chat_logs` sub-collection.

    The entry stores the message, the response, the intent dict and a UTC
    ISO-8601 timestamp. Does nothing when Firestore is not initialised;
    write errors are logged and swallowed.
    """
    if not db:
        return
    try:
        logs = (
            db.collection("pricelyst_profiles")
            .document(mobile)
            .collection("chat_logs")
        )
        entry = {
            "message": message,
            "response": response,
            "intent": intent,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        logs.add(entry)
    except Exception as e:
        logger.error(f"Chat log save error: {e}")
def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]:
    """
    Store `plan` under the user's `shopping_plans` sub-collection.

    A fresh document id is allocated and written into `plan["id"]` (the
    caller's dict is mutated) before the document is saved. Returns the
    new id, or None when Firestore is not initialised or the write raises.
    """
    if not db:
        return None
    try:
        plans = (
            db.collection("pricelyst_profiles")
            .document(mobile)
            .collection("shopping_plans")
        )
        new_doc = plans.document()
        plan_id = new_doc.id
        # Embed the id in the payload so the stored plan is self-describing.
        plan["id"] = plan_id
        new_doc.set(plan)
        return plan_id
    except Exception as e:
        logger.error(f"Plan save error: {e}")
        return None
def save_feedback(mobile: str, feedback_text: str,
                  feedback_type: str = "general") -> bool:
    """
    Save user feedback to a top-level Firestore collection for easy review.
    feedback_type: "product_request" | "price_issue" | "suggestion" | "general"

    The record in `pricelyst_feedback` is the authoritative one and alone
    decides the return value. A copy is also written under the user's
    profile for context, but that write is best-effort: if it fails the
    failure is logged and the function still returns True, so a caller
    does not prompt the user to resubmit feedback that was already stored.

    Returns:
        True when the top-level record was written; False when Firestore
        is not initialised or the top-level write raised.
    """
    if not db:
        return False

    # One timestamp for both records so they can be correlated exactly.
    ts = datetime.now(timezone.utc).isoformat()

    try:
        db.collection("pricelyst_feedback").add({
            "mobile": mobile,
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": ts,
            "status": "new",  # new | reviewed | actioned
        })
    except Exception as e:
        logger.error(f"Feedback save error: {e}")
        return False

    try:
        # Also log against the user's profile for context (best-effort).
        db.collection("pricelyst_profiles").document(mobile).collection("feedback").add({
            "feedback": feedback_text,
            "type": feedback_type,
            "ts": ts,
        })
    except Exception as e:
        logger.warning(f"Feedback profile copy failed for {mobile}: {e}")

    # str() keeps the log line from raising on a non-string payload.
    logger.info(f"Feedback saved from {mobile}: [{feedback_type}] {str(feedback_text)[:80]}")
    return True
# ─────────────────────────────────────────────
# 12. Firebase Storage & Media Helpers
# ─────────────────────────────────────────────
def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]:
    """
    Upload a local file to Firebase Storage and return a signed URL for it.

    The object is stored as `<folder>/<basename of file_path>` and the
    returned URL is signed with a one-hour expiration. Returns None when
    no storage bucket is configured or when the upload/signing raises.
    """
    if not FIREBASE_STORAGE_BUCKET:
        return None
    try:
        object_name = f"{folder}/{os.path.basename(file_path)}"
        target = fb_storage.bucket().blob(object_name)
        target.upload_from_filename(file_path)
        return target.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}")
        return None
def upload_to_imgur(file_path: str) -> Optional[str]:
    """
    Upload a local image to Imgur and return its public link.

    Args:
        file_path: Path of the image file to upload.

    Returns:
        The link from the Imgur response when it reports success; None
        when no Imgur client id is configured, the response reports
        failure, or the request/file access raises.
    """
    if not IMGUR_CLIENT_ID:
        return None
    try:
        with open(file_path, "rb") as f:
            # Without a timeout, requests waits indefinitely on a stalled
            # connection; use the same 30 s bound as the other HTTP helpers.
            resp = requests.post(
                IMGUR_URL,
                headers=IMGUR_HEADERS,
                files={"image": f},
                timeout=30,
            )
        resp.raise_for_status()
        data = resp.json()
        return data["data"]["link"] if data.get("success") else None
    except Exception as e:
        logger.error(f"Imgur upload failed: {e}")
        return None
def deepgram_tts(text: str) -> Optional[str]:
    """
    Synthesize `text` to speech via the Deepgram TTS endpoint.

    The response body is written to a uniquely named `tts_<hex>.mp3` file
    in the current working directory and that path is returned. Returns
    None when no Deepgram API key is configured or when the request or
    file write raises.
    """
    if not DEEPGRAM_API_KEY:
        return None
    try:
        request_headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json",
        }
        resp = requests.post(
            DEEPGRAM_TTS_URL,
            headers=request_headers,
            json={"text": text},
            timeout=30,
        )
        resp.raise_for_status()

        # Random file name so concurrent requests never overwrite each other.
        audio_name = f"tts_{uuid.uuid4().hex}.mp3"
        audio_path = os.path.join(os.getcwd(), audio_name)
        with open(audio_path, "wb") as audio_file:
            audio_file.write(resp.content)
        return audio_path
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}")
        return None
# ─────────────────────────────────────────────
# 13. Deals & Discovery Helpers
# ─────────────────────────────────────────────
def get_todays_deals(limit: int = 8) -> List[Dict]:
    """
    Return up to `limit` products with the widest price spread among offers.

    Rows of the market index flagged `is_offer` are grouped by product
    name; the spread is max price minus min price. Products whose spread
    exceeds $0.05 are ranked by spread (largest first), and for each one
    the cheapest offer row supplies the price, retailer and category.
    Returns [] when the index is empty, nothing is on offer, or any step
    raises.
    """
    market = get_market_index()
    if market.empty:
        return []
    try:
        # Elementwise comparison on a Series; `is True` would not work here.
        on_offer = market[market["is_offer"] == True].copy()  # noqa: E712
        if on_offer.empty:
            return []

        spread = (
            on_offer.groupby("product_name")["price"]
            .agg(["min", "max"])
            .reset_index()
        )
        spread["savings"] = spread["max"] - spread["min"]
        worthwhile = spread[spread["savings"] > 0.05]
        ranked = worthwhile.sort_values("savings", ascending=False).head(limit)

        def _as_deal(name, saving):
            # Cheapest offer row for this product supplies price and retailer.
            matches = on_offer[on_offer["product_name"] == name]
            best = matches.sort_values("price").iloc[0]
            return {
                "product_name": name,
                "cheapest_price": float(best["price"]),
                "retailer": best["retailer"],
                "savings": float(saving),
                "category": best.get("category", ""),
            }

        return [
            _as_deal(name, saving)
            for name, saving in zip(ranked["product_name"], ranked["savings"])
        ]
    except Exception as e:
        logger.error(f"Deals fetch error: {e}")
        return []
def get_category_list() -> List[str]:
    """
    Return the distinct, non-null category values of the market index, sorted.

    Returns [] when the index is empty or when extracting/sorting the
    categories raises (for example a missing `category` column or values
    of mixed, unorderable types). Failures are logged rather than silently
    dropped so an empty category menu can be diagnosed.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        return sorted(df["category"].dropna().unique().tolist())
    except Exception as e:
        logger.error(f"Category list error: {e}")
        return []
def format_deals_message(deals: List[Dict]) -> str:
    """
    Render a list of deal dicts as a WhatsApp-formatted message.

    Each deal contributes a numbered, bold product line, a price/retailer
    line, a savings line and a blank separator. A fixed header precedes
    the deals and a fixed call-to-action footer follows them. An empty or
    missing list yields a short "no deals" notice instead.
    """
    if not deals:
        return "No deals data right now. Please try again shortly."

    header = ["🏷️ *Today\'s Best Deals* 🇿🇼\n"]

    body = []
    for position, deal in enumerate(deals, start=1):
        body.extend((
            f"*{position}. {deal['product_name']}*",
            f" 💰 ${deal['cheapest_price']:.2f} @ {deal['retailer']}",
            f" 🔥 Save up to ${deal['savings']:.2f}",
            "",
        ))

    footer = [
        "_Type any product name to compare prices!_",
        "",
        "*For comprehensive basket comparisons and retailer Pre-Orders visit*",
        "www.pricelyst.co.zw",
    ]
    return "\n".join(header + body + footer)