# PriceLystAI-API / main.py
# rairo's picture
# Update main.py
# 540c3fc verified
"""
main.py — Pricelyst Shopping Advisor (Jessica Edition 2026 - Upgrade v3.1)
✅ Feature: "Vernacular Engine" (Shona/Ndebele/English Input -> Native Response).
✅ Feature: "Precision Search" (Prioritizes exact phrase matches over popularity).
✅ Feature: "Concept Exploder" (Event Planning -> Shopping List).
✅ UI/UX: "Nearest Match" phrasing for substitutions.
✅ Core: Deep Vector Search + Market Matrix + Store Preferences.
ENV VARS:
- GOOGLE_API_KEY=...
- FIREBASE='{"type":"service_account", ...}'
- PRICE_API_BASE=https://api.pricelyst.co.zw
- GEMINI_MODEL=gemini-2.5-flash
- PORT=5000
"""
import os
import re
import json
import time
import math
import logging
import base64
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import requests
import pandas as pd
from flask import Flask, request, jsonify
from flask_cors import CORS
# ––––– Logging –––––
# Configure the root logger once at import time: INFO level and a
# "timestamp | level | message" line format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
# Named logger shared by the functions in this module.
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# The SDK import is optional: if it fails, `genai` is set to None, the failure
# is logged, and no client is created below.
try:
    from google import genai
    from google.genai import types
except Exception as import_err:
    genai = None
    logger.error("google-genai not installed. pip install google-genai. Error=%s", import_err)

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")

# Remains None unless the SDK imported, an API key is set, and the client
# constructor succeeded. The Gemini helpers check this before calling out.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as init_err:
        logger.error("Failed to init Gemini client: %s", init_err)
    else:
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
# ––––– Firebase Admin –––––
import firebase_admin
from firebase_admin import credentials, firestore
FIREBASE_ENV = os.environ.get("FIREBASE", "")


def init_firestore_from_env() -> Optional[firestore.Client]:
    """Return a Firestore client configured from the FIREBASE env var.

    Returns:
        A Firestore client when a Firebase app is already registered or can be
        initialised from the JSON in ``FIREBASE_ENV``; ``None`` when the
        variable is empty or initialisation fails (the reason is logged).
    """
    # An app registered earlier in this process is reused as-is.
    if firebase_admin._apps:
        return firestore.client()

    if FIREBASE_ENV:
        try:
            service_account = json.loads(FIREBASE_ENV)
            certificate = credentials.Certificate(service_account)
            firebase_admin.initialize_app(certificate)
            logger.info("Firebase initialized.")
            return firestore.client()
        except Exception as init_err:
            logger.critical("Failed to initialize Firebase: %s", init_err)
            return None

    logger.warning("FIREBASE env var missing. Persistence disabled.")
    return None


# Module-wide Firestore handle; None means persistence is disabled.
db = init_firestore_from_env()
# ––––– External API –––––
# Base URL of the price API; any trailing "/" is stripped so request paths can
# be appended with a leading slash.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
# Passed as `timeout=` to requests.get for each catalogue page request.
HTTP_TIMEOUT = 30
# ––––– Static Data (Zim Context) –––––
# Hard-coded reference figures used in prompts and by calculate_zesa_units.
# NOTE(review): values look like USD prices/rates — confirm units and freshness.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.10,
    # Tariff steps read by calculate_zesa_units, which multiplies limit * rate
    # to get the cost of a step. presumably limit is in kWh — TODO confirm.
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}
# ––––– Cache –––––
# Maximum age of the cached index; compared against time.time() deltas in
# get_market_index, i.e. seconds.
PRODUCT_CACHE_TTL = 60 * 20 # 20 mins
# In-process cache written by get_market_index:
#   "ts"        - time.time() of the last refresh (0 = never refreshed)
#   "df"        - the flattened offers DataFrame
#   "raw_count" - row count of "df" at the last refresh
_data_cache: Dict[str, Any] = {
    "ts": 0,
    "df": pd.DataFrame(),
    "raw_count": 0
}
# Flask application with CORS enabled through flask_cors defaults.
app = Flask(__name__)
CORS(app)
# =========================
# 1. ETL Layer (Deep Search Indexing)
# =========================
def _norm(s: Any) -> str:
if not s: return ""
return str(s).strip().lower()
def _coerce_price(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except:
return 0.0
def _safe_json_loads(s: str, fallback: Any):
try:
if "```json" in s:
s = s.split("```json")[1].split("```")[0]
elif "```" in s:
s = s.split("```")[0]
return json.loads(s)
except Exception as e:
logger.error(f"JSON Parse Error: {e}")
return fallback
def _flatten_product(p: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one raw product payload into index rows.

    A product with no ``prices`` yields a single placeholder row
    (retailer "Listing", price 0.0, ``is_offer`` False). Otherwise it yields
    one row per offer whose coerced price is greater than zero.
    """
    p_id = int(p.get("id") or 0)
    p_name = str(p.get("name") or "Unknown")
    brand_name = str((p.get("brand") or {}).get("brand_name") or "")
    cat_names = [str(c.get("name") or "") for c in (p.get("categories") or [])]
    primary_cat = cat_names[0] if cat_names else "General"
    # Deep Search Vector: name + brand + all category names, normalised.
    search_vector = _norm(f"{p_name} {brand_name} {' '.join(cat_names)}")
    views = int(p.get("view_count") or 0)
    image = str(p.get("thumbnail") or p.get("image") or "")

    def make_row(retailer: str, price: float, is_offer: bool) -> Dict[str, Any]:
        # Key order here defines the DataFrame column order.
        return {
            "product_id": p_id,
            "product_name": p_name,
            "search_vector": search_vector,
            "brand": brand_name,
            "category": primary_cat,
            "retailer": retailer,
            "price": price,
            "views": views,
            "image": image,
            "is_offer": is_offer,
        }

    prices = p.get("prices") or []
    if not prices:
        return [make_row("Listing", 0.0, False)]

    rows = []
    for offer in prices:
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            retailer = offer.get("retailer") or {}
            rows.append(make_row(str(retailer.get("name") or "Unknown Store"), price_val, True))
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """Download the product listing and flatten it to one row per offer.

    Pages of ``/api/v1/product-listing`` are fetched until a page has no data,
    the reported ``totalPages`` is reached (99 when it is absent), or a
    request fails. In the failure case the pages fetched so far are still used.

    Returns:
        A DataFrame with columns product_id, product_name, search_vector,
        brand, category, retailer, price, views, image, is_offer. The columns
        are present even when no rows could be built.
    """
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    all_products = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error("ETL Error on page %s: %s", page, e)
            break

    rows = []
    skipped = 0
    for p in all_products:
        try:
            rows.extend(_flatten_product(p))
        except Exception as e:
            # Best effort: one malformed product must not abort the index
            # build, but it should be counted rather than vanish silently.
            skipped += 1
            logger.debug("ETL: Skipping malformed product: %s", e)
    if skipped:
        logger.warning("ETL: Skipped %d malformed product(s).", skipped)

    # Explicit columns keep the schema stable when `rows` is empty.
    df = pd.DataFrame(rows, columns=[
        "product_id", "product_name", "search_vector", "brand", "category",
        "retailer", "price", "views", "image", "is_offer",
    ])
    logger.info("ETL: Flattened into %d rows.", len(df))
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh happens when ``force_refresh`` is true, when the cache is empty,
    or when the cached data is older than ``PRODUCT_CACHE_TTL`` seconds.

    If a refresh returns no rows while a non-empty snapshot is cached, the
    snapshot is kept so an upstream outage does not wipe usable data. The
    timestamp is still bumped so the next retry waits one TTL.

    Returns:
        The DataFrame currently held in ``_data_cache["df"]``.
    """
    cached = _data_cache["df"]
    expired = time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL
    if force_refresh or cached.empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        if fresh.empty and not cached.empty:
            logger.warning("ETL: Refresh returned no rows; keeping previous index (%d rows).", len(cached))
        else:
            _data_cache["df"] = fresh
            _data_cache["raw_count"] = len(fresh)
        _data_cache["ts"] = time.time()
    return _data_cache["df"]
# =========================
# 2. Analyst Engine (Precision Search & Matrix)
# =========================
def search_products_deep(df: pd.DataFrame, query: str, limit: int = 15) -> pd.DataFrame:
    """Rank index rows against a free-text query (Precision Search).

    Scoring per row:
      * +1000 when the query equals the normalised product name
      * +500  when the query appears verbatim in the search vector
      * +200  when the row's (non-empty) brand appears in the query
      * +50   per query token found among the search-vector tokens

    Rows scoring 0 are dropped. The rest are sorted by score (desc), views
    (desc) and price (asc), and truncated to ``limit``.

    Args:
        df: Market index containing at least ``search_vector``,
            ``product_name``, ``brand``, ``views`` and ``price``.
        query: Free-text search phrase.
        limit: Maximum number of rows to return.

    Returns:
        Matching rows with an added ``match_score`` column. An empty frame
        with the input's columns is returned when ``df`` is empty or the query
        is blank.
    """
    q_norm = _norm(query)
    if df.empty or not q_norm:
        # A blank query must not match anything: "" is a substring of every
        # search vector, so scoring it would award points to every row.
        return df.iloc[0:0]
    q_tokens = set(q_norm.split())

    def scoring_algo(row) -> int:
        score = 0
        vector = row['search_vector']
        # 1. Exact Name Match (Highest)
        if q_norm == _norm(row['product_name']):
            score += 1000
        # 2. Sequential Vector Match (High)
        if q_norm in vector:
            score += 500
        # 3. Brand Match. The empty-brand guard matters: `"" in q_norm` is
        #    always True and would give every brandless product +200.
        brand = _norm(row['brand'])
        if brand and brand in q_norm:
            score += 200
        # 4. Token Overlap
        score += len(q_tokens & set(vector.split())) * 50
        return score

    df_scored = df.copy()
    df_scored['match_score'] = df_scored.apply(scoring_algo, axis=1)
    # Filter out zero matches
    matches = df_scored[df_scored['match_score'] > 0]
    if matches.empty:
        return matches
    # Sort: Match Score (Desc) -> Views (Desc) -> Price (Asc)
    matches = matches.sort_values(by=['match_score', 'views', 'price'], ascending=[False, False, True])
    return matches.head(limit)
def calculate_basket_optimization(item_names: List[str], preferred_retailer: Optional[str] = None) -> Dict[str, Any]:
    """Resolve shopping items against the index and build a store comparison.

    Args:
        item_names: Product phrases to look up. ``None``, a single string,
            blank entries and non-string entries are tolerated.
        preferred_retailer: Echoed back in the result; it does not influence
            matching or ranking.

    Returns:
        ``{"actionable": False, "error": "No data"}`` when the index is empty.
        When nothing matched, a dict with ``actionable``, an empty
        ``found_items`` and ``global_missing``.
        Otherwise a dict with ``found_items`` (best match per query, its
        offers sorted by price and an ``is_substitute`` flag),
        ``global_missing``, ``market_matrix`` (top 4 stores by items found,
        then total price), ``best_store``, ``is_basket`` and
        ``preferred_retailer``.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "No data"}

    # Item lists usually come from model output: accept a bare string, and
    # drop nulls and blanks instead of crashing or searching for "".
    if isinstance(item_names, str):
        item_names = [item_names]
    wanted = [str(i).strip() for i in (item_names or []) if i is not None and str(i).strip()]

    offers_df = df[df['is_offer'] == True]  # loop-invariant, computed once
    found_items = []
    missing_global = []

    # 1. Resolve Items & Check Brand Fidelity
    for item in wanted:
        hits = search_products_deep(offers_df, item, limit=10)
        if hits.empty:
            missing_global.append(item)
            continue
        best_match = hits.iloc[0]

        # Flag a substitute when a multi-word query has at least one token
        # that is absent from the matched product's name + brand.
        q_tokens = _norm(item).split()
        res_norm = _norm(best_match['product_name'] + " " + best_match['brand'])
        found_tokens = sum(1 for t in q_tokens if t in res_norm)
        is_substitute = len(q_tokens) > 1 and found_tokens < len(q_tokens)

        # Aggregate all offers for the chosen product, cheapest first.
        product_offers = hits[hits['product_name'] == best_match['product_name']].sort_values('price')
        offers_list = [
            {"retailer": r['retailer'], "price": float(r['price'])}
            for _, r in product_offers.iterrows()
        ]
        found_items.append({
            "query": item,
            "product_name": str(best_match['product_name']),
            "is_substitute": is_substitute,
            "offers": offers_list,
            "best_price": offers_list[0]['price']
        })

    if not found_items:
        return {"actionable": True, "found_items": [], "global_missing": missing_global}

    # 2. MARKET MATRIX (Comparison across all stores)
    all_involved_retailers = {o['retailer'] for f in found_items for o in f['offers']}
    store_comparison = []
    # Sorted so that equal (found_count, total_price) ties resolve the same
    # way on every run instead of depending on set iteration order.
    for retailer in sorted(all_involved_retailers):
        total_price = 0.0
        found_count = 0
        missing_in_store = []
        for item in found_items:
            price = next((o['price'] for o in item['offers'] if o['retailer'] == retailer), None)
            if price is not None:
                total_price += price
                found_count += 1
            else:
                missing_in_store.append(item['product_name'])
        store_comparison.append({
            "retailer": retailer,
            # Currency total: round away float summation artefacts.
            "total_price": round(total_price, 2),
            "found_count": found_count,
            "total_items": len(found_items),
            "missing_items": missing_in_store
        })
    store_comparison.sort(key=lambda x: (-x['found_count'], x['total_price']))

    return {
        "actionable": True,
        "is_basket": len(found_items) > 1,
        "found_items": found_items,
        "global_missing": missing_global,
        "market_matrix": store_comparison[:4],
        "best_store": store_comparison[0] if store_comparison else None,
        "preferred_retailer": preferred_retailer
    }
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate electricity units bought for a given spend.

    The levy (``ZIM_CONTEXT["zesa_levy"]``) is removed first. The remainder is
    then spent through the tariff steps in order. Steps 1 and 2 are each
    capped at ``limit`` units; whatever is left is spent at the step-3 rate.

    Args:
        amount_usd: Amount to spend. ``None``, unparseable, non-finite and
            negative values are treated as 0.

    Returns:
        ``{"amount_usd": <float>, "est_units_kwh": <units rounded to 0.1>}``.
    """
    try:
        amount = float(amount_usd)
    except (TypeError, ValueError):
        amount = 0.0
    if not math.isfinite(amount) or amount < 0:
        amount = 0.0

    # Levy comes from the shared context rather than a duplicated literal.
    remaining = amount / (1 + ZIM_CONTEXT["zesa_levy"])
    units = 0.0
    capped_steps = (ZIM_CONTEXT["zesa_step_1"], ZIM_CONTEXT["zesa_step_2"])
    for step in capped_steps:
        step_cost = step["limit"] * step["rate"]
        if remaining <= step_cost:
            # The money runs out inside this step.
            units += remaining / step["rate"]
            break
        units += step["limit"]
        remaining -= step_cost
    else:
        # Both capped steps were exhausted; the rest is bought at step 3.
        units += remaining / ZIM_CONTEXT["zesa_step_3"]["rate"]

    return {
        "amount_usd": amount,
        "est_units_kwh": float(round(units, 1))
    }
# =========================
# 3. Gemini Helpers (Vernacular & Intelligence)
# =========================
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Ask Gemini to classify a transcript and extract shopping entities.

    Args:
        transcript: The user's message or call transcript.

    Returns:
        The parsed JSON object from the model (keys as described in the
        prompt's schema). ``{"actionable": False}`` is returned when no client
        is configured. A CASUAL_CHAT/English fallback dict is returned when
        the call fails or the reply is not a JSON object.
    """
    if not _gemini_client:
        return {"actionable": False}
    PROMPT = """
Analyze transcript. Return STRICT JSON.
Classify intent:
- CASUAL_CHAT: Greetings, "hi".
- SHOPPING_BASKET: Looking for prices, products, "cheapest X".
- UTILITY_CALC: Electricity/ZESA questions.
- STORE_DECISION: "Where should I buy?", "Which store is cheapest?".
- EVENT_PLANNING: "Plan a braai", "Wedding list", "Dinner for 5" (Implicit lists).
Extract:
- items: list of specific products found. **TRANSLATE ALL ITEMS TO ENGLISH** (e.g. 'Hupfu' -> 'Maize Meal').
- utility_amount: number
- store_preference: if a specific store is named (e.g. "at OK Mart").
- is_event_planning: boolean (true if user asks to plan an event but lists no items).
- language: Detected user language (e.g., "Shona", "Ndebele", "English").
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number,
"store_preference": "string",
"is_event_planning": boolean,
"language": "string"
}
"""
    fallback = {"actionable": False, "intent": "CASUAL_CHAT", "language": "English"}
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT + "\nTranscript: " + str(transcript or ""),
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, fallback)
    except Exception as e:
        logger.error("Intent Detect Error: %s", e)
        return fallback
    # Callers use `.get(...)` on the result, so a list or scalar reply must
    # not leak through.
    if not isinstance(parsed, dict):
        logger.error("Intent Detect Error: expected JSON object, got %s", type(parsed).__name__)
        return fallback
    return parsed
def gemini_explode_concept(transcript: str) -> List[str]:
    """Expand an event idea ("Braai for 10") into concrete shopping items.

    Args:
        transcript: The user's description of the event.

    Returns:
        The non-blank entries of the JSON list returned by the model, as
        strings. An empty list is returned when no client is configured, when
        the call fails, or when the reply is not a JSON list.
    """
    if not _gemini_client:
        return []
    PROMPT = f"""
User wants to plan an event: "{transcript}".
Generate a STRICT list of 10-15 essential Zimbabwean shopping items for this.
Use English terms for database lookup (e.g. 'Maize Meal', 'Cooking Oil').
Return ONLY a JSON list of strings.
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, [])
    except Exception as e:
        logger.error("Explode Concept Error: %s", e)
        return []
    # The result is iterated as product names downstream. A dict would yield
    # its keys and a string its characters, so accept lists only.
    if not isinstance(parsed, list):
        logger.error("Explode Concept Error: expected JSON list, got %s", type(parsed).__name__)
        return []
    return [str(entry).strip() for entry in parsed if entry is not None and str(entry).strip()]
def gemini_analyze_image(image_b64: str, caption: str = "") -> Dict[str, Any]:
    """Classify an uploaded image and extract shopping items from it.

    Args:
        image_b64: Base64 image data, either raw or as a
            ``data:image/...;base64,`` URL.
        caption: Optional user caption passed to the model as context.

    Returns:
        The model's JSON object (``type``, ``items``, ``description``).
        ``{"error": "AI Offline"}`` is returned when no client is configured.
        ``{"type": "IRRELEVANT", "items": []}`` is returned when decoding, the
        call, or parsing fails.
    """
    if not _gemini_client:
        return {"error": "AI Offline"}
    PROMPT = f"""
Analyze this image. Context: {caption}
1. SHOPPING LIST? -> Extract items.
2. SINGLE PRODUCT? -> Extract BRAND + NAME (e.g. "Pepsi 500ml").
3. MEAL/DISH? -> Identify dish + ingredients.
4. IRRELEVANT? -> Return type "IRRELEVANT".
Return STRICT JSON:
{{
"type": "LIST" | "PRODUCT" | "MEAL" | "IRRELEVANT",
"items": ["item1"],
"description": "Short description"
}}
"""
    try:
        # Browsers commonly send a data URL. Left in place, the header's
        # letters would be decoded as base64 and corrupt the image bytes.
        mime_type = "image/jpeg"
        header = re.match(r"data:(image/[\w.+-]+);base64,", image_b64)
        if header:
            mime_type = header.group(1)
            image_b64 = image_b64[header.end():]
        image_bytes = base64.b64decode(image_b64)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                PROMPT,
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
            ],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, {"type": "IRRELEVANT", "items": []})
    except Exception as e:
        logger.error("Vision Error: %s", e)
        return {"type": "IRRELEVANT", "items": []}
    # Callers read the result with `.get(...)`.
    if not isinstance(result, dict):
        logger.error("Vision Error: expected JSON object, got %s", type(result).__name__)
        return {"type": "IRRELEVANT", "items": []}
    return result
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict, chat_history: str = "") -> str:
    """Generate Jessica's reply for a user message.

    Args:
        transcript: The user's message (or a synthesised one for images).
        intent: Intent dict; ``intent`` and ``language`` keys are read.
        analyst_data: Basket/utility results embedded in the prompt as JSON.
        chat_history: Optional prior conversation text for context.

    Returns:
        The model's reply text. A fixed apology string is returned when no
        client is configured, the call fails, or the model returns no text.
    """
    if not _gemini_client:
        return "I'm having trouble connecting to my brain right now."
    context_str = f"RECENT CHAT HISTORY (Last 6 messages):\n{chat_history}\n" if chat_history else ""
    context_str += f"ZIMBABWE CONTEXT: Fuel={ZIM_CONTEXT['fuel_petrol']}, ZESA Rate={ZIM_CONTEXT['zesa_step_1']['rate']}\n"
    if analyst_data:
        context_str += f"ANALYST DATA: {json.dumps(analyst_data, default=str)}\n"
    # `or` rather than a .get default: a JSON null language would otherwise be
    # rendered into the prompt as "None".
    language = intent.get("language") or "English"
    PROMPT = f"""
You are Jessica, Pricelyst's Shopping Advisor (Zimbabwe).
Role: Intelligent Shopping Companion.
Goal: Shortest path to value. Complete Transparency.
INPUT: "{transcript}"
USER LANGUAGE: {language}
INTENT: {intent.get('intent')}
CONTEXT:
{context_str}
LOGIC RULES:
1. **LANGUAGE**: Reply in **{language}**. If Shona, use Shona. If English, use English.
2. **BASKET COMPARISON**:
- If `market_matrix` has multiple stores, compare totals.
- "Spar is **$6.95**, OK Mart is **$4.00** (but missing Oil)."
3. **BRAND SUBSTITUTES (Phrasing)**:
- If `is_substitute` is TRUE for an item, say:
"I couldn't find **[Query]**, but the **nearest match is** **[Found]** ($Price)."
4. **SINGLE ITEMS**:
- Best price first, then others.
5. **CASUAL**:
- Reset if user says "Hi".
TONE: Helpful, direct, Zimbabwean. Use Markdown.
"""
    failure_reply = "I checked the prices, but I'm having trouble displaying them right now."
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
    except Exception as e:
        logger.error("Chat Gen Error: %s", e)
        return failure_reply
    # The response text can be None (e.g. no candidates), which would break
    # the declared `str` return and be stored/sent as null.
    if not resp.text:
        logger.error("Chat Gen Error: empty response text")
        return failure_reply
    return resp.text
def gemini_generate_4step_plan(transcript: str, analyst_result: Dict) -> str:
    """Generate a Markdown shopping plan from basket analysis results.

    Args:
        transcript: The user's original request.
        analyst_result: Output of the basket analysis, embedded as JSON.

    Returns:
        Markdown produced by the model, or a short "# Error" Markdown string
        when no client is configured, the call fails, or no text is returned.
    """
    if not _gemini_client:
        return "# Error\nAI Offline."
    PROMPT = f"""
Generate a formatted Markdown Shopping Plan.
USER REQUEST: "{transcript}"
DATA: {json.dumps(analyst_result, indent=2, default=str)}
CRITICAL INSTRUCTION:
For items in 'global_missing', you MUST provide a Realistic USD Estimate (e.g. Chicken ~$6.00).
Do not leave them as "Unknown".
SECTIONS:
1. **In Our Catalogue ✅**
(Markdown Table: | Item | Retailer | Price (USD) |)
2. **Not in Catalogue (Estimates) 😔**
(Markdown Table: | Item | Estimated Price (USD) |)
*Fill in estimated prices for missing items based on Zimbabwe market knowledge.*
3. **Totals 💰**
- Confirmed Total (Catalogue)
- Estimated Total (Missing Items)
- **Grand Total Estimate**
4. **Ideas & Tips 💡**
- 3 Creative ideas based on the specific event/meal (e.g. Braai tips, Cooking hacks).
Tone: Warm, Professional, Zimbabwean.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
    except Exception as e:
        # Log the failure as the other Gemini helpers do.
        logger.error("Plan Gen Error: %s", e)
        return "# Error\nCould not generate plan."
    return resp.text or "# Error\nCould not generate plan."
# =========================
# 4. Endpoints
# =========================
@app.get("/health")
def health():
    """Report service status and the number of rows in the market index."""
    index = get_market_index()
    status = {
        "ok": True,
        "offers_indexed": len(index),
        "api_source": PRICE_API_BASE,
        "persona": "Jessica v3.1 (Babel Fish)",
    }
    return jsonify(status)
@app.post("/chat")
def chat():
    """Handle one chat turn: detect intent, run analysis, reply, and log.

    Request JSON: ``message`` and ``profile_id`` (required).
    Responds 400 when ``profile_id`` is missing. Otherwise responds with
    ``{"ok": True, "data": {"message", "analyst_debug"}}``. History lookup
    and chat logging are best-effort: Firestore failures are logged and do
    not fail the request.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    msg = body.get("message", "")
    pid = body.get("profile_id")
    if not pid:
        return jsonify({"ok": False, "error": "Missing profile_id"}), 400
    pid = str(pid)  # Firestore document ids must be strings

    # History
    history_str = ""
    if db:
        try:
            docs = db.collection("pricelyst_profiles").document(pid).collection("chat_logs") \
                .order_by("ts", direction=firestore.Query.DESCENDING).limit(6).stream()
            msgs = []
            for d in docs:
                entry = d.to_dict() or {}
                msgs.append(f"User: {entry.get('message')}\nJessica: {entry.get('response')}")
            if msgs:
                history_str = "\n".join(reversed(msgs))
        except Exception as e:
            logger.warning("Chat history unavailable for %s: %s", pid, e)

    # Intent
    intent_data = gemini_detect_intent(msg)
    if not isinstance(intent_data, dict):
        intent_data = {}
    intent_type = intent_data.get("intent", "CASUAL_CHAT")
    # The model may emit JSON null for these keys, so `.get` defaults are not
    # enough.
    items = intent_data.get("items") or []
    store_pref = intent_data.get("store_preference")

    analyst_data = {}
    if items or intent_type in ["SHOPPING_BASKET", "STORE_DECISION", "TRUST_CHECK"]:
        analyst_data = calculate_basket_optimization(items, preferred_retailer=store_pref)
    elif intent_type == "UTILITY_CALC":
        raw_amount = intent_data.get("utility_amount")
        try:
            amount = float(raw_amount) if raw_amount is not None else 20.0
        except (TypeError, ValueError):
            amount = 20.0
        analyst_data = calculate_zesa_units(amount)

    reply = gemini_chat_response(msg, intent_data, analyst_data, history_str)

    if db:
        try:
            db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
                "message": msg,
                "response": reply,
                "intent": intent_data,
                "ts": datetime.now(timezone.utc).isoformat()
            })
        except Exception as e:
            # The reply is already computed; a logging failure must not turn
            # the request into a 500.
            logger.error("Failed to persist chat log for %s: %s", pid, e)

    return jsonify({"ok": True, "data": {"message": reply, "analyst_debug": analyst_data if items else None}})
@app.post("/api/analyze-image")
def analyze_image():
    """Analyse an uploaded image and price any products found in it.

    Request JSON: ``image_data`` (base64) and ``profile_id`` are required;
    ``caption`` is optional. Responds 400 when a required field is missing.
    Otherwise responds with the detected image type, the identified items, a
    chat-style message, and the basket analysis (empty when no items).
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    image_b64 = body.get("image_data")
    caption = body.get("caption", "")
    pid = body.get("profile_id")
    if not image_b64 or not pid:
        return jsonify({"ok": False}), 400

    vision_result = gemini_analyze_image(image_b64, caption)
    if not isinstance(vision_result, dict):
        vision_result = {}
    # The model may return JSON null for any field; `.get(key, default)` only
    # covers a missing key, hence `or`.
    img_type = vision_result.get("type") or "IRRELEVANT"
    items = vision_result.get("items") or []
    if isinstance(items, str):
        items = [items]
    description = vision_result.get("description") or "an image"

    # Fallback for empty products
    if (img_type in ["PRODUCT", "MEAL"]) and not items and description:
        items = [description]

    response_text = ""
    analyst_data = {}
    if img_type == "IRRELEVANT" and not items:
        prompt = f"User uploaded photo of {description}. Compliment it if appropriate, then explain you are a shopping bot."
        response_text = gemini_chat_response(prompt, {"intent": "CASUAL_CHAT"}, {}, "")
    elif items:
        analyst_data = calculate_basket_optimization(items)
        if img_type == "MEAL":
            sim_msg = f"I want to cook {description}. Cost of ingredients: {', '.join(items)}?"
        elif img_type == "LIST":
            sim_msg = f"Price of list: {', '.join(items)}?"
        else:
            sim_msg = f"Cheapest price for {', '.join(items)}?"
        response_text = gemini_chat_response(sim_msg, {"intent": "STORE_DECISION"}, analyst_data, "")
    else:
        response_text = "I couldn't identify the product. Could you type the name?"

    return jsonify({
        "ok": True,
        "image_type": img_type,
        "items_identified": items,
        "message": response_text,
        "analyst_data": analyst_data
    })
@app.post("/api/call-briefing")
def call_briefing():
    """Build the market-data briefing injected into the voice bot's context.

    Request JSON: ``profile_id`` (required) and ``username`` (optional).
    Responds 400 without ``profile_id``. Otherwise returns the username, the
    stored ``memory_summary`` and a ``kpi_snapshot`` string made of three
    sections: fuel/bread/ZESA context, the cheapest offer per staple, and all
    offers for the 60 most-viewed products. Profile reads and writes are
    best-effort: Firestore failures are logged and the briefing is still
    returned.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    pid = body.get("profile_id")
    username = body.get("username") or "Friend"
    if not pid:
        return jsonify({"ok": False}), 400

    # 1. Memory Profile
    prof = {}
    if db:
        try:
            ref = db.collection("pricelyst_profiles").document(pid)
            doc = ref.get()
            if doc.exists:
                prof = doc.to_dict() or {}
            else:
                ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
            if username != "Friend" and username != prof.get("username"):
                ref.set({"username": username}, merge=True)
        except Exception as e:
            logger.error("Call briefing: profile access failed for %s: %s", pid, e)

    # 2. Market Intelligence Generation
    df = get_market_index()

    # A. ZESA & Fuel
    zesa_10 = calculate_zesa_units(10.0)
    zesa_20 = calculate_zesa_units(20.0)
    context_section = f"""
[CRITICAL CONTEXT - ZIMBABWE]
FUEL: Petrol=${ZIM_CONTEXT['fuel_petrol']}, Diesel=${ZIM_CONTEXT['fuel_diesel']}
BREAD: ~${ZIM_CONTEXT['bread_avg']}
ZESA (Electricity): $10 = {zesa_10['est_units_kwh']}u, $20 = {zesa_20['est_units_kwh']}u
"""

    staple_summary = []
    catalogue_lines = []
    if not df.empty:
        # Priced offers only; computed once and shared by both sections.
        offers = df[df['is_offer'] == True]

        # B. Staples Index: cheapest among the top 5 search hits per staple.
        for s in ["Cooking Oil", "Maize Meal", "Sugar", "Rice"]:
            hits = search_products_deep(offers, s, limit=5)
            if not hits.empty:
                cheapest = hits.sort_values('price').iloc[0]
                staple_summary.append(f"- {s}: ${cheapest['price']} @ {cheapest['retailer']}")

        # C. Top 60 Catalogue: most-viewed distinct products, all offers each.
        top_items = offers.sort_values('views', ascending=False).drop_duplicates('product_name').head(60)
        for _, r in top_items.iterrows():
            p_name = r['product_name']
            all_offers = offers[offers['product_name'] == p_name]
            prices_str = ", ".join([f"${o['price']} ({o['retailer']})" for _, o in all_offers.iterrows()])
            catalogue_lines.append(f"- {p_name}: {prices_str}")

    staples_section = "\n[STAPLES - LOWEST]\n" + "\n".join(staple_summary)
    catalogue_section = "\n[CATALOGUE - TOP 60]\n" + "\n".join(catalogue_lines)

    return jsonify({
        "ok": True,
        "username": username,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": context_section + staples_section + catalogue_section
    })
@app.post("/api/log-call-usage")
def log_call_usage():
    """Post-call orchestrator: update memory, build a plan, log the call.

    Request JSON: ``profile_id`` (required) and ``transcript``.
    Steps:
      1. For transcripts longer than 20 characters, ask Gemini to update the
         profile's ``memory_summary`` (best-effort).
      2. Detect intent. When actionable, price the extracted items, or the
         exploded event concept when none were listed, and generate a
         Markdown plan.
      3. Store the plan and a call-log entry (best-effort).

    Responds 400 without ``profile_id``. Otherwise responds with
    ``{"ok": True, "shopping_plan": <plan or None>}``.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    pid = body.get("profile_id")
    transcript = body.get("transcript") or ""
    if not pid:
        return jsonify({"ok": False}), 400

    # 1. Update Long-Term Memory
    if len(transcript) > 20 and db and _gemini_client:
        try:
            profile_ref = db.collection("pricelyst_profiles").document(pid)
            # to_dict() is None when the profile document does not exist yet.
            curr_mem = (profile_ref.get().to_dict() or {}).get("memory_summary", "")
            mem_prompt = f"Update user memory (budget, family size) based on: {transcript}\nOLD: {curr_mem}"
            mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
            if mem_resp.text:
                profile_ref.set({"memory_summary": mem_resp.text}, merge=True)
        except Exception as e:
            logger.warning("Memory update failed for %s: %s", pid, e)

    # 2. Plan Generation Logic
    intent_data = gemini_detect_intent(transcript)
    if not isinstance(intent_data, dict):
        intent_data = {}
    plan_data = {}
    # Check if ACTIONABLE (Shopping or Event)
    if intent_data.get("actionable"):
        target_items = intent_data.get("items") or []
        # LOGIC: If Event Planning + No specific items -> EXPLODE CONCEPT
        if intent_data.get("is_event_planning") and not target_items:
            logger.info("💥 Exploding Concept for Event...")
            target_items = gemini_explode_concept(transcript)
        if target_items:
            analyst_result = calculate_basket_optimization(target_items)
            # v3.1: Generate Plan with Estimates & Creative Tips
            md_content = gemini_generate_4step_plan(transcript, analyst_result)
            plan_data = {
                "is_actionable": True,
                "title": f"Plan ({datetime.now().strftime('%d %b')})",
                "markdown_content": md_content,
                "items": target_items,
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            if db:
                try:
                    doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                    plan_data["id"] = doc_ref.id
                    doc_ref.set(plan_data)
                except Exception as e:
                    # The plan is still returned to the caller, just unsaved.
                    logger.error("Failed to persist shopping plan for %s: %s", pid, e)

    if db:
        try:
            db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
                "transcript": transcript,
                "intent": intent_data,
                "plan_generated": bool(plan_data),
                "ts": datetime.now(timezone.utc).isoformat()
            })
        except Exception as e:
            logger.error("Failed to persist call log for %s: %s", pid, e)

    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
@app.get("/api/shopping-plans")
def list_plans():
    """Return the 10 most recent shopping plans for a profile.

    Query string: ``profile_id``. Responds 400 when it is missing or when
    persistence is disabled, and 500 (logged) when the Firestore query fails.
    """
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        return jsonify({"ok": True, "plans": [{"id": d.id, **d.to_dict()} for d in docs]})
    except Exception as e:
        logger.error("Failed to list shopping plans for %s: %s", pid, e)
        return jsonify({"ok": False}), 500
@app.delete("/api/shopping-plans/<plan_id>")
def delete_plan(plan_id):
    """Delete one shopping plan belonging to a profile.

    Query string: ``profile_id``. Responds 400 when it is missing or when
    persistence is disabled, and 500 (logged) when the Firestore delete fails.
    """
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:
        logger.error("Failed to delete shopping plan %s for %s: %s", plan_id, pid, e)
        return jsonify({"ok": False}), 500
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    # Warm the market index before serving. A failure here is not fatal,
    # since the index is rebuilt lazily on first use. `Exception` rather than
    # a bare except, so Ctrl-C / SystemExit during warm-up still stop the
    # process.
    try:
        get_market_index(force_refresh=True)
    except Exception as e:
        logger.warning("Initial market index warm-up failed: %s", e)
    app.run(host="0.0.0.0", port=port)