Spaces:
Sleeping
Sleeping
| """ | |
| main.py — Pricelyst Shopping Advisor (Jessica Edition 2026 - Upgrade v2.5) | |
| ✅ Fixed: "Basket Regression" - AI now returns prices IMMEDIATELY. | |
| ✅ Fixed: "Bluffing" - AI explicitly states if item is found or missing. | |
| ✅ Optimization: Removed "Add to list" chatter. Shortest path to value. | |
| ✅ "Analyst Engine": Enhanced Basket Math, Category Context, ZESA Logic. | |
| ✅ "Visual Engine": Lists, Products, & Meal-to-Recipe recognition. | |
| ✅ Memory Logic: Short-Term Sliding Window (Last 6 messages). | |
| ENV VARS: | |
| - GOOGLE_API_KEY=... | |
| - FIREBASE='{"type":"service_account", ...}' | |
| - PRICE_API_BASE=https://api.pricelyst.co.zw | |
| - GEMINI_MODEL=gemini-2.5-flash | |
- PORT=5000   (note: the code's fallback default when PORT is unset is 7860, not 5000)
| """ | |
| import os | |
| import re | |
| import json | |
| import time | |
| import math | |
| import logging | |
| import base64 | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import pandas as pd | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
# ––––– Logging –––––
# Root logging config: timestamped, level-tagged lines for all module loggers.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# The google-genai SDK is optional at import time: when it is missing,
# `genai` stays None and every AI feature degrades to a canned fallback.
try:
    from google import genai
    from google.genai import types
except Exception as e:
    genai = None
    logger.error("google-genai not installed. pip install google-genai. Error=%s", e)

# API key and model name come from the environment (see module docstring).
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")

# Shared Gemini client; remains None when the SDK or API key is unavailable.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ––––– Firebase Admin –––––
import firebase_admin
from firebase_admin import credentials, firestore

# Raw service-account JSON (a single string) from the FIREBASE env var.
FIREBASE_ENV = os.environ.get("FIREBASE", "")
def init_firestore_from_env() -> Optional[firestore.Client]:
    """Build a Firestore client from the FIREBASE env var.

    Re-uses the already-initialized firebase_admin app when one exists.
    Returns None (persistence disabled) when the env var is absent or
    initialization fails for any reason.
    """
    # An app initialized earlier in this process can be re-used directly.
    if firebase_admin._apps:
        return firestore.client()
    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None
    try:
        service_account = json.loads(FIREBASE_ENV)
        firebase_admin.initialize_app(credentials.Certificate(service_account))
        logger.info("Firebase initialized.")
        return firestore.client()
    except Exception as e:
        logger.critical("Failed to initialize Firebase: %s", e)
        return None
# Module-level Firestore handle; None disables all persistence paths below.
db = init_firestore_from_env()

# ––––– External API –––––
# Base URL of the Pricelyst catalogue API (trailing slash stripped once here).
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30  # seconds, per outbound HTTP request

# ––––– Static Data (Zim Context) –––––
# Hard-coded Zimbabwean market reference values (USD). The ZESA tariff is a
# three-step block tariff plus a levy fraction applied to purchases.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.00,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}

# ––––– Cache –––––
PRODUCT_CACHE_TTL = 60 * 20  # 20 mins
# In-process cache of the flattened market index (one row per offer).
_data_cache: Dict[str, Any] = {
    "ts": 0,  # epoch seconds of last refresh
    "df": pd.DataFrame(),
    "raw_count": 0
}

app = Flask(__name__)
CORS(app)
| # ========================= | |
| # 1. ETL Layer (Ingestion) | |
| # ========================= | |
| def _norm(s: Any) -> str: | |
| if not s: return "" | |
| return str(s).strip().lower() | |
| def _coerce_price(v: Any) -> float: | |
| try: | |
| return float(v) if v is not None else 0.0 | |
| except: | |
| return 0.0 | |
| def _safe_json_loads(s: str, fallback: Any): | |
| try: | |
| if "```json" in s: | |
| s = s.split("```json")[1].split("```")[0] | |
| elif "```" in s: | |
| s = s.split("```")[0] | |
| return json.loads(s) | |
| except Exception as e: | |
| logger.error(f"JSON Parse Error: {e}") | |
| return fallback | |
def fetch_and_flatten_data() -> pd.DataFrame:
    """Fetch every page of /api/v1/product-listing and flatten to a DataFrame.

    Output: one row per (product, retailer) offer. Products with no priced
    offers get a single placeholder row (retailer="Listing", price=0.0,
    is_offer=False) so they remain searchable.

    Columns: product_id, product_name, clean_name, brand, category,
    retailer, price, views, image, is_offer.
    """
    all_products: List[Dict[str, Any]] = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            url = f"{PRICE_API_BASE}/api/v1/product-listing"
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            meta = payload
            # Stop at the API-reported last page; cap at 99 pages when the
            # field is missing so a bad payload cannot loop forever.
            if page >= (meta.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            # Network/HTTP/JSON failure: keep whatever pages we already have.
            logger.error(f"ETL Error on page {page}: {e}")
            break
    rows = []
    for p in all_products:
        try:
            p_id = int(p.get("id") or 0)
            p_name = str(p.get("name") or "Unknown")
            clean_name = _norm(p_name)
            cat_obj = p.get("category") or {}
            cat_name = str(cat_obj.get("name") or "General")
            brand_obj = p.get("brand") or {}
            brand_name = str(brand_obj.get("brand_name") or "")
            views = int(p.get("view_count") or 0)
            image = str(p.get("thumbnail") or p.get("image") or "")
            prices = p.get("prices") or []
            if not prices:
                # No offers: emit a placeholder row so the product stays visible.
                rows.append({
                    "product_id": p_id,
                    "product_name": p_name,
                    "clean_name": clean_name,
                    "brand": brand_name,
                    "category": cat_name,
                    "retailer": "Listing",
                    "price": 0.0,
                    "views": views,
                    "image": image,
                    "is_offer": False
                })
                continue
            for offer in prices:
                retailer = offer.get("retailer") or {}
                r_name = str(retailer.get("name") or "Unknown Store")
                price_val = _coerce_price(offer.get("price"))
                # Zero/negative prices are treated as "no real offer".
                if price_val > 0:
                    rows.append({
                        "product_id": p_id,
                        "product_name": p_name,
                        "clean_name": clean_name,
                        "brand": brand_name,
                        "category": cat_name,
                        "retailer": r_name,
                        "price": price_val,
                        "views": views,
                        "image": image,
                        "is_offer": True
                    })
        except Exception as e:
            # BUG FIX: was a bare `except:` that silently dropped the record
            # (and would even swallow KeyboardInterrupt); log and skip.
            logger.warning("ETL: skipping malformed product record: %s", e)
            continue
    df = pd.DataFrame(rows)
    logger.info(f"ETL: Flattened into {len(df)} rows.")
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market DataFrame, refreshing it when stale.

    A refresh happens when forced, when the cache is empty, or when the
    cached copy is older than PRODUCT_CACHE_TTL seconds.
    """
    global _data_cache
    age = time.time() - _data_cache["ts"]
    stale = _data_cache["df"].empty or age > PRODUCT_CACHE_TTL
    if force_refresh or stale:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        _data_cache.update(df=fresh, ts=time.time(), raw_count=len(fresh))
    return _data_cache["df"]
| # ========================= | |
| # 2. Analyst Engine (Math Logic) | |
| # ========================= | |
def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
    """Rank rows of *df* against *query*, best matches first.

    Strategy: (1) literal substring match on clean_name; (2) if nothing
    matched, fall back to whitespace-token overlap. Results are sorted by
    popularity (views desc) then price (asc) and truncated to *limit* rows.

    Returns *df* unchanged for an empty frame or blank query, and an empty
    frame when nothing matches. Output columns always equal the input
    columns (the temporary token-overlap score is dropped).
    """
    if df.empty or not query:
        return df
    # Same normalization as _norm(); inlined so the function is self-contained.
    q_norm = str(query).strip().lower()
    # 1. Substring containment (regex disabled: the query is literal text).
    mask_name = df['clean_name'].str.contains(q_norm, regex=False)
    matches = df[mask_name].copy()
    # 2. Token-overlap fallback for multi-word queries with no substring hit.
    if matches.empty:
        q_tokens = set(q_norm.split())
        def token_score(text):
            # Number of query tokens appearing in the product name.
            if not isinstance(text, str):
                return 0
            text_tokens = set(text.split())
            if not text_tokens:
                return 0
            return len(q_tokens.intersection(text_tokens))
        df_scored = df.copy()
        df_scored['score'] = df_scored['clean_name'].apply(token_score)
        matches = df_scored[df_scored['score'] > 0]
        # BUG FIX: drop the helper column so callers always receive the
        # same schema regardless of which match path fired.
        matches = matches.drop(columns=['score'])
    if matches.empty:
        return matches
    # 3. Most-viewed first; cheapest breaks ties.
    matches = matches.sort_values(by=['views', 'price'], ascending=[False, True])
    return matches.head(limit)
def get_category_stats(df: pd.DataFrame, category_name: str) -> Dict[str, Any]:
    """Price stats (min/max/avg/sample_size) for offers in a category.

    Matches the category column first; falls back to matching product
    names. Returns {} when the frame is empty or nothing matches.
    """
    if df.empty:
        return {}
    needle = category_name.lower()
    # BUG FIX: regex=False — category names are literal text; characters
    # like "(" previously raised re.error inside str.contains.
    cat_df = df[df['category'].str.lower().str.contains(needle, regex=False) & df['is_offer']]
    if cat_df.empty:
        cat_df = df[df['clean_name'].str.contains(needle, regex=False) & df['is_offer']]
    if cat_df.empty:
        return {}
    return {
        "category": category_name,
        "min_price": float(cat_df['price'].min()),
        "max_price": float(cat_df['price'].max()),
        "avg_price": float(cat_df['price'].mean()),
        "sample_size": int(len(cat_df))
    }
def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
    """Resolve a shopping list against the market index.

    For each query string: fuzzy-match one product (best hit), then compare
    two buying strategies across retailers:
      - best_store: the single retailer with the highest item coverage,
        ties broken by lowest basket total;
      - split_strategy: the cheapest offer per item, possibly spread over
        several stores.

    Returns a dict with found/missing items, both strategies, and the top-3
    retailer candidates; {"actionable": False} when the index is empty.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "No data"}
    found_items = []
    missing_global = []
    for item in item_names:
        # Search only rows that carry a real priced offer.
        hits = search_products_fuzzy(df[df['is_offer']==True], item, limit=5)
        if hits.empty:
            missing_global.append(item)
            continue
        best_prod = hits.iloc[0]
        cat_stats = get_category_stats(df, str(best_prod['category']))
        found_items.append({
            "query": str(item),
            "product_id": int(best_prod['product_id']),
            "name": str(best_prod['product_name']),
            "category": str(best_prod['category']),
            "retailer": str(best_prod['retailer']),  # Added explicitly for prompt access
            "price": float(best_prod['price']),  # Added explicitly for prompt access
            "category_stats": cat_stats
        })
    if not found_items:
        # Nothing matched at all: still "actionable" so the persona can
        # report the catalogue gap honestly instead of bluffing.
        return {
            "actionable": True,
            "basket_items": [],
            "global_missing": missing_global,
            "best_store": None,
            "split_strategy": None
        }
    # All offers for the matched products, across every retailer.
    target_pids = [x['product_id'] for x in found_items]
    relevant_offers = df[df['product_id'].isin(target_pids) & df['is_offer']]
    retailer_stats = []
    all_retailers = relevant_offers['retailer'].unique()
    for retailer in all_retailers:
        r_df = relevant_offers[relevant_offers['retailer'] == retailer]
        found_count = len(r_df)
        total_price = r_df['price'].sum()
        retailer_pids = r_df['product_id'].tolist()
        found_names = [x['name'] for x in found_items if x['product_id'] in retailer_pids]
        # NOTE(review): assumes at most one offer row per (product, retailer);
        # duplicate rows would inflate coverage_percent — confirm upstream ETL.
        retailer_stats.append({
            "retailer": str(retailer),
            "total_price": float(total_price),
            "item_count": int(found_count),
            "coverage_percent": float((found_count / len(found_items)) * 100),
            "found_items": found_names
        })
    # Rank: most coverage first, then cheapest basket.
    retailer_stats.sort(key=lambda x: (-x['coverage_percent'], x['total_price']))
    best_single_store = retailer_stats[0] if retailer_stats else None
    # Split strategy: cheapest offer per item, store by store.
    split_basket = []
    split_total = 0.0
    for item in found_items:
        p_offers = relevant_offers[relevant_offers['product_id'] == item['product_id']]
        if not p_offers.empty:
            best_offer = p_offers.sort_values('price').iloc[0]
            split_total += best_offer['price']
            split_basket.append({
                "item": item['name'],
                "retailer": str(best_offer['retailer']),
                "price": float(best_offer['price'])
            })
    split_strategy = {
        "total_price": float(split_total),
        "breakdown": split_basket,
        "store_count": len(set(x['retailer'] for x in split_basket))
    }
    return {
        "actionable": True,
        "basket_items": [x['name'] for x in found_items],
        "found_items_details": found_items,
        "global_missing": missing_global,
        "best_store": best_single_store,
        "split_strategy": split_strategy,
        "all_stores": retailer_stats[:3]
    }
def calculate_zesa_units(amount_usd: float, tariffs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Estimate prepaid ZESA units (kWh) bought for *amount_usd*.

    The levy fraction comes off the top, then the stepped block tariff is
    walked: step 1 and step 2 are fixed-size blocks; remaining money buys
    units at the first rate whose block is not yet exhausted.

    Args:
        amount_usd: Purchase amount in USD.
        tariffs: Tariff table with keys zesa_step_1/2/3 ({"limit","rate"})
            and zesa_levy. Defaults to ZIM_CONTEXT, so the original
            one-argument call signature keeps working.
    """
    t = tariffs if tariffs is not None else ZIM_CONTEXT
    # GENERALIZED: the divisor was hard-coded 1.06; derive it from the
    # tariff table's levy fraction instead (0.06 -> 1.06, identical result).
    remaining = amount_usd / (1.0 + t["zesa_levy"])
    units = 0.0
    breakdown = []
    t1 = t["zesa_step_1"]
    cost_t1 = t1["limit"] * t1["rate"]
    if remaining > cost_t1:
        units += t1["limit"]
        remaining -= cost_t1
        breakdown.append(f"First {t1['limit']}u @ ${t1['rate']}")
        t2 = t["zesa_step_2"]
        cost_t2 = t2["limit"] * t2["rate"]
        if remaining > cost_t2:
            units += t2["limit"]
            remaining -= cost_t2
            breakdown.append(f"Next {t2['limit']}u @ ${t2['rate']}")
            t3 = t["zesa_step_3"]
            bought = remaining / t3["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t3['rate']}")
        else:
            bought = remaining / t2["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t2['rate']}")
    else:
        bought = remaining / t1["rate"]
        units += bought
        breakdown.append(f"All {bought:.1f}u @ ${t1['rate']}")
    return {
        "amount_usd": float(amount_usd),
        "est_units_kwh": float(round(units, 1)),
        "breakdown": breakdown,
        "note": "Includes approx 6% REA levy deduction."
    }
| # ========================= | |
| # 3. Gemini Helpers | |
| # ========================= | |
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Classify a user transcript into an intent plus extracted slots.

    Returns the model's JSON dict; falls back to a non-actionable
    CASUAL_CHAT result when the client is missing or the call fails.
    """
    if not _gemini_client:
        return {"actionable": False}
    PROMPT = """
Analyze transcript. Return STRICT JSON.
Classify intent:
- CASUAL_CHAT: Greetings, small talk, "hi", "thanks".
- SHOPPING_BASKET: Looking for prices, products, lists, or "cheapest X".
- UTILITY_CALC: Electricity/ZESA questions.
- STORE_DECISION: "Where should I buy?", "Which store is cheapest?".
- TRUST_CHECK: "Is this expensive?", "Is this a good deal?".
Extract:
- items: list of products found in the text.
- utility_amount: number
JSON Schema:
{
  "actionable": boolean,
  "intent": "string",
  "items": ["string"],
  "utility_amount": number
}
"""
    fallback = {"actionable": False, "intent": "CASUAL_CHAT"}
    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=f"{PROMPT}\nTranscript: {transcript}",
            config=types.GenerateContentConfig(response_mime_type="application/json"),
        )
    except Exception as exc:
        logger.error(f"Intent Detect Error: {exc}")
        return fallback
    return _safe_json_loads(response.text, fallback)
def gemini_analyze_image(image_b64: str, caption: str = "") -> Dict[str, Any]:
    """Classify an uploaded image as LIST / PRODUCT / MEAL / IRRELEVANT.

    Decodes the base64 payload, sends it to Gemini with a JSON-only prompt,
    and returns the parsed dict; falls back to IRRELEVANT on any failure.
    """
    if not _gemini_client:
        return {"error": "AI Offline"}
    PROMPT = f"""
Analyze this image. Context: {caption}
1. SHOPPING LIST? -> Extract items.
2. SINGLE PRODUCT? -> Extract the BRAND and PRODUCT NAME into 'items'. (e.g. "Pepsi 500ml")
3. MEAL/DISH? -> Identify the dish and ingredients.
4. IRRELEVANT (Pet, Person, Nature)? -> Return type "IRRELEVANT".
IMPORTANT: If type is 'PRODUCT', the 'items' list MUST contain the product name. Do not leave it empty.
Return STRICT JSON:
{{
  "type": "LIST" | "PRODUCT" | "MEAL" | "IRRELEVANT",
  "items": ["item1"],
  "description": "Short description of what is seen"
}}
"""
    fallback = {"type": "IRRELEVANT", "items": []}
    try:
        raw_bytes = base64.b64decode(image_b64)
        image_part = types.Part.from_bytes(data=raw_bytes, mime_type="image/jpeg")
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[PROMPT, image_part],
            config=types.GenerateContentConfig(response_mime_type="application/json"),
        )
        parsed = _safe_json_loads(response.text, dict(fallback))
        logger.info(f"🔮 VISION RAW: {json.dumps(parsed)}")
        return parsed
    except Exception as exc:
        logger.error(f"Vision Error: {exc}")
        return fallback
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict, chat_history: str = "") -> str:
    """Render Jessica's conversational reply.

    Bundles recent chat history, Zimbabwe market constants, and any analyst
    output into the persona prompt. Returns a canned apology when the
    client is unavailable or generation fails.
    """
    if not _gemini_client:
        return "I'm having trouble connecting to my brain right now."
    # Assemble the CONTEXT section from whatever pieces are available.
    parts = []
    if chat_history:
        parts.append(f"RECENT CHAT HISTORY (Last 6 messages):\n{chat_history}\n")
    parts.append(f"ZIMBABWE CONTEXT: Fuel={ZIM_CONTEXT['fuel_petrol']}, ZESA Rate={ZIM_CONTEXT['zesa_step_1']['rate']}\n")
    if analyst_data:
        parts.append(f"ANALYST DATA (Prices/Availability): {json.dumps(analyst_data, default=str)}\n")
    context_str = "".join(parts)
    PROMPT = f"""
You are Jessica, Pricelyst's Shopping Advisor (Zimbabwe).
Role: Intelligent Shopping Companion.
Goal: Shortest path to value. Give answers, not promises.
INPUT: "{transcript}"
INTENT: {intent.get('intent')}
CONTEXT:
{context_str}
CRITICAL INSTRUCTIONS (Shortest Path Rule):
1. **CHECK ANALYST DATA FIRST**:
- If `ANALYST DATA` contains `found_items_details` or `split_strategy` with prices: **REPORT THEM IMMEDIATELY**.
- Say: "I found [Product] at [Retailer] for $[Price]."
- Do NOT say "I will add this to your list."
- Do NOT say "I will check for you." (You have already checked!)
2. **MISSING ITEMS**:
- If `global_missing` has items: Say "I checked, but we don't have [Item] in our current catalogue."
- Don't fake it. Be honest about catalogue gaps.
3. **CASUAL CHAT**:
- Only if no products are mentioned. "Makadii! How can I help?"
- Reset topic if user says "Hi" or changes subject.
TONE: Helpful, direct, Zimbabwean. Use Markdown for prices (e.g. **$3.50**).
"""
    try:
        response = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return response.text
    except Exception as exc:
        logger.error(f"Chat Gen Error: {exc}")
        return "I checked the prices, but I'm having trouble displaying them right now."
def gemini_generate_4step_plan(transcript: str, analyst_result: Dict) -> str:
    """Turn analyst output into a 4-section Markdown shopping plan.

    Returns a Markdown string; an "# Error" document when the client is
    unavailable or generation fails.
    """
    if not _gemini_client:
        return "# Error\nAI Offline."
    PROMPT = f"""
Generate a formatted Markdown Shopping Plan (Jessica Edition).
DATA: {json.dumps(analyst_result, indent=2, default=str)}
SECTIONS:
1. **In Our Catalogue ✅** (Table: Item | Store | Price)
2. **Not in Catalogue 😔** (Estimates)
3. **Recommendation 💡**
- "Best Single Store" vs "Split & Save".
4. **Budget Tips**
Make it look professional yet friendly.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
        return resp.text
    except Exception as e:
        # BUG FIX: the failure was swallowed silently; log it so plan
        # generation problems are visible in operations.
        logger.error(f"Plan Gen Error: {e}")
        return "# Error\nCould not generate plan."
| # ========================= | |
| # 4. Endpoints | |
| # ========================= | |
def health():
    """Liveness/status endpoint: index size, API source, persona tag."""
    index_df = get_market_index()
    status = {
        "ok": True,
        "offers_indexed": len(index_df),
        "api_source": PRICE_API_BASE,
        "persona": "Jessica v2.5 (Immediate Price Check)"
    }
    return jsonify(status)
def chat():
    """
    Unified Text Chat Endpoint.
    Uses SHORT-TERM SLIDING WINDOW memory only.

    Body: {"message": str, "profile_id": str}. Pipeline: last-6 history
    fetch -> intent detection -> analyst math -> persona reply -> log.
    """
    body = request.get_json(silent=True) or {}
    msg = body.get("message", "")
    pid = body.get("profile_id")
    if not pid: return jsonify({"ok": False, "error": "Missing profile_id"}), 400
    # 1. Fetch Short-Term History (Sliding Window)
    history_str = ""
    if db:
        try:
            # Get last 6 messages (newest first; re-reversed below into
            # chronological order for the prompt).
            docs = db.collection("pricelyst_profiles").document(pid).collection("chat_logs") \
                .order_by("ts", direction=firestore.Query.DESCENDING).limit(6).stream()
            msgs = []
            for d in docs:
                data = d.to_dict()
                msgs.append(f"User: {data.get('message')}\nJessica: {data.get('response')}")
            if msgs:
                history_str = "\n".join(reversed(msgs))
        except Exception as e:
            # History is best-effort; chat continues without it.
            logger.error(f"History Fetch Error: {e}")
    # 2. Intent Detection
    intent_data = gemini_detect_intent(msg)
    intent_type = intent_data.get("intent", "CASUAL_CHAT")
    items = intent_data.get("items", [])
    analyst_data = {}
    # 3. Data Processing (The Analyst)
    # Trigger Analyst if Items exist OR intent is specifically about shopping/decisions
    if items or intent_type in ["SHOPPING_BASKET", "STORE_DECISION", "TRUST_CHECK"]:
        analyst_data = calculate_basket_optimization(items)
    elif intent_type == "UTILITY_CALC":
        amount = intent_data.get("utility_amount", 20)
        analyst_data = calculate_zesa_units(amount)
    # 4. Response Generation (The Persona)
    reply = gemini_chat_response(msg, intent_data, analyst_data, history_str)
    # 5. Async Logging
    # NOTE(review): despite the label, this write is synchronous and
    # unguarded — a Firestore failure here would 500 the request. Confirm
    # whether it should be wrapped in try/except like the history fetch.
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
            "message": msg,
            "response": reply,
            "intent": intent_data,
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({
        "ok": True,
        "data": {
            "message": reply,
            # Debug payload only when items were extracted; UTILITY_CALC
            # analyst results are not echoed back here.
            "analyst_debug": analyst_data if items else None
        }
    })
def analyze_image():
    """
    Handles Image -> List/Product/Meal -> Shopping Data
    AUTO-RESOLVES intent with Context-Aware Simulation.

    Body: {"image_data": base64 str, "caption": str?, "profile_id": str}.
    """
    body = request.get_json(silent=True) or {}
    image_b64 = body.get("image_data")
    caption = body.get("caption", "")
    pid = body.get("profile_id")
    if not image_b64 or not pid: return jsonify({"ok": False}), 400
    # 1. Vision Analysis
    vision_result = gemini_analyze_image(image_b64, caption)
    img_type = vision_result.get("type", "IRRELEVANT")
    items = vision_result.get("items", [])
    description = vision_result.get("description", "an image")
    # Fallback: If type is PRODUCT/MEAL but items is empty, try to use description as search item
    if (img_type in ["PRODUCT", "MEAL"]) and not items and description:
        items = [description]
        logger.info(f"🔮 Fallback: Used description '{description}' as item.")
    response_text = ""
    analyst_data = {}
    # 2. Logic Branching
    if img_type == "IRRELEVANT" and not items:
        # Graceful Rejection: compliment the photo, then redirect to shopping.
        prompt = f"User uploaded a photo of: {description}. If it is a pet/flower/view, compliment it warmly! Then effectively explain you are a shopping bot and can't price check that."
        response_text = gemini_chat_response(prompt, {"intent": "CASUAL_CHAT"}, {}, "")
    elif items:
        # Run the Analyst Engine
        analyst_data = calculate_basket_optimization(items)
        # 3. DYNAMIC SIMULATED INTENT (Force immediate answer)
        # Fabricate the message the user would have typed, so the normal
        # chat prompt path answers with prices immediately.
        if img_type == "MEAL":
            simulated_user_msg = f"I want to cook {description}. I need {', '.join(items)}. How much does it cost?"
            intent_sim = {"intent": "SHOPPING_BASKET"}
        elif img_type == "LIST":
            simulated_user_msg = f"Here is my list: {', '.join(items)}. What are the prices?"
            intent_sim = {"intent": "STORE_DECISION"}
        else:  # PRODUCT
            simulated_user_msg = f"I see {description}. What is the price for {', '.join(items)}?"
            intent_sim = {"intent": "STORE_DECISION"}
        # Generate Response
        response_text = gemini_chat_response(
            simulated_user_msg,
            intent_sim,
            analyst_data,
            chat_history=""
        )
    else:
        response_text = "I couldn't quite identify the product in that image. Could you type the name for me?"
    return jsonify({
        "ok": True,
        "image_type": img_type,
        "items_identified": items,
        "message": response_text,
        "analyst_data": analyst_data
    })
def call_briefing():
    """
    Injects LONG-TERM Memory + Context for Voice Bot.

    Body: {"profile_id": str, "username": str?}. Creates the profile doc on
    first contact, refreshes the stored username, and returns the saved
    memory summary plus a KPI snapshot (market rates + a mini-catalogue of
    up to 60 popular priced products).
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    username = body.get("username")
    if not pid: return jsonify({"ok": False}), 400
    prof = {}
    if db:
        ref = db.collection("pricelyst_profiles").document(pid)
        doc = ref.get()
        if doc.exists:
            prof = doc.to_dict()
        else:
            # First contact: create the profile shell document.
            ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
    # Persist a new/changed username (merge keeps other profile fields).
    if username and username != prof.get("username"):
        if db: db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)
    # Mini-Catalogue: most-viewed distinct priced products as "name (~$x.xx)".
    df = get_market_index()
    catalogue_str = ""
    if not df.empty:
        top = df[df['is_offer']].sort_values('views', ascending=False).drop_duplicates('product_name').head(60)
        lines = [f"{r['product_name']} (~${r['price']:.2f})" for _, r in top.iterrows()]
        catalogue_str = ", ".join(lines)
    kpi_snapshot = {
        "market_rates": ZIM_CONTEXT,
        "popular_products": catalogue_str
    }
    return jsonify({
        "ok": True,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": json.dumps(kpi_snapshot)
    })
def log_call_usage():
    """
    Post-Call Orchestrator.
    Generates Plans & Updates Long-Term Memory.

    Body: {"profile_id": str, "transcript": str}. Steps: (1) rewrite the
    profile's memory summary from the transcript, (2) detect intent and, if
    actionable with items, build + persist a Markdown shopping plan,
    (3) append a call-log entry.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    transcript = body.get("transcript", "")
    if not pid: return jsonify({"ok": False}), 400
    # 1. Update Long-Term Memory (only for non-trivial transcripts)
    if len(transcript) > 20 and db:
        try:
            # NOTE(review): .to_dict() is None for a missing profile doc and
            # _gemini_client may be None — both cases raise and land in the
            # except below; confirm that silent skip is intended.
            curr_mem = db.collection("pricelyst_profiles").document(pid).get().to_dict().get("memory_summary", "")
            mem_prompt = f"Update user memory (budget, family size, favorite stores) based on this transcript:\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
            mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
            db.collection("pricelyst_profiles").document(pid).set({"memory_summary": mem_resp.text}, merge=True)
        except Exception as e:
            logger.error(f"Memory Update Error: {e}")
    # 2. Plan Generation
    intent_data = gemini_detect_intent(transcript)
    plan_data = {}
    if intent_data.get("actionable") and intent_data.get("items"):
        analyst_result = calculate_basket_optimization(intent_data["items"])
        if analyst_result.get("actionable"):
            md_content = gemini_generate_4step_plan(transcript, analyst_result)
            plan_data = {
                "is_actionable": True,
                "title": f"Shopping Plan ({datetime.now().strftime('%d %b')})",
                "markdown_content": md_content,
                "items": intent_data["items"],
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            if db:
                # Pre-allocate the document so the plan carries its own id.
                doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                plan_data["id"] = doc_ref.id
                doc_ref.set(plan_data)
    # 3. Always log the call itself (whether or not a plan was produced).
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
            "transcript": transcript,
            "intent": intent_data,
            "plan_generated": bool(plan_data),
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
def list_plans():
    """Return the 10 most recent shopping plans for ?profile_id=..."""
    pid = request.args.get("profile_id")
    if not pid or not db: return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        plans = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "plans": plans})
    except Exception as e:
        # BUG FIX: was a bare `except:` that returned 500 with no trace.
        logger.error(f"List Plans Error: {e}")
        return jsonify({"ok": False}), 500
def delete_plan(plan_id):
    """Delete one shopping-plan document for ?profile_id=...

    Firestore deletes are idempotent, so removing a nonexistent plan_id
    still reports ok=True.
    """
    pid = request.args.get("profile_id")
    if not pid or not db: return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:
        # BUG FIX: was a bare `except:` that hid the failure reason.
        logger.error(f"Delete Plan Error: {e}")
        return jsonify({"ok": False}), 500
# =========================
# Main
# =========================
if __name__ == "__main__":
    # NOTE(review): code default is 7860 although the module docstring says
    # PORT=5000 — presumably a hosting-platform convention; the env var wins
    # either way. Confirm which default is intended.
    port = int(os.environ.get("PORT", 7860))
    try:
        # Warm the market index so the first request is fast; startup must
        # not fail if the catalogue API is down.
        get_market_index(force_refresh=True)
    except Exception as e:
        # BUG FIX: was a bare `except: pass` which also swallowed
        # SystemExit/KeyboardInterrupt and hid the warm-up failure.
        logger.warning("Startup index warm-up failed: %s", e)
    app.run(host="0.0.0.0", port=port)