# PriceLystAI-WA / utility.py
# rairo's picture
# Create utility.py
# 012a6dc verified
"""
utility.py — Pricelyst WhatsApp Bot
Core AI & Data layer:
- ETL from Pricelyst API -> in-memory Pandas market index
- Deep vector search & basket optimisation (Market Matrix)
- Gemini 2.5 Flash (new google-genai SDK) for intent, vision, chat
- Vernacular engine: Shona / Ndebele / English in -> native reply out
- Catalogue PDF generation (reportlab)
- Firebase persistence (profiles, chat history, shopping plans)
- ZESA electricity unit calculator
"""
import os
import re
import json
import time
import math
import uuid
import logging
import base64
import io
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Tuple
import requests
import pandas as pd
logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────
# 1. Gemini (new google-genai SDK)
# ─────────────────────────────────────────────
# The SDK import is optional: when it is missing the module still loads and
# `genai` / `genai_types` are set to None.
try:
    from google import genai
    from google.genai import types as genai_types
except ImportError:
    genai = None
    genai_types = None
    logger.error("google-genai not installed. Run: pip install google-genai")

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")

# Shared client; stays None when the SDK or the API key is unavailable.
# The gemini_* helpers in this module check it before calling the model.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
elif genai:
    # Without this line a missing key silently turned every AI feature off.
    logger.warning("GOOGLE_API_KEY env var missing. Gemini features disabled.")
# ─────────────────────────────────────────────
# 2. Firebase
# ─────────────────────────────────────────────
import firebase_admin
from firebase_admin import credentials, firestore, storage as fb_storage
# Service-account credentials as a JSON string; parsed with json.loads at init.
FIREBASE_ENV = os.environ.get("FIREBASE", "")
# Optional bucket name; passed as the "storageBucket" app option when set.
FIREBASE_STORAGE_BUCKET = os.environ.get("FIREBASE_STORAGE_BUCKET", "")
# Firestore client handle; while it is None the persistence helpers return
# empty results instead of touching the database.
db: Optional[Any] = None
def init_firestore_from_env() -> Optional[Any]:
    """Initialise Firebase from environment settings and return a Firestore client.

    Reuses an already-initialised Firebase app when one exists. Otherwise the
    service-account JSON in FIREBASE_ENV is parsed and a new app is created,
    with the storage bucket option added when FIREBASE_STORAGE_BUCKET is set.

    Returns:
        The Firestore client (also stored in the module-level ``db``), or
        None when FIREBASE_ENV is empty or any step fails.
    """
    global db
    try:
        if not firebase_admin._apps:
            if not FIREBASE_ENV:
                logger.warning("FIREBASE env var missing. Persistence disabled.")
                return None
            certificate = credentials.Certificate(json.loads(FIREBASE_ENV))
            app_options = (
                {"storageBucket": FIREBASE_STORAGE_BUCKET}
                if FIREBASE_STORAGE_BUCKET
                else {}
            )
            firebase_admin.initialize_app(certificate, app_options)
            db = firestore.client()
            logger.info("Firebase initialized.")
        else:
            # An app already exists in this process; just attach a client.
            db = firestore.client()
        return db
    except Exception as exc:
        logger.critical("Failed to initialize Firebase: %s", exc)
        return None
# Connect once at import time; db is None when Firebase could not be set up.
db = init_firestore_from_env()
# ─────────────────────────────────────────────
# 3. Static Config
# ─────────────────────────────────────────────
# Base URL of the Pricelyst API; a trailing "/" is stripped so paths can be appended.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
# Passed as `timeout=` to the product-listing request (seconds, per `requests`).
HTTP_TIMEOUT = 30
# Maximum age of the cached market index, compared against time.time() deltas.
PRODUCT_CACHE_TTL = 60 * 20 # 20 minutes
# Reference figures injected into the chat prompt and used by the ZESA calculator.
# For each zesa_step_N, the calculator treats "limit" as the number of units sold
# at "rate" per unit; "zesa_levy" is a fractional surcharge removed from the
# purchase amount before units are computed.
# NOTE(review): values are hard-coded and shown with a "$" prefix in the prompt —
# presumably USD; confirm they are current.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.10,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}
# Imgur upload settings; the auth header dict is empty when no client id is configured.
IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID", "")
IMGUR_URL = "https://api.imgur.com/3/image"
IMGUR_HEADERS = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}
# DeepGram text-to-speech settings; the voice model is fixed in the URL query string.
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
# Catalogue output dir
# Created under the current working directory at import time if it does not exist.
CATALOGUE_DIR = os.path.join(os.getcwd(), "catalogues")
os.makedirs(CATALOGUE_DIR, exist_ok=True)
# ─────────────────────────────────────────────
# 4. Market Index (ETL)
# ─────────────────────────────────────────────
# In-memory cache of the flattened market index: "ts" is the time.time() of the
# last refresh, "df" the DataFrame itself, "raw_count" its row count at refresh.
_data_cache: Dict[str, Any] = {"ts": 0, "df": pd.DataFrame(), "raw_count": 0}
def _norm(s: Any) -> str:
    """Return *s* as a trimmed, lower-cased string; any falsy value becomes ""."""
    return str(s).strip().lower() if s else ""
def _coerce_price(v: Any) -> float:
    """Convert an API price value to a float, using 0.0 for anything unusable.

    Args:
        v: Raw price as delivered by the API (number, numeric string or None).

    Returns:
        The price as a finite float, or 0.0 when *v* is None, cannot be
        parsed, or parses to NaN / infinity.
    """
    if v is None:
        return 0.0
    try:
        price = float(v)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # "inf" would pass a later `price > 0` filter and poison totals; NaN is
    # equally meaningless as a price, so both collapse to the 0.0 sentinel.
    return price if math.isfinite(price) else 0.0
def _flatten_product(p: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one API product record into market-index rows (one per priced offer)."""
    p_name = str(p.get("name") or "Unknown")
    brand_name = str((p.get("brand") or {}).get("brand_name") or "")
    cat_names = [str(c.get("name") or "") for c in (p.get("categories") or [])]
    shared = {
        "product_id": int(p.get("id") or 0),
        "product_name": p_name,
        "search_vector": _norm(f"{p_name} {brand_name} {' '.join(cat_names)}"),
        "brand": brand_name,
        "category": cat_names[0] if cat_names else "General",
        "views": int(p.get("view_count") or 0),
        "image": str(p.get("thumbnail") or p.get("image") or ""),
    }

    prices = p.get("prices") or []
    if not prices:
        # Products without any price still appear as a non-offer "Listing" row.
        return [{**shared, "retailer": "Listing", "price": 0.0, "is_offer": False}]

    rows = []
    for offer in prices:
        r_name = str((offer.get("retailer") or {}).get("name") or "Unknown Store")
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            rows.append({**shared, "retailer": r_name, "price": price_val, "is_offer": True})
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """Download the product listing and flatten it into one row per offer.

    Pages through ``/api/v1/product-listing`` (50 products per page) until an
    empty page, the reported ``totalPages``, or an error is reached. A request
    error stops pagination; whatever was fetched so far is still flattened.

    Returns:
        DataFrame with columns product_id, product_name, search_vector, brand,
        category, retailer, price, views, image and is_offer. The columns are
        present even when no rows could be fetched. A malformed product record
        is skipped as a whole and counted in the log.
    """
    columns = [
        "product_id", "product_name", "search_vector", "brand", "category",
        "retailer", "price", "views", "image", "is_offer",
    ]
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    all_products = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            # When the API omits totalPages, pagination is capped at 99 pages.
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error("ETL Error on page %s: %s", page, e)
            break

    rows = []
    skipped = 0
    for p in all_products:
        try:
            rows.extend(_flatten_product(p))
        except Exception as e:
            # Best effort: one bad record must not abort the whole refresh,
            # but it should not vanish without a trace either.
            skipped += 1
            logger.debug("ETL: Skipping malformed product record: %s", e)
    if skipped:
        logger.warning("ETL: Skipped %d malformed product record(s).", skipped)

    df = pd.DataFrame(rows, columns=columns)
    logger.info("ETL: Flattened into %d rows.", len(df))
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh is attempted when *force_refresh* is true, when the cache is
    empty, or when the cached data is older than PRODUCT_CACHE_TTL.

    If a refresh yields no rows while older data is still cached, the older
    data is kept and served (and the refresh is retried on the next call)
    instead of replacing a usable index with an empty one.

    Args:
        force_refresh: Refresh even if the cached data is still fresh.

    Returns:
        The market index DataFrame; empty when no data has ever been fetched.
    """
    cached_df = _data_cache["df"]
    expired = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if force_refresh or cached_df.empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh_df = fetch_and_flatten_data()
        if fresh_df.empty and not cached_df.empty:
            logger.warning(
                "ETL: Refresh returned no rows; serving previously cached Market Index."
            )
        else:
            _data_cache.update(
                {"df": fresh_df, "ts": time.time(), "raw_count": len(fresh_df)}
            )
    return _data_cache["df"]
# ─────────────────────────────────────────────
# 5. Precision Search & Basket Optimisation
# ─────────────────────────────────────────────
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 15) -> pd.DataFrame:
    """Rank market-index rows against a free-text query.

    Scoring per row (points are cumulative):
      1. Query equals the normalised product name: 1000
      2. Query is a substring of the row's search vector: 500
      3. The row's non-empty brand is a substring of the query: 200
      4. Each query token that is also a search-vector token: 50

    Tie-break: views descending, then price ascending.

    Args:
        df: Market index (needs search_vector, product_name, brand, views, price).
        query: Free-text search term.
        limit: Maximum number of rows to return.

    Returns:
        Up to *limit* rows with a positive score plus a ``match_score`` column.
        When *df* is empty or *query* is falsy, *df* is returned unchanged
        (without a ``match_score`` column).
    """
    if df.empty or not query:
        return df
    q_norm = _norm(query)
    q_tokens = set(q_norm.split())

    def score(row) -> int:
        vector = row["search_vector"]
        brand = _norm(row["brand"])
        points = 0
        if q_norm == _norm(row["product_name"]):
            points += 1000
        if q_norm in vector:
            points += 500
        # An empty brand is a substring of every string; without the guard all
        # brandless rows scored 200 and "matched" any query whatsoever.
        if brand and brand in q_norm:
            points += 200
        points += 50 * len(q_tokens.intersection(vector.split()))
        return points

    df_scored = df.copy()
    df_scored["match_score"] = df_scored.apply(score, axis=1)
    matches = df_scored[df_scored["match_score"] > 0]
    if matches.empty:
        return matches
    return matches.sort_values(
        ["match_score", "views", "price"], ascending=[False, False, True]
    ).head(limit)
def _build_market_matrix(found_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Total the matched basket per retailer, rank the stores and attach basket_savings."""
    retailers = {o["retailer"] for f in found_items for o in f["offers"]}
    store_comparison = []
    for retailer in retailers:
        total_price = 0.0
        found_count = 0
        missing_list = []
        for item in found_items:
            # Offers are sorted by price, so the first hit is this store's cheapest.
            price = next(
                (o["price"] for o in item["offers"] if o["retailer"] == retailer), None
            )
            if price is not None:
                total_price += price
                found_count += 1
            else:
                missing_list.append(item["product_name"])
        store_comparison.append({
            "retailer": retailer,
            "total_price": total_price,
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_list,
        })

    # Best coverage first, then cheapest; the retailer name makes ties
    # deterministic (set iteration order is not).
    store_comparison.sort(
        key=lambda x: (-x["found_count"], x["total_price"], x["retailer"])
    )

    if store_comparison:
        # Savings are only comparable between stores with the same (best) coverage.
        top_count = store_comparison[0]["found_count"]
        max_total = max(
            s["total_price"] for s in store_comparison if s["found_count"] == top_count
        )
        for s in store_comparison:
            s["basket_savings"] = (
                max_total - s["total_price"] if s["found_count"] == top_count else 0.0
            )
    return store_comparison


def calculate_basket_optimization(item_names: List[str],
                                  preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """Build the full market matrix for a list of requested items.

    Each item is resolved to its best-scoring product via precision search.
    All priced offers for that product are collected from the whole index
    (not only from the top search hits, which could cut retailers off), and
    the basket is then totalled per retailer.

    Args:
        item_names: Free-text item names to look up; blank entries are ignored.
        preferred_retailer: Echoed back in the result; it does not influence
            matching or ranking here.

    Returns:
        A dict with ``actionable``. When market data is unavailable it carries
        ``error``. When nothing matched it carries empty ``found_items`` and
        ``global_missing``. Otherwise it carries ``is_basket``, ``found_items``
        (with offers, best_price, potential_savings, is_substitute),
        ``global_missing``, ``market_matrix`` (top 5 stores), ``best_store``
        and ``preferred_retailer``.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "Market data unavailable. Please try again shortly."}

    # Hoisted: the priced-offer subset is identical for every basket item.
    offers_df = df[df["is_offer"] == True]  # noqa: E712

    found_items = []
    missing_global = []
    for item in item_names:
        q_norm = _norm(item)
        if not q_norm:
            # A blank query would otherwise "match" an arbitrary first row.
            continue
        hits = search_products_deep(offers_df, item, limit=10)
        if hits.empty:
            missing_global.append(item)
            continue
        best_match = hits.iloc[0]

        # Flag a substitute when a multi-word query is not fully covered by
        # the matched product's name and brand.
        res_norm = _norm(f"{best_match['product_name']} {best_match['brand']}")
        q_tokens = q_norm.split()
        is_sub = len(q_tokens) > 1 and sum(1 for t in q_tokens if t in res_norm) < len(q_tokens)

        product_offers = (
            offers_df[offers_df["product_name"] == best_match["product_name"]]
            .sort_values("price")
        )
        offers_list = [{"retailer": r["retailer"], "price": float(r["price"])}
                       for _, r in product_offers.iterrows()]
        best_price = offers_list[0]["price"]
        max_price = offers_list[-1]["price"]
        found_items.append({
            "query": item,
            "product_name": str(best_match["product_name"]),
            "brand": str(best_match["brand"]),
            "category": str(best_match["category"]),
            "image": str(best_match["image"]),
            "is_substitute": is_sub,
            "offers": offers_list,
            "best_price": best_price,
            "potential_savings": max_price - best_price,
        })

    if not found_items:
        return {"actionable": True, "found_items": [], "global_missing": missing_global}

    store_comparison = _build_market_matrix(found_items)
    return {
        "actionable": True,
        "is_basket": len(found_items) > 1,
        "found_items": found_items,
        "global_missing": missing_global,
        "market_matrix": store_comparison[:5],
        "best_store": store_comparison[0] if store_comparison else None,
        "preferred_retailer": preferred_retailer,
    }
# ─────────────────────────────────────────────
# 6. ZESA Calculator
# ─────────────────────────────────────────────
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate how many electricity units a purchase amount buys.

    The levy is removed first (amount / (1 + zesa_levy)). The remainder is
    spent tier by tier: a tier is bought in full (its "limit" units at its
    "rate") while the money left exceeds the tier's full cost; otherwise the
    rest is spent at that tier's rate. Money left after the first two tiers
    is spent at the third tier's rate.

    Args:
        amount_usd: Purchase amount.

    Returns:
        {"amount_usd": <float>, "est_units_kwh": <units rounded to 1 decimal>}
    """
    budget = amount_usd / (1 + ZIM_CONTEXT["zesa_levy"])
    kwh = 0.0
    for tier_key in ("zesa_step_1", "zesa_step_2"):
        tier = ZIM_CONTEXT[tier_key]
        tier_cost = tier["limit"] * tier["rate"]
        if budget > tier_cost:
            kwh += tier["limit"]
            budget -= tier_cost
        else:
            kwh += budget / tier["rate"]
            break
    else:
        # Both capped tiers were bought in full; the rest goes to the top tier.
        kwh += budget / ZIM_CONTEXT["zesa_step_3"]["rate"]
    return {"amount_usd": float(amount_usd), "est_units_kwh": float(round(kwh, 1))}
# ─────────────────────────────────────────────
# 7. Gemini Helpers (new SDK)
# ─────────────────────────────────────────────
def _safe_json_loads(s: str, fallback: Any) -> Any:
    """Parse JSON from model output, tolerating Markdown code fences.

    Args:
        s: Raw text, optionally wrapped in a ```json ... ``` or ``` ... ``` fence.
        fallback: Value returned when *s* cannot be parsed.

    Returns:
        The decoded JSON value, or *fallback* when *s* is not a string or does
        not contain valid JSON. This function never raises.
    """
    # Callers pass the response text straight through; presumably it can be
    # None when the model returns no text (confirm against the SDK). Handling
    # it here keeps the error handler below from failing on `s[:300]`.
    if not isinstance(s, str):
        return fallback
    try:
        cleaned = s
        if "```json" in cleaned:
            cleaned = cleaned.split("```json")[1].split("```")[0]
        elif "```" in cleaned:
            cleaned = cleaned.split("```")[1]
        return json.loads(cleaned.strip())
    except Exception as e:
        logger.error("JSON parse error: %s | raw: %s", e, s[:300])
        return fallback
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Classify a user message and extract items/amounts (Shona, Ndebele or English).

    Args:
        transcript: The user's message text.

    Returns:
        The JSON object produced by the model (see the schema in the prompt).
        When the Gemini client is unavailable, the call fails, or the reply
        cannot be parsed, a non-actionable CASUAL_CHAT default is returned.
    """
    def default_intent() -> Dict[str, Any]:
        # Built fresh on every use so callers never share one mutable dict.
        return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English", "items": []}

    if not _gemini_client:
        return default_intent()

    prompt = """
Analyze the transcript below and return STRICT JSON.
Intents:
- CASUAL_CHAT : Greetings, "hi", off-topic
- SHOPPING_BASKET : Searching for prices / cheapest X
- UTILITY_CALC : Electricity / ZESA / fuel cost questions
- STORE_DECISION : "Which store is cheapest?", "Where should I shop?"
- EVENT_PLANNING : Implicit lists — "plan a braai", "wedding grocery list", "dinner for 5"
- CATALOGUE_REQUEST : User wants a PDF price list / catalogue / deals sheet
- DEALS_EXPLORE : "Today's deals", "promotions", "what's on special"
- DISCOVER : "What products do you have?", "show me your categories"
Rules:
- Extract items: translate ALL items to English (e.g. 'hupfu' → 'maize meal', 'mafuta' → 'cooking oil').
- If only a concept is given (e.g. "plan a braai"), set is_event_planning=true and items=[].
- Detect user language accurately (e.g. Shona, Ndebele, English).
- store_preference: name of store if explicitly mentioned.
- utility_amount: numeric value if mentioned.
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"store_preference": "string | null",
"is_event_planning": boolean,
"language": "string",
"catalogue_scope": "string | null"
}
Transcript: """ + transcript

    try:
        json_config = genai_types.GenerateContentConfig(response_mime_type="application/json")
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
            config=json_config,
        )
        return _safe_json_loads(response.text, default_intent())
    except Exception as e:
        logger.error(f"Intent detect error: {e}")
        return default_intent()
def gemini_explode_concept(transcript: str) -> List[str]:
    """Ask the model to expand an event/meal concept into a grocery list.

    Args:
        transcript: The user's description of what they want to plan.

    Returns:
        The parsed JSON reply (the prompt requests a list of item names), or
        an empty list when the client is unavailable, the call fails, or the
        reply cannot be parsed.
    """
    if not _gemini_client:
        return []

    prompt = f"""
User wants to plan: "{transcript}"
Generate 10-15 essential Zimbabwean grocery items for this.
Use English terms suitable for database lookup (e.g. 'Maize Meal', 'Cooking Oil', 'Beef').
Return ONLY a JSON list of strings.
"""
    try:
        json_config = genai_types.GenerateContentConfig(response_mime_type="application/json")
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
            config=json_config,
        )
        return _safe_json_loads(response.text, [])
    except Exception as e:
        logger.error(f"Concept explode error: {e}")
        return []
def gemini_analyze_image(image_bytes: bytes, caption: str = "",
                         mime_type: str = "image/jpeg") -> Dict[str, Any]:
    """Analyse a WhatsApp image — grocery list photo, product, or meal.

    Args:
        image_bytes: Raw image data.
        caption: Optional caption sent with the image, given to the model as context.
        mime_type: MIME type of *image_bytes*. Defaults to "image/jpeg", the
            value that was previously hard-coded, so existing callers behave
            the same; pass e.g. "image/png" for other formats.

    Returns:
        The parsed JSON reply with "type", "items" and "description" (as
        requested in the prompt). An IRRELEVANT result with no items is
        returned when the client is unavailable, the call fails, or the reply
        cannot be parsed.
    """
    def irrelevant() -> Dict[str, Any]:
        return {"type": "IRRELEVANT", "items": [], "description": ""}

    if not _gemini_client:
        return irrelevant()

    PROMPT = f"""
Analyze this image. Context caption: "{caption}"
Classify:
1. SHOPPING_LIST → Extract each item as written (translate to English).
2. SINGLE_PRODUCT → Extract BRAND + NAME (e.g. "Pepsi 500ml", "Zimgold Cooking Oil 2L").
3. MEAL_DISH → Identify dish name + core ingredients.
4. IRRELEVANT → Not shopping related.
Return STRICT JSON:
{{
"type": "SHOPPING_LIST" | "SINGLE_PRODUCT" | "MEAL_DISH" | "IRRELEVANT",
"items": ["item1", "item2"],
"description": "short description"
}}
"""
    try:
        image_part = genai_types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=genai_types.GenerateContentConfig(response_mime_type="application/json")
        )
        return _safe_json_loads(resp.text, irrelevant())
    except Exception as e:
        logger.error("Vision error: %s", e)
        return irrelevant()
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict,
                         chat_history: str = "", language: str = "English") -> str:
    """Generate April's WhatsApp reply as plain text (no markdown headers).

    Args:
        transcript: The user's message.
        intent: Intent dict; its "intent" value is placed in the prompt.
        analyst_data: Price analysis to ground the reply; JSON-encoded into the
            prompt when non-empty.
        chat_history: Recent conversation text, included when non-empty.
        language: Language the model is told the user writes in.

    Returns:
        The model's reply with surrounding whitespace removed, or a fixed
        apology message when the client is unavailable or the call fails.
    """
    if not _gemini_client:
        return "Hi! I'm April from Pricelyst. I'm having a bit of trouble right now — please try again shortly."

    # Assemble the context block piece by piece, then join once.
    sections = []
    if chat_history:
        sections.append(f"RECENT CHAT:\n{chat_history}\n\n")
    sections.append(
        f"ZIMBABWE CONTEXT: Petrol=${ZIM_CONTEXT['fuel_petrol']}/L, "
        f"Diesel=${ZIM_CONTEXT['fuel_diesel']}/L, "
        f"Bread≈${ZIM_CONTEXT['bread_avg']}, "
        f"ZESA: $10={calculate_zesa_units(10)['est_units_kwh']}u, "
        f"$20={calculate_zesa_units(20)['est_units_kwh']}u\n"
    )
    if analyst_data:
        sections.append(f"\nANALYST DATA:\n{json.dumps(analyst_data, default=str)}\n")
    context_block = "".join(sections)

    prompt = f"""
You are April, Pricelyst Zimbabwe's friendly WhatsApp Shopping Advisor 🛒.
Your mission: shortest path to value + complete price transparency for Zimbabwean shoppers.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent.get('intent', 'CASUAL_CHAT')}
CONTEXT:
{context_block}
FORMATTING RULES (WhatsApp plain text — NO markdown headers like ##):
- Use *bold* for store names and prices.
- Use emojis naturally (✅ 🛒 💰 📍 ⚠️).
- Keep replies concise. No walls of text.
- If replying in Shona or Ndebele, ensure grammar is natural.
LOGIC:
1. BASKET COMPARISON: Present market matrix. State cheapest store total and basket_savings clearly.
Example: "✅ *OK Mart* has the best deal at *$4.00* total — saving you *$2.95* vs Spar!"
2. SUBSTITUTE (is_substitute=true): "I couldn't find *[Query]*, but the nearest match is *[Product]* at *$X*."
3. SINGLE ITEM: Show cheapest price + store, then alternatives. Note potential_savings if > $0.10.
4. ZESA: Calculate and explain units clearly.
5. CASUAL / GREETING: Be warm, introduce yourself briefly and invite a search query.
6. DEALS_EXPLORE: List 5-8 interesting current deals from analyst data.
7. EVENT_PLANNING: Acknowledge the plan, then present the shopping basket.
8. CATALOGUE_REQUEST: Inform the user their PDF catalogue is being prepared and will arrive shortly.
Always end with a helpful follow-up question or CTA if appropriate.
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=prompt)
        return reply.text.strip()
    except Exception as e:
        logger.error(f"Chat response error: {e}")
        return "I checked the prices, but I'm having trouble displaying them right now. Please try again!"
def gemini_translate(text: str, target_lang: str) -> str:
    """Translate an English reply into *target_lang* via the model.

    Args:
        text: The English reply to translate.
        target_lang: Target language name; empty or "english" (any case) means
            no translation.

    Returns:
        The translated text with surrounding whitespace removed, or *text*
        unchanged when no translation is needed, the client is unavailable,
        or the call fails.
    """
    wants_translation = bool(target_lang) and target_lang.lower() != "english"
    if not (_gemini_client and wants_translation):
        return text

    prompt = f"""
Translate the following WhatsApp shopping assistant reply from English to {target_lang}.
Rules:
- Keep prices ($X.XX), store names, and product names UNCHANGED.
- Keep WhatsApp formatting (*bold*, emojis) UNCHANGED.
- Use natural, conversational tone.
Text:
"{text}"
"""
    try:
        reply = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=prompt)
        return reply.text.strip()
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
# ─────────────────────────────────────────────
# 8. Catalogue PDF Generator
# ─────────────────────────────────────────────
def generate_catalogue_pdf(title: str, items: List[Dict], scope_label: str = "Price Comparison") -> Optional[str]:
    """Generate a Pricelyst price-comparison PDF using reportlab.

    Each item produces one table row per offer: the first row carries the
    product details and best price, further offers follow as sub-rows. Items
    without offers get a single "Not listed" row.

    All dynamic text (title, scope label, product, brand, category, retailer)
    is XML-escaped, because reportlab Paragraphs parse their text as markup
    and a stray "<" or "&" in a product name would break rendering.

    Args:
        title: Heading shown above the table.
        items: Basket items as produced by the basket optimiser (reads
            product_name, brand, category, offers, best_price,
            potential_savings, is_substitute, query).
        scope_label: Sub-title shown under the brand header.

    Returns:
        Path of the generated file inside CATALOGUE_DIR, or None on failure.
    """
    try:
        from xml.sax.saxutils import escape

        from reportlab.lib.pagesizes import A4
        from reportlab.lib import colors
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer,
                                        Table, TableStyle, HRFlowable)
        from reportlab.lib.enums import TA_CENTER

        filename = f"pricelyst_catalogue_{uuid.uuid4().hex[:8]}.pdf"
        filepath = os.path.join(CATALOGUE_DIR, filename)
        doc = SimpleDocTemplate(filepath, pagesize=A4,
                                rightMargin=1.5*cm, leftMargin=1.5*cm,
                                topMargin=1.5*cm, bottomMargin=1.5*cm)
        styles = getSampleStyleSheet()
        brand_green = colors.HexColor("#1E7B34")

        # Custom styles
        title_style = ParagraphStyle("CatTitle", parent=styles["Title"],
                                     fontSize=22, textColor=brand_green,
                                     spaceAfter=4, alignment=TA_CENTER)
        sub_style = ParagraphStyle("CatSub", parent=styles["Normal"],
                                   fontSize=10, textColor=colors.HexColor("#555555"),
                                   alignment=TA_CENTER, spaceAfter=2)
        head_style = ParagraphStyle("ColHead", parent=styles["Normal"],
                                    fontSize=9, textColor=colors.white,
                                    fontName="Helvetica-Bold")
        cell_style = ParagraphStyle("Cell", parent=styles["Normal"],
                                    fontSize=8, textColor=colors.HexColor("#222222"))

        def cell(text: Any = "") -> Any:
            """Build a body cell from plain (non-markup) text."""
            return Paragraph(escape(str(text or "")), cell_style)

        story = []
        # Header
        # NOTE(review): the emoji here and the "⚠" below may not exist in the
        # standard Helvetica face — confirm they render in the output.
        story.append(Paragraph("🛒 Pricelyst Zimbabwe", title_style))
        story.append(Paragraph(escape(str(scope_label or "")), sub_style))
        story.append(Paragraph(f"Generated: {datetime.now().strftime('%d %b %Y, %H:%M')}", sub_style))
        story.append(Spacer(1, 0.3*cm))
        story.append(HRFlowable(width="100%", thickness=2, color=brand_green))
        story.append(Spacer(1, 0.4*cm))
        story.append(Paragraph(escape(str(title or "")), styles["Heading2"]))
        story.append(Spacer(1, 0.3*cm))

        # Table header
        col_widths = [5*cm, 3*cm, 2.5*cm, 2*cm, 2.2*cm, 2.8*cm]
        column_titles = ("Product", "Brand", "Category", "Store", "Price (USD)", "Best Price")
        table_data = [[Paragraph(label, head_style) for label in column_titles]]

        for item in items:
            offers = item.get("offers") or []
            if not offers:
                table_data.append([
                    cell(item.get("query", "Unknown")), cell(), cell(),
                    cell("Not listed"), cell("N/A"), cell("N/A"),
                ])
                continue

            best_p = item.get("best_price") or 0
            savings = item.get("potential_savings") or 0
            sub_note = " ⚠ (nearest match)" if item.get("is_substitute") else ""
            savings_str = f"${savings:.2f} savings" if savings > 0.05 else "Best"
            # Fall back to the query so one incomplete item cannot abort the PDF.
            product_name = item.get("product_name") or item.get("query") or "Unknown"

            first_offer, *other_offers = offers
            # First row: product + first offer
            table_data.append([
                cell(f"{product_name}{sub_note}"),
                cell(item.get("brand", "")),
                cell(item.get("category", "")),
                cell(first_offer["retailer"]),
                cell(f"${first_offer['price']:.2f}"),
                # "\n" is plain whitespace inside a Paragraph; <br/> is the line break.
                Paragraph(f"${best_p:.2f}<br/>{escape(savings_str)}", cell_style),
            ])
            # Additional offers as sub-rows
            for offer in other_offers:
                table_data.append([
                    cell(), cell(), cell(),
                    cell(offer["retailer"]),
                    cell(f"${offer['price']:.2f}"),
                    cell(),
                ])

        tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), brand_green),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
            ("FONTSIZE", (0, 0), (-1, 0), 9),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F0FFF4")]),
            ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#CCCCCC")),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("LEFTPADDING", (0, 0), (-1, -1), 4),
            ("RIGHTPADDING", (0, 0), (-1, -1), 4),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        story.append(tbl)
        story.append(Spacer(1, 0.5*cm))

        # Footer
        story.append(HRFlowable(width="100%", thickness=1, color=colors.HexColor("#CCCCCC")))
        story.append(Spacer(1, 0.2*cm))
        story.append(Paragraph(
            "Prices subject to change. Data sourced from Pricelyst.co.zw — Zimbabwe's #1 price comparison platform.",
            ParagraphStyle("Footer", parent=styles["Normal"], fontSize=7,
                           textColor=colors.HexColor("#888888"), alignment=TA_CENTER)
        ))
        doc.build(story)
        logger.info("Catalogue PDF generated: %s", filepath)
        return filepath
    except Exception as e:
        logger.error("PDF generation failed: %s", e, exc_info=True)
        return None
# ─────────────────────────────────────────────
# 9. Firebase Profile Helpers
# ─────────────────────────────────────────────
def get_or_create_profile(mobile: str) -> Dict[str, Any]:
    """Fetch the stored profile for *mobile*, creating a minimal one if absent.

    Args:
        mobile: Document id within the "pricelyst_profiles" collection.

    Returns:
        The profile dict. A new profile holds "mobile" and a UTC ISO-8601
        "created_at". An empty dict is returned when persistence is disabled
        or the lookup fails.
    """
    if not db:
        return {}
    try:
        profile_ref = db.collection("pricelyst_profiles").document(mobile)
        snapshot = profile_ref.get()
        if not snapshot.exists:
            new_profile = {
                "mobile": mobile,
                "created_at": datetime.now(timezone.utc).isoformat(),
            }
            profile_ref.set(new_profile)
            return new_profile
        return snapshot.to_dict()
    except Exception as e:
        logger.error(f"Profile fetch error for {mobile}: {e}")
        return {}
def get_chat_history(mobile: str, limit: int = 6) -> str:
    """Return the most recent exchanges for *mobile* as prompt-ready text.

    Args:
        mobile: Profile document id.
        limit: Maximum number of logged exchanges to read.

    Returns:
        "User: ... / April: ..." pairs, oldest first, joined with newlines.
        An empty string when persistence is disabled or the query fails.
    """
    if not db:
        return ""
    try:
        newest_first = (
            db.collection("pricelyst_profiles").document(mobile)
            .collection("chat_logs")
            .order_by("ts", direction=firestore.Query.DESCENDING)
            .limit(limit)
            .stream()
        )
        exchanges = [
            f"User: {entry.get('message', '')}\nApril: {entry.get('response', '')}"
            for entry in (snap.to_dict() for snap in newest_first)
        ]
        # The query yields newest first; the prompt wants chronological order.
        exchanges.reverse()
        return "\n".join(exchanges)
    except Exception as e:
        logger.error(f"Chat history error: {e}")
        return ""
def save_chat_log(mobile: str, message: str, response: str, intent: Dict) -> None:
    """Append one user/April exchange to the profile's "chat_logs" collection.

    Does nothing when persistence is disabled; failures are logged and not
    raised.

    Args:
        mobile: Profile document id.
        message: The user's message.
        response: The reply that was sent.
        intent: Intent dict stored alongside the exchange.
    """
    if not db:
        return
    try:
        chat_logs = (
            db.collection("pricelyst_profiles")
            .document(mobile)
            .collection("chat_logs")
        )
        entry = {
            "message": message,
            "response": response,
            "intent": intent,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        chat_logs.add(entry)
    except Exception as e:
        logger.error(f"Chat log save error: {e}")
def save_shopping_plan(mobile: str, plan: Dict) -> Optional[str]:
    """Store a shopping plan under the profile and return its document id.

    The generated id is also written into *plan* under "id" (the caller's
    dict is modified) before it is saved.

    Args:
        mobile: Profile document id.
        plan: Plan data to persist.

    Returns:
        The new document id, or None when persistence is disabled or the
        save fails.
    """
    if not db:
        return None
    try:
        plans = (
            db.collection("pricelyst_profiles")
            .document(mobile)
            .collection("shopping_plans")
        )
        plan_ref = plans.document()
        plan_id = plan_ref.id
        plan["id"] = plan_id
        plan_ref.set(plan)
        return plan_id
    except Exception as e:
        logger.error(f"Plan save error: {e}")
        return None
# ─────────────────────────────────────────────
# 10. Firebase Storage Upload
# ─────────────────────────────────────────────
def upload_to_firebase_storage(file_path: str, folder: str = "catalogues") -> Optional[str]:
    """Upload a local file to Firebase Storage and return a signed URL.

    The object is stored as "<folder>/<file name>" and the URL is requested
    with a one-hour expiration.

    Args:
        file_path: Path of the local file to upload.
        folder: Destination folder inside the bucket.

    Returns:
        The signed URL, or None when no bucket is configured or the upload
        fails.
    """
    if not FIREBASE_STORAGE_BUCKET:
        return None
    try:
        target_bucket = fb_storage.bucket()
        object_name = f"{folder}/{os.path.basename(file_path)}"
        remote_file = target_bucket.blob(object_name)
        remote_file.upload_from_filename(file_path)
        return remote_file.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}")
        return None
def upload_to_imgur(file_path: str) -> Optional[str]:
    """Upload an image to Imgur and return its public URL.

    Args:
        file_path: Path of the local image file.

    Returns:
        The image link from the Imgur response, or None when no client id is
        configured, Imgur reports failure, or the request fails.
    """
    if not IMGUR_CLIENT_ID:
        return None
    try:
        with open(file_path, "rb") as f:
            # Without a timeout a stalled connection would block the caller
            # indefinitely; use the module-wide HTTP timeout.
            resp = requests.post(
                IMGUR_URL,
                headers=IMGUR_HEADERS,
                files={"image": f},
                timeout=HTTP_TIMEOUT,
            )
        resp.raise_for_status()
        data = resp.json()
        return data["data"]["link"] if data.get("success") else None
    except Exception as e:
        logger.error("Imgur upload failed: %s", e)
        return None
# ─────────────────────────────────────────────
# 11. TTS (DeepGram)
# ─────────────────────────────────────────────
def deepgram_tts(text: str) -> Optional[str]:
    """Convert text to speech via DeepGram and save the audio locally.

    The response body is written to "tts_<random hex>.mp3" in the current
    working directory.

    Args:
        text: Text to synthesise.

    Returns:
        Path of the written file, or None when no API key is configured or
        the request fails.
    """
    if not DEEPGRAM_API_KEY:
        return None
    request_headers = {
        "Authorization": f"Token {DEEPGRAM_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        reply = requests.post(
            DEEPGRAM_TTS_URL,
            headers=request_headers,
            json={"text": text},
            timeout=30,
        )
        reply.raise_for_status()
        audio_path = os.path.join(os.getcwd(), f"tts_{uuid.uuid4().hex}.mp3")
        with open(audio_path, "wb") as audio_file:
            audio_file.write(reply.content)
        return audio_path
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}")
        return None
# ─────────────────────────────────────────────
# 12. Deals & Discovery Helpers
# ─────────────────────────────────────────────
def get_todays_deals(limit: int = 8) -> List[Dict]:
    """Return the products with the widest price spread across offers.

    The spread (max price minus min price per product name) serves as a proxy
    for "deals"; only spreads above 0.05 qualify.

    Args:
        limit: Maximum number of deals to return.

    Returns:
        Dicts with product_name, cheapest_price, retailer, savings and
        category, largest spread first. Empty when no market data is
        available or the computation fails.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        priced = df[df["is_offer"] == True].copy()
        if priced.empty:
            return []

        # Compute savings per product
        spread = priced.groupby("product_name")["price"].agg(["min", "max"]).reset_index()
        spread["savings"] = spread["max"] - spread["min"]
        worthwhile = spread[spread["savings"] > 0.05]
        ranked = worthwhile.sort_values("savings", ascending=False).head(limit)

        def describe(name: Any, saving: Any) -> Dict:
            # The cheapest offer supplies the price, retailer and category.
            cheapest = priced[priced["product_name"] == name].sort_values("price").iloc[0]
            return {
                "product_name": name,
                "cheapest_price": float(cheapest["price"]),
                "retailer": cheapest["retailer"],
                "savings": float(saving),
                "category": cheapest.get("category", ""),
            }

        return [describe(rec["product_name"], rec["savings"]) for _, rec in ranked.iterrows()]
    except Exception as e:
        logger.error(f"Deals fetch error: {e}")
        return []
def get_category_list() -> List[str]:
    """Return the distinct product categories in the market index, sorted.

    Returns:
        Sorted category names; an empty list when no market data is
        available or the values cannot be collected.
    """
    df = get_market_index()
    if df.empty:
        return []
    try:
        categories = df["category"].dropna().unique().tolist()
        categories.sort()
        return categories
    except Exception:
        return []
def format_deals_message(deals: List[Dict]) -> str:
    """Render deals as a WhatsApp-ready text block.

    Args:
        deals: Deal dicts with product_name, cheapest_price, retailer and
            savings.

    Returns:
        A numbered list framed by a header and a call-to-action footer, or a
        fixed "no deals" message when *deals* is empty.
    """
    if not deals:
        return "No deals data available right now. Please try again shortly."

    header = "🏷️ *Today's Best Deals on Pricelyst* 🇿🇼\n"
    footer = "\n_Reply with any product name to compare prices across stores!_"
    entries = [
        "".join((
            f"{rank}. *{deal['product_name']}*\n",
            f" 💰 ${deal['cheapest_price']:.2f} @ {deal['retailer']}\n",
            f" 🔥 Save up to ${deal['savings']:.2f}\n",
        ))
        for rank, deal in enumerate(deals, 1)
    ]
    return "\n".join([header, *entries, footer])
def format_basket_for_whatsapp(analyst: Dict, language: str = "English") -> str:
    """Render a basket optimisation result as WhatsApp text (used as fallback).

    Args:
        analyst: Result dict of the basket optimiser.
        language: Accepted for interface compatibility; not used here.

    Returns:
        A per-item price breakdown (up to three offers each), store totals
        when more than one store is in the matrix, and a note on items that
        were not found. For a non-actionable result, its "error" text or a
        generic apology.
    """
    if not analyst.get("actionable"):
        return analyst.get("error", "Sorry, I couldn't fetch price data right now.")

    out: List[str] = []
    items = analyst.get("found_items", [])
    not_found = analyst.get("global_missing", [])
    stores = analyst.get("market_matrix", [])

    if items:
        out.append("🛒 *Price Breakdown:*\n")
    for entry in items:
        suffix = " _(nearest match)_" if entry.get("is_substitute") else ""
        out.append(f"• *{entry['product_name']}*{suffix}")
        for offer in entry["offers"][:3]:
            # Tick the offer(s) priced at the item's best price.
            tick = "✅" if offer["price"] == entry["best_price"] else " "
            out.append(f" {tick} {offer['retailer']}: *${offer['price']:.2f}*")
        if entry.get("potential_savings", 0) > 0.10:
            out.append(f" 💡 Save up to ${entry['potential_savings']:.2f}")
        out.append("")

    if len(stores) > 1:
        out.append("🏪 *Store Totals:*\n")
        out.extend(
            f"• *{store['retailer']}*: ${store['total_price']:.2f} "
            f"({store['found_count']}/{store['total_items']} items)"
            for store in stores[:3]
        )
        winner = stores[0]
        saved = winner.get("basket_savings", 0)
        if saved > 0.10:
            out.append(f"\n✅ *Best: {winner['retailer']}* — saves you *${saved:.2f}*!")

    if not_found:
        out.append(f"\n⚠️ Not found: {', '.join(not_found)}")
        out.append("_These may be available in-store — try a broader search term._")

    return "\n".join(out)