Spaces:
Running
Running
| """ | |
| main.py — Pricelyst Shopping Advisor (Jessica Edition 2026 - Upgrade v3.1) | |
| ✅ Feature: "Vernacular Engine" (Shona/Ndebele/English Input -> Native Response). | |
| ✅ Feature: "Precision Search" (Prioritizes exact phrase matches over popularity). | |
| ✅ Feature: "Concept Exploder" (Event Planning -> Shopping List). | |
| ✅ UI/UX: "Nearest Match" phrasing for substitutions. | |
| ✅ Core: Deep Vector Search + Market Matrix + Store Preferences. | |
| ENV VARS: | |
| - GOOGLE_API_KEY=... | |
| - FIREBASE='{"type":"service_account", ...}' | |
| - PRICE_API_BASE=https://api.pricelyst.co.zw | |
| - GEMINI_MODEL=gemini-2.5-flash | |
| - PORT=5000 | |
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import math | |
| import logging | |
| import base64 | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import pandas as pd | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
# ––––– Logging –––––
# One module-wide logger: ETL, Gemini, Firebase and the Flask handlers all
# log through "pricelyst-advisor" so the host's log stream stays unified.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# Optional dependency: when google-genai is missing or the API key is not
# configured, the app degrades gracefully (all AI features return fallbacks).
try:
    from google import genai
    from google.genai import types
except Exception as e:
    genai = None  # sentinel: checked before any Gemini usage below
    logger.error("google-genai not installed. pip install google-genai. Error=%s", e)

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")

# Shared client instance; None means "AI offline" for every helper below.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
| # ––––– Firebase Admin ––––– | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
| FIREBASE_ENV = os.environ.get("FIREBASE", "") | |
def init_firestore_from_env() -> Optional[firestore.Client]:
    """Bootstrap Firebase Admin from the FIREBASE env var (JSON service account).

    Returns a Firestore client, or None when credentials are missing or
    invalid — the app then runs with persistence disabled.
    """
    # Already initialised (e.g. repeated import) — just hand back a client.
    if firebase_admin._apps:
        return firestore.client()
    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None
    try:
        service_account = json.loads(FIREBASE_ENV)
        firebase_admin.initialize_app(credentials.Certificate(service_account))
        logger.info("Firebase initialized.")
        return firestore.client()
    except Exception as e:
        logger.critical("Failed to initialize Firebase: %s", e)
        return None
# Module-level Firestore handle; None disables all persistence features.
db = init_firestore_from_env()

# ––––– External API –––––
# Base URL of the Pricelyst catalogue API; trailing slash stripped so the
# f-string path joins below stay clean.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30  # seconds, applied to every outbound HTTP request

# ––––– Static Data (Zim Context) –––––
# Hard-coded Zimbabwe reference prices (USD) injected into AI prompts and
# used by the ZESA calculator. zesa_step_N are stepped tariff bands:
# `limit` = kWh in the band, `rate` = USD per kWh; zesa_levy = 6% surcharge.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.10,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}

# ––––– Cache –––––
PRODUCT_CACHE_TTL = 60 * 20  # 20 mins
# In-process cache for the flattened market index (see get_market_index):
# ts = epoch seconds of last refresh, df = flattened offer rows.
_data_cache: Dict[str, Any] = {
    "ts": 0,
    "df": pd.DataFrame(),
    "raw_count": 0
}

app = Flask(__name__)
CORS(app)
| # ========================= | |
| # 1. ETL Layer (Deep Search Indexing) | |
| # ========================= | |
| def _norm(s: Any) -> str: | |
| if not s: return "" | |
| return str(s).strip().lower() | |
| def _coerce_price(v: Any) -> float: | |
| try: | |
| return float(v) if v is not None else 0.0 | |
| except: | |
| return 0.0 | |
| def _safe_json_loads(s: str, fallback: Any): | |
| try: | |
| if "```json" in s: | |
| s = s.split("```json")[1].split("```")[0] | |
| elif "```" in s: | |
| s = s.split("```")[0] | |
| return json.loads(s) | |
| except Exception as e: | |
| logger.error(f"JSON Parse Error: {e}") | |
| return fallback | |
def fetch_and_flatten_data() -> pd.DataFrame:
    """Pull the full paginated product listing and flatten it to offer rows.

    Each output row is one (product, retailer, price) offer. Products with
    no priced offers get a single placeholder row (retailer="Listing",
    price=0.0, is_offer=False) so they remain searchable.

    Returns an empty DataFrame when the very first page fetch fails.
    """
    all_products = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            url = f"{PRICE_API_BASE}/api/v1/product-listing"
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            # Stop at the advertised page count; 99 is a safety cap for
            # responses that omit totalPages. (Removed dead `meta = payload`.)
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error(f"ETL Error on page {page}: {e}")
            break
    rows = []
    for p in all_products:
        try:
            p_id = int(p.get("id") or 0)
            p_name = str(p.get("name") or "Unknown")
            brand_obj = p.get("brand") or {}
            brand_name = str(brand_obj.get("brand_name") or "")
            cats = p.get("categories") or []
            cat_names = [str(c.get("name") or "") for c in cats]
            cat_str = " ".join(cat_names)
            primary_cat = cat_names[0] if cat_names else "General"
            # Deep Search Vector: one normalised haystack of name + brand +
            # categories, used for substring/token matching downstream.
            search_vector = _norm(f"{p_name} {brand_name} {cat_str}")
            views = int(p.get("view_count") or 0)
            image = str(p.get("thumbnail") or p.get("image") or "")
            prices = p.get("prices") or []
            if not prices:
                rows.append({
                    "product_id": p_id,
                    "product_name": p_name,
                    "search_vector": search_vector,
                    "brand": brand_name,
                    "category": primary_cat,
                    "retailer": "Listing",
                    "price": 0.0,
                    "views": views,
                    "image": image,
                    "is_offer": False
                })
                continue
            for offer in prices:
                retailer = offer.get("retailer") or {}
                r_name = str(retailer.get("name") or "Unknown Store")
                price_val = _coerce_price(offer.get("price"))
                if price_val > 0:
                    rows.append({
                        "product_id": p_id,
                        "product_name": p_name,
                        "search_vector": search_vector,
                        "brand": brand_name,
                        "category": primary_cat,
                        "retailer": r_name,
                        "price": price_val,
                        "views": views,
                        "image": image,
                        "is_offer": True
                    })
        except Exception as e:
            # Bug fix: was a bare `except:` — still skip only the malformed
            # record, but leave a trace instead of failing fully silently.
            logger.debug("ETL: skipping malformed product record: %s", e)
            continue
    df = pd.DataFrame(rows)
    logger.info(f"ETL: Flattened into {len(df)} rows.")
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market DataFrame, refreshing it when stale.

    A refresh is triggered when forced, when the cache is empty, or when
    more than PRODUCT_CACHE_TTL seconds have elapsed since the last fetch.
    """
    global _data_cache
    expired = (time.time() - _data_cache["ts"]) > PRODUCT_CACHE_TTL
    if force_refresh or _data_cache["df"].empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        _data_cache.update(df=fresh, ts=time.time(), raw_count=len(fresh))
    return _data_cache["df"]
| # ========================= | |
| # 2. Analyst Engine (Precision Search & Matrix) | |
| # ========================= | |
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 15) -> pd.DataFrame:
    """Precision Search Algorithm.

    Scores every row and returns the top `limit` matches:
      1. Exact name match                    -> +1000
      2. Query appears verbatim in vector    -> +500
      3. Product's brand named in the query  -> +200
      4. Token overlap                       -> +50 per shared token
    Ties break on views (desc), then price (asc).
    Returns `df` unchanged when it is empty or the query is blank; returns
    an empty frame when nothing scores above zero.
    """
    if df.empty or not query:
        return df
    q_norm = _norm(query)
    q_tokens = set(q_norm.split())

    def scoring_algo(row):
        score = 0
        vector = row['search_vector']
        # 1. Exact Name Match (Highest)
        if q_norm == _norm(row['product_name']):
            score += 1000
        # 2. Sequential Vector Match (High)
        if q_norm in vector:
            score += 500
        # 3. Brand Match — bug fix: an empty brand made `"" in q_norm`
        # True, silently awarding +200 to EVERY unbranded row.
        brand = row['brand'].lower()
        if brand and brand in q_norm:
            score += 200
        # 4. Token Overlap
        text_tokens = set(vector.split())
        score += len(q_tokens.intersection(text_tokens)) * 50
        return score

    df_scored = df.copy()
    df_scored['match_score'] = df_scored.apply(scoring_algo, axis=1)
    # Filter out zero matches
    matches = df_scored[df_scored['match_score'] > 0]
    if matches.empty:
        return matches
    # Sort: Match Score (Desc) -> Views (Desc) -> Price (Asc)
    matches = matches.sort_values(by=['match_score', 'views', 'price'], ascending=[False, False, True])
    return matches.head(limit)
def calculate_basket_optimization(item_names: List[str], preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """
    Generates a FULL MARKET MATRIX with Precision Search.

    For each requested item: resolve it against live offers, flag near-miss
    results as substitutions ("nearest match"), then pivot all offers into a
    per-store comparison (basket total + item coverage).

    Returns a dict with found_items, global_missing, market_matrix (top 4
    stores), best_store, and the caller's preferred_retailer echoed back.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "No data"}
    found_items = []
    missing_global = []
    # 1. Resolve Items & Check Brand Fidelity
    for item in item_names:
        # Only rows with a real priced offer (is_offer=True) are searchable.
        hits = search_products_deep(df[df['is_offer']==True], item, limit=10)
        if hits.empty:
            missing_global.append(item)
            continue
        best_match = hits.iloc[0]
        # --- Brand Fidelity Check ---
        q_norm = _norm(item)
        res_norm = _norm(best_match['product_name'] + " " + best_match['brand'])
        q_tokens = q_norm.split()
        is_substitute = False
        # If query has brand/spec but result score is low-ish (not exact name match), flag it.
        # Using a simple heuristic for now based on token overlap vs query length
        found_tokens = sum(1 for t in q_tokens if t in res_norm)
        if len(q_tokens) > 1 and found_tokens < len(q_tokens):
            is_substitute = True
        # Aggregate all offers for the chosen product, cheapest first.
        product_offers = hits[hits['product_name'] == best_match['product_name']].sort_values('price')
        offers_list = []
        for _, r in product_offers.iterrows():
            offers_list.append({"retailer": r['retailer'], "price": float(r['price'])})
        found_items.append({
            "query": item,
            "product_name": str(best_match['product_name']),
            "is_substitute": is_substitute,
            "offers": offers_list,
            "best_price": offers_list[0]['price']  # cheapest, due to sort above
        })
    if not found_items:
        return {"actionable": True, "found_items": [], "global_missing": missing_global}
    # 2. MARKET MATRIX (Comparison across all stores)
    all_involved_retailers = set()
    for f in found_items:
        for o in f['offers']:
            all_involved_retailers.add(o['retailer'])
    store_comparison = []
    for retailer in all_involved_retailers:
        total_price = 0.0
        found_count = 0
        missing_in_store = []
        for item in found_items:
            # First offer from this retailer for the item, if any.
            price = next((o['price'] for o in item['offers'] if o['retailer'] == retailer), None)
            if price:
                total_price += price
                found_count += 1
            else:
                missing_in_store.append(item['product_name'])
        store_comparison.append({
            "retailer": retailer,
            "total_price": total_price,
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_in_store
        })
    # Rank: most items found first, then cheapest basket total.
    store_comparison.sort(key=lambda x: (-x['found_count'], x['total_price']))
    return {
        "actionable": True,
        "is_basket": len(found_items) > 1,
        "found_items": found_items,
        "global_missing": missing_global,
        "market_matrix": store_comparison[:4],
        "best_store": store_comparison[0] if store_comparison else None,
        "preferred_retailer": preferred_retailer
    }
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate ZESA (electricity) kWh purchased for a USD amount.

    Strips the levy first, then walks the stepped tariff bands defined in
    ZIM_CONTEXT: exhaust band 1, then band 2, then the remainder buys
    band-3 units.
    """
    # Bug fix: the levy divisor was a hard-coded 1.06, silently duplicating
    # ZIM_CONTEXT["zesa_levy"]; derive it so the config stays the single
    # source of truth if the levy changes.
    remaining = amount_usd / (1.0 + ZIM_CONTEXT["zesa_levy"])
    units = 0.0
    t1 = ZIM_CONTEXT["zesa_step_1"]
    cost_t1 = t1["limit"] * t1["rate"]  # cost to exhaust band 1
    if remaining > cost_t1:
        units += t1["limit"]
        remaining -= cost_t1
        t2 = ZIM_CONTEXT["zesa_step_2"]
        cost_t2 = t2["limit"] * t2["rate"]  # cost to exhaust band 2
        if remaining > cost_t2:
            units += t2["limit"]
            remaining -= cost_t2
            units += remaining / ZIM_CONTEXT["zesa_step_3"]["rate"]
        else:
            units += remaining / t2["rate"]
    else:
        units += remaining / t1["rate"]
    return {
        "amount_usd": float(amount_usd),
        "est_units_kwh": float(round(units, 1))
    }
| # ========================= | |
| # 3. Gemini Helpers (Vernacular & Intelligence) | |
| # ========================= | |
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Classify a user transcript into an intent plus extracted slots via Gemini.

    Returns the model's JSON dict (schema embedded in the prompt). When the
    Gemini client is offline or the reply cannot be parsed, returns a safe
    non-actionable CASUAL_CHAT fallback so callers never crash.
    """
    if not _gemini_client: return {"actionable": False}
    PROMPT = """
Analyze transcript. Return STRICT JSON.
Classify intent:
- CASUAL_CHAT: Greetings, "hi".
- SHOPPING_BASKET: Looking for prices, products, "cheapest X".
- UTILITY_CALC: Electricity/ZESA questions.
- STORE_DECISION: "Where should I buy?", "Which store is cheapest?".
- EVENT_PLANNING: "Plan a braai", "Wedding list", "Dinner for 5" (Implicit lists).
Extract:
- items: list of specific products found. **TRANSLATE ALL ITEMS TO ENGLISH** (e.g. 'Hupfu' -> 'Maize Meal').
- utility_amount: number
- store_preference: if a specific store is named (e.g. "at OK Mart").
- is_event_planning: boolean (true if user asks to plan an event but lists no items).
- language: Detected user language (e.g., "Shona", "Ndebele", "English").
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"store_preference": "string",
"is_event_planning": boolean,
"language": "string"
}
"""
    try:
        # response_mime_type forces a machine-parseable JSON reply.
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT + "\nTranscript: " + transcript,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        return _safe_json_loads(resp.text, {"actionable": False, "intent": "CASUAL_CHAT", "language": "English"})
    except Exception as e:
        logger.error(f"Intent Detect Error: {e}")
        return {"actionable": False, "intent": "CASUAL_CHAT", "language": "English"}
def gemini_explode_concept(transcript: str) -> List[str]:
    """
    Converts a concept ("Braai for 10") into a concrete list ("Wors", "Charcoal").

    Returns an empty list when the AI client is offline or the reply cannot
    be parsed as a JSON list of strings.
    """
    if not _gemini_client: return []
    PROMPT = f"""
User wants to plan an event: "{transcript}".
Generate a STRICT list of 10-15 essential Zimbabwean shopping items for this.
Use English terms for database lookup (e.g. 'Maize Meal', 'Cooking Oil').
Return ONLY a JSON list of strings.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        return _safe_json_loads(resp.text, [])
    except Exception as e:
        logger.error(f"Explode Concept Error: {e}")
        return []
def gemini_analyze_image(image_b64: str, caption: str = "") -> Dict[str, Any]:
    """Classify an uploaded image (shopping list / product / meal) via Gemini vision.

    Args:
        image_b64: base64-encoded image bytes. Sent as image/jpeg — assumes
            the client uploads JPEG; TODO confirm other formats are handled.
        caption: optional user text that steers the analysis.

    Returns {"type": ..., "items": [...], "description": ...}. Returns
    {"error": "AI Offline"} when no client, and {"type": "IRRELEVANT",
    "items": []} on any analysis failure.
    """
    if not _gemini_client: return {"error": "AI Offline"}
    PROMPT = f"""
Analyze this image. Context: {caption}
1. SHOPPING LIST? -> Extract items.
2. SINGLE PRODUCT? -> Extract BRAND + NAME (e.g. "Pepsi 500ml").
3. MEAL/DISH? -> Identify dish + ingredients.
4. IRRELEVANT? -> Return type "IRRELEVANT".
Return STRICT JSON:
{{
"type": "LIST" | "PRODUCT" | "MEAL" | "IRRELEVANT",
"items": ["item1"],
"description": "Short description"
}}
"""
    try:
        image_bytes = base64.b64decode(image_b64)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                PROMPT,
                types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg")
            ],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, {"type": "IRRELEVANT", "items": []})
        return result
    except Exception as e:
        logger.error(f"Vision Error: {e}")
        return {"type": "IRRELEVANT", "items": []}
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict, chat_history: str = "") -> str:
    """Generate Jessica's conversational reply in the user's own language.

    Feeds the transcript, detected intent, recent chat history, static Zim
    reference prices and any analyst output into Gemini with the persona
    rules below. Returns a friendly error string when the client is offline
    or the generation call fails.
    """
    if not _gemini_client: return "I'm having trouble connecting to my brain right now."
    # Assemble grounding context: history (if any), static prices, analyst data.
    context_str = f"RECENT CHAT HISTORY (Last 6 messages):\n{chat_history}\n" if chat_history else ""
    context_str += f"ZIMBABWE CONTEXT: Fuel={ZIM_CONTEXT['fuel_petrol']}, ZESA Rate={ZIM_CONTEXT['zesa_step_1']['rate']}\n"
    if analyst_data:
        context_str += f"ANALYST DATA: {json.dumps(analyst_data, default=str)}\n"
    language = intent.get("language", "English")
    PROMPT = f"""
You are Jessica, Pricelyst's Shopping Advisor (Zimbabwe).
Role: Intelligent Shopping Companion.
Goal: Shortest path to value. Complete Transparency.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent.get('intent')}
CONTEXT:
{context_str}
LOGIC RULES:
1. **LANGUAGE**: Reply in **{language}**. If Shona, use Shona. If English, use English.
2. **BASKET COMPARISON**:
- If `market_matrix` has multiple stores, compare totals.
- "Spar is **$6.95**, OK Mart is **$4.00** (but missing Oil)."
3. **BRAND SUBSTITUTES (Phrasing)**:
- If `is_substitute` is TRUE for an item, say:
"I couldn't find **[Query]**, but the **nearest match is** **[Found]** ($Price)."
4. **SINGLE ITEMS**:
- Best price first, then others.
5. **CASUAL**:
- Reset if user says "Hi".
TONE: Helpful, direct, Zimbabwean. Use Markdown.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
        return resp.text
    except Exception as e:
        logger.error(f"Chat Gen Error: {e}")
        return "I checked the prices, but I'm having trouble displaying them right now."
def gemini_generate_4step_plan(transcript: str, analyst_result: Dict) -> str:
    """Render a 4-section Markdown shopping plan from analyst output.

    Missing catalogue items get AI-estimated prices so the plan's totals are
    always complete. Returns a Markdown error string when the client is
    offline or the generation call fails.
    """
    if not _gemini_client: return "# Error\nAI Offline."
    PROMPT = f"""
Generate a formatted Markdown Shopping Plan.
USER REQUEST: "{transcript}"
DATA: {json.dumps(analyst_result, indent=2, default=str)}
CRITICAL INSTRUCTION:
For items in 'global_missing', you MUST provide a Realistic USD Estimate (e.g. Chicken ~$6.00).
Do not leave them as "Unknown".
SECTIONS:
1. **In Our Catalogue ✅**
(Markdown Table: | Item | Retailer | Price (USD) |)
2. **Not in Catalogue (Estimates) 😔**
(Markdown Table: | Item | Estimated Price (USD) |)
*Fill in estimated prices for missing items based on Zimbabwe market knowledge.*
3. **Totals 💰**
- Confirmed Total (Catalogue)
- Estimated Total (Missing Items)
- **Grand Total Estimate**
4. **Ideas & Tips 💡**
- 3 Creative ideas based on the specific event/meal (e.g. Braai tips, Cooking hacks).
Tone: Warm, Professional, Zimbabwean.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return resp.text
    except Exception as e:
        # Bug fix: the exception was captured as `e` but never used or
        # logged, making plan-generation failures impossible to diagnose.
        logger.error(f"Plan Gen Error: {e}")
        return "# Error\nCould not generate plan."
| # ========================= | |
| # 4. Endpoints | |
| # ========================= | |
def health():
    """Health-check payload: index size plus configuration snapshot.

    NOTE(review): no @app.route decorator is visible for this (or any other)
    handler in this file — confirm routes are registered elsewhere or were
    lost in formatting; otherwise these views are unreachable.
    """
    # Touching the index here doubles as a lazy cache warm-up/refresh.
    df = get_market_index()
    return jsonify({
        "ok": True,
        "offers_indexed": len(df),
        "api_source": PRICE_API_BASE,
        "persona": "Jessica v3.1 (Babel Fish)"
    })
def chat():
    """Main text-chat handler.

    Body: {"message": str, "profile_id": str}.
    Pipeline: load last 6 chat turns -> detect intent -> run the matching
    analyst tool (basket / ZESA) -> generate Jessica's reply -> persist the
    exchange to Firestore (when available).
    """
    body = request.get_json(silent=True) or {}
    msg = body.get("message", "")
    pid = body.get("profile_id")
    if not pid:
        return jsonify({"ok": False, "error": "Missing profile_id"}), 400
    # History (best effort — chat still works without it)
    history_str = ""
    if db:
        try:
            docs = db.collection("pricelyst_profiles").document(pid).collection("chat_logs") \
                .order_by("ts", direction=firestore.Query.DESCENDING).limit(6).stream()
            msgs = [f"User: {d.to_dict().get('message')}\nJessica: {d.to_dict().get('response')}" for d in docs]
            if msgs:
                history_str = "\n".join(reversed(msgs))
        except Exception as e:
            # Bug fix: was a silent bare `except: pass` — keep the
            # best-effort behaviour but leave a trace for debugging.
            logger.warning("History fetch failed for %s: %s", pid, e)
    # Intent
    intent_data = gemini_detect_intent(msg)
    intent_type = intent_data.get("intent", "CASUAL_CHAT")
    items = intent_data.get("items", [])
    store_pref = intent_data.get("store_preference")
    analyst_data = {}
    if items or intent_type in ["SHOPPING_BASKET", "STORE_DECISION", "TRUST_CHECK"]:
        analyst_data = calculate_basket_optimization(items, preferred_retailer=store_pref)
    elif intent_type == "UTILITY_CALC":
        amount = intent_data.get("utility_amount", 20)
        analyst_data = calculate_zesa_units(amount)
    reply = gemini_chat_response(msg, intent_data, analyst_data, history_str)
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
            "message": msg,
            "response": reply,
            "intent": intent_data,
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({"ok": True, "data": {"message": reply, "analyst_debug": analyst_data if items else None}})
def analyze_image():
    """Image endpoint: identify a list/product/meal, then price it.

    Body: {"image_data": <base64 image>, "caption": str, "profile_id": str}.
    Pipeline: Gemini vision -> item extraction -> basket optimisation ->
    conversational reply in Jessica's voice.
    """
    body = request.get_json(silent=True) or {}
    image_b64 = body.get("image_data")
    caption = body.get("caption", "")
    pid = body.get("profile_id")
    if not image_b64 or not pid: return jsonify({"ok": False}), 400
    vision_result = gemini_analyze_image(image_b64, caption)
    img_type = vision_result.get("type", "IRRELEVANT")
    items = vision_result.get("items", [])
    description = vision_result.get("description", "an image")
    # Fallback for empty products: use the description itself as the query.
    if (img_type in ["PRODUCT", "MEAL"]) and not items and description:
        items = [description]
    response_text = ""
    analyst_data = {}
    if img_type == "IRRELEVANT" and not items:
        # Not shopping-related: respond politely and restate the bot's purpose.
        prompt = f"User uploaded photo of {description}. Compliment it if appropriate, then explain you are a shopping bot."
        response_text = gemini_chat_response(prompt, {"intent": "CASUAL_CHAT"}, {}, "")
    elif items:
        analyst_data = calculate_basket_optimization(items)
        sim_msg = ""
        # Simulate the message the user "would have typed" for this image type.
        if img_type == "MEAL": sim_msg = f"I want to cook {description}. Cost of ingredients: {', '.join(items)}?"
        elif img_type == "LIST": sim_msg = f"Price of list: {', '.join(items)}?"
        else: sim_msg = f"Cheapest price for {', '.join(items)}?"
        response_text = gemini_chat_response(sim_msg, {"intent": "STORE_DECISION"}, analyst_data, "")
    else:
        response_text = "I couldn't identify the product. Could you type the name?"
    return jsonify({
        "ok": True,
        "image_type": img_type,
        "items_identified": items,
        "message": response_text,
        "analyst_data": analyst_data
    })
def call_briefing():
    """
    Injects INTELLIGENT Market Data into the Voice Bot's context.
    Includes: Staples Index, ZESA/Fuel, Top 60 Catalogue.

    Body: {"profile_id": str, "username": str}. Creates the profile document
    on first contact and keeps the stored username in sync.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    username = body.get("username", "Friend")
    if not pid:
        return jsonify({"ok": False}), 400
    # 1. Memory Profile
    prof = {}
    if db:
        ref = db.collection("pricelyst_profiles").document(pid)
        doc = ref.get()
        if doc.exists:
            prof = doc.to_dict()
        else:
            ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
    # Sync the display name when the caller provides a real one.
    if username != "Friend" and username != prof.get("username"):
        if db: db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)
    # 2. Market Intelligence Generation (removed dead `market_intel = ""`)
    df = get_market_index()
    # A. ZESA & Fuel
    zesa_10 = calculate_zesa_units(10.0)
    zesa_20 = calculate_zesa_units(20.0)
    context_section = f"""
[CRITICAL CONTEXT - ZIMBABWE]
FUEL: Petrol=${ZIM_CONTEXT['fuel_petrol']}, Diesel=${ZIM_CONTEXT['fuel_diesel']}
BREAD: ~${ZIM_CONTEXT['bread_avg']}
ZESA (Electricity): $10 = {zesa_10['est_units_kwh']}u, $20 = {zesa_20['est_units_kwh']}u
"""
    # B. Staples Index — cheapest live offer for each staple.
    staples = ["Cooking Oil", "Maize Meal", "Sugar", "Rice"]
    staple_summary = []
    if not df.empty:
        for s in staples:
            hits = search_products_deep(df[df['is_offer']==True], s, limit=5)
            if not hits.empty:
                cheapest = hits.sort_values('price').iloc[0]
                staple_summary.append(f"- {s}: ${cheapest['price']} @ {cheapest['retailer']}")
    # Defined unconditionally so an empty index cannot raise NameError below.
    staples_section = "\n[STAPLES - LOWEST]\n" + "\n".join(staple_summary)
    # C. Top 60 Catalogue — most-viewed products with every store's price.
    catalogue_lines = []
    if not df.empty:
        top_items = df[df['is_offer']==True].sort_values('views', ascending=False).drop_duplicates('product_name').head(60)
        for _, r in top_items.iterrows():
            p_name = r['product_name']
            all_offers = df[(df['product_name'] == p_name) & df['is_offer']]
            prices_str = ", ".join([f"${o['price']} ({o['retailer']})" for _, o in all_offers.iterrows()])
            catalogue_lines.append(f"- {p_name}: {prices_str}")
    catalogue_section = "\n[CATALOGUE - TOP 60]\n" + "\n".join(catalogue_lines)
    return jsonify({
        "ok": True,
        "username": username,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": context_section + staples_section + catalogue_section
    })
def log_call_usage():
    """
    Post-Call Orchestrator.
    v3.1: Handles Concept Explosion & Plan Generation.

    Body: {"profile_id": str, "transcript": str}. Updates long-term memory,
    turns actionable transcripts into a priced shopping plan, and logs the call.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    transcript = body.get("transcript", "")
    if not pid:
        return jsonify({"ok": False}), 400
    # 1. Update Long-Term Memory (best effort; requires both DB and AI).
    if len(transcript) > 20 and db and _gemini_client:
        try:
            # Bug fix: to_dict() is None for a missing profile doc — guard it.
            snapshot = db.collection("pricelyst_profiles").document(pid).get().to_dict() or {}
            curr_mem = snapshot.get("memory_summary", "")
            mem_prompt = f"Update user memory (budget, family size) based on: {transcript}\nOLD: {curr_mem}"
            mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
            db.collection("pricelyst_profiles").document(pid).set({"memory_summary": mem_resp.text}, merge=True)
        except Exception as e:
            # Bug fix: was a bare `except: pass` — stay best-effort but log.
            logger.warning("Memory update failed for %s: %s", pid, e)
    # 2. Plan Generation Logic
    intent_data = gemini_detect_intent(transcript)
    plan_data = {}
    # Check if ACTIONABLE (Shopping or Event)
    if intent_data.get("actionable"):
        target_items = intent_data.get("items", [])
        # LOGIC: If Event Planning + No specific items -> EXPLODE CONCEPT
        if intent_data.get("is_event_planning") and not target_items:
            logger.info("💥 Exploding Concept for Event...")
            target_items = gemini_explode_concept(transcript)
        if target_items:
            analyst_result = calculate_basket_optimization(target_items)
            # v3.1: Generate Plan with Estimates & Creative Tips
            md_content = gemini_generate_4step_plan(transcript, analyst_result)
            plan_data = {
                "is_actionable": True,
                "title": f"Plan ({datetime.now().strftime('%d %b')})",
                "markdown_content": md_content,
                "items": target_items,
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            if db:
                doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                plan_data["id"] = doc_ref.id
                doc_ref.set(plan_data)
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
            "transcript": transcript,
            "intent": intent_data,
            "plan_generated": bool(plan_data),
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
def list_plans():
    """Return the 10 most recent shopping plans for a profile (newest first)."""
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        return jsonify({"ok": True, "plans": [{"id": d.id, **d.to_dict()} for d in docs]})
    except Exception as e:
        # Bug fix: bare `except:` hid the failure cause — log before the 500.
        logger.error("list_plans failed for %s: %s", pid, e)
        return jsonify({"ok": False}), 500
def delete_plan(plan_id):
    """Delete one shopping plan document belonging to the given profile."""
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:
        # Bug fix: bare `except:` hid the failure cause — log before the 500.
        logger.error("delete_plan failed for %s/%s: %s", pid, plan_id, e)
        return jsonify({"ok": False}), 500
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    # Warm the market index so the first request is fast; startup must not
    # die if the upstream catalogue API is briefly unavailable.
    try:
        get_market_index(force_refresh=True)
    except Exception as e:
        # Bug fix: was a silent bare `except: pass` — log the warm-up failure.
        logger.warning("Startup index warm-up failed: %s", e)
    app.run(host="0.0.0.0", port=port)