PriceLystAI-API / main.py
rairo's picture
Update main.py
69406fb verified
raw
history blame
29.1 kB
"""
main.py — Pricelyst Shopping Advisor (Jessica Edition 2026 - Upgrade v2.5)
✅ Fixed: "Basket Regression" - AI now returns prices IMMEDIATELY.
✅ Fixed: "Bluffing" - AI explicitly states if item is found or missing.
✅ Optimization: Removed "Add to list" chatter. Shortest path to value.
✅ "Analyst Engine": Enhanced Basket Math, Category Context, ZESA Logic.
✅ "Visual Engine": Lists, Products, & Meal-to-Recipe recognition.
✅ Memory Logic: Short-Term Sliding Window (Last 6 messages).
ENV VARS:
- GOOGLE_API_KEY=...
- FIREBASE='{"type":"service_account", ...}'
- PRICE_API_BASE=https://api.pricelyst.co.zw
- GEMINI_MODEL=gemini-2.5-flash
- PORT=5000 (optional; the server falls back to 7860 when unset)
"""
import os
import re
import json
import time
import math
import logging
import base64
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import requests
import pandas as pd
from flask import Flask, request, jsonify
from flask_cors import CORS
# ––––– Logging –––––
# Configure logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
# Module-wide logger used by the ETL, analyst, Gemini and endpoint code below.
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# The Gemini SDK is optional at import time: if the import fails, `genai` is
# set to None and the client below is never created.
try:
    from google import genai
    from google.genai import types
except Exception as e:
    genai = None
    logger.error("google-genai not installed. pip install google-genai. Error=%s", e)
# API key and model name come from the environment.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
# Shared Gemini client; stays None when the SDK or the API key is missing,
# or when client construction raises.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ––––– Firebase Admin –––––
import firebase_admin
from firebase_admin import credentials, firestore
FIREBASE_ENV = os.environ.get("FIREBASE", "")


def init_firestore_from_env() -> Optional[firestore.Client]:
    """Return a Firestore client configured from the FIREBASE env var.

    An already-initialised Firebase app is reused. Otherwise FIREBASE_ENV is
    parsed as service-account JSON and used to initialise the app.

    Returns:
        A Firestore client, or None when FIREBASE_ENV is empty or
        initialisation fails (the failure is logged, not raised).
    """
    if firebase_admin._apps:
        return firestore.client()

    if FIREBASE_ENV:
        try:
            service_account = json.loads(FIREBASE_ENV)
            firebase_admin.initialize_app(credentials.Certificate(service_account))
            logger.info("Firebase initialized.")
            return firestore.client()
        except Exception as e:
            logger.critical("Failed to initialize Firebase: %s", e)
            return None

    logger.warning("FIREBASE env var missing. Persistence disabled.")
    return None
# Firestore handle used by the endpoints; None when persistence is disabled.
db = init_firestore_from_env()
# ––––– External API –––––
# Base URL of the price API, normalised to have no trailing slash.
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
# Passed as the `timeout` argument of the HTTP requests made by the ETL.
HTTP_TIMEOUT = 30
# ––––– Static Data (Zim Context) –––––
# Hard-coded reference figures injected into prompts and used by the ZESA
# calculator. NOTE(review): values look like USD amounts — confirm units.
ZIM_CONTEXT = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread_avg": 1.00,
    # Electricity tiers: the calculator treats `limit` as the number of units
    # sold at `rate` within that tier.
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    # Levy expressed as a fraction (0.06 corresponds to 6%).
    "zesa_levy": 0.06
}
# ––––– Cache –––––
PRODUCT_CACHE_TTL = 60 * 20 # 20 mins
# In-process cache of the flattened market index: `ts` is the time.time() of
# the last refresh, `df` the frame itself, `raw_count` its row count.
_data_cache: Dict[str, Any] = {
    "ts": 0,
    "df": pd.DataFrame(),
    "raw_count": 0
}
# Flask application with CORS enabled for every route.
app = Flask(__name__)
CORS(app)
# =========================
# 1. ETL Layer (Ingestion)
# =========================
def _norm(s: Any) -> str:
if not s: return ""
return str(s).strip().lower()
def _coerce_price(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except:
return 0.0
def _safe_json_loads(s: str, fallback: Any):
try:
if "```json" in s:
s = s.split("```json")[1].split("```")[0]
elif "```" in s:
s = s.split("```")[0]
return json.loads(s)
except Exception as e:
logger.error(f"JSON Parse Error: {e}")
return fallback
# Column order of the flattened market index. Passing it to the DataFrame
# constructor gives even an empty result a schema.
_MARKET_INDEX_COLUMNS = [
    "product_id", "product_name", "clean_name", "brand", "category",
    "retailer", "price", "views", "image", "is_offer",
]


def _flatten_product(p: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one API product into index rows (one per positively priced offer)."""
    p_name = str(p.get("name") or "Unknown")
    base = {
        "product_id": int(p.get("id") or 0),
        "product_name": p_name,
        "clean_name": _norm(p_name),
        "brand": str((p.get("brand") or {}).get("brand_name") or ""),
        "category": str((p.get("category") or {}).get("name") or "General"),
        "views": int(p.get("view_count") or 0),
        "image": str(p.get("thumbnail") or p.get("image") or ""),
    }

    prices = p.get("prices") or []
    if not prices:
        # Products without any offer are kept as a zero-priced placeholder row.
        return [{**base, "retailer": "Listing", "price": 0.0, "is_offer": False}]

    rows = []
    for offer in prices:
        price_val = _coerce_price(offer.get("price"))
        if price_val > 0:
            retailer = offer.get("retailer") or {}
            rows.append({
                **base,
                "retailer": str(retailer.get("name") or "Unknown Store"),
                "price": price_val,
                "is_offer": True,
            })
    return rows


def fetch_and_flatten_data() -> pd.DataFrame:
    """Fetches from /api/v1/product-listing and flattens into an analytical DF.

    Pages are requested 50 products at a time until a page comes back empty,
    the reported ``totalPages`` is reached (99 when the field is absent), or
    a request fails; whatever was fetched up to that point is still used.

    Returns:
        A DataFrame with the columns in ``_MARKET_INDEX_COLUMNS``; empty
        (but with those columns) when nothing could be fetched.
    """
    all_products = []
    page = 1
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            # Network/HTTP/decoding failure: stop paging, keep partial data.
            logger.error("ETL Error on page %s: %s", page, e)
            break

    rows = []
    for p in all_products:
        try:
            rows.extend(_flatten_product(p))
        except Exception as e:
            # A malformed product must not abort the whole refresh, but it
            # should be visible in the logs instead of vanishing silently.
            logger.warning("ETL: Skipping malformed product: %s", e)

    df = pd.DataFrame(rows, columns=_MARKET_INDEX_COLUMNS)
    logger.info("ETL: Flattened into %d rows.", len(df))
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market index, refreshing it when needed.

    A refresh happens when ``force_refresh`` is set, the cache is empty, or
    the cached frame is older than ``PRODUCT_CACHE_TTL`` seconds. If the
    refresh yields no rows while a populated index is cached, the stale
    index is kept so that an upstream outage does not wipe known prices.

    Args:
        force_refresh: Re-fetch even if the cache is still fresh.

    Returns:
        The market index DataFrame (possibly empty).
    """
    cached = _data_cache["df"]
    expired = (time.time() - _data_cache["ts"]) > PRODUCT_CACHE_TTL
    if not (force_refresh or cached.empty or expired):
        return cached

    logger.info("ETL: Refreshing Market Index...")
    fresh = fetch_and_flatten_data()
    if fresh.empty and not cached.empty:
        # Keep serving stale data, but retry soon rather than after a full TTL.
        retry_in = min(60, PRODUCT_CACHE_TTL)
        logger.warning(
            "ETL: Refresh returned no rows; serving stale index (%d rows).",
            len(cached),
        )
        _data_cache["ts"] = time.time() - PRODUCT_CACHE_TTL + retry_in
        return cached

    _data_cache["df"] = fresh
    _data_cache["ts"] = time.time()
    _data_cache["raw_count"] = len(fresh)
    return fresh
# =========================
# 2. Analyst Engine (Math Logic)
# =========================
def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
    """Find the rows of ``df`` whose product name best matches ``query``.

    Matching is two-stage: a literal substring match on ``clean_name``;
    if that finds nothing, rows sharing at least one whitespace-separated
    token with the query.

    Args:
        df: Market index (needs ``clean_name``, ``views`` and ``price``).
        query: Free-text product query.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` rows with the same columns as ``df``. Substring
        matches are ordered by views (descending) then price (ascending);
        token matches are ordered by token overlap first. A blank query
        yields an empty frame rather than matching every product.
    """
    if df.empty:
        return df
    q_norm = _norm(query)
    if not q_norm:
        # "" is a substring of every name; treat a blank query as no match.
        return df.iloc[0:0]

    names = df["clean_name"]

    # 1. Contains (literal match; missing names count as non-matching)
    matches = df[names.str.contains(q_norm, regex=False, na=False)].copy()
    if not matches.empty:
        ranked = matches.sort_values(by=["views", "price"], ascending=[False, True])
        return ranked.head(limit)

    # 2. Token overlap fallback
    q_tokens = set(q_norm.split())
    overlap = names.map(
        lambda text: len(q_tokens & set(text.split())) if isinstance(text, str) else 0
    )
    hit_mask = overlap > 0
    scored = df[hit_mask].copy()
    if scored.empty:
        return scored

    # 3. Rank by overlap, then views + price; the helper column is dropped
    #    so both stages return the same schema.
    scored["_score"] = overlap[hit_mask]
    scored = scored.sort_values(
        by=["_score", "views", "price"], ascending=[False, False, True]
    )
    return scored.drop(columns="_score").head(limit)
def get_category_stats(df: pd.DataFrame, category_name: str) -> Dict[str, Any]:
    """Summarise offer prices for a category.

    Offers are selected by a case-insensitive literal substring match on
    ``category``; if none match, the same needle is tried against
    ``clean_name``. The name is never interpreted as a regular expression,
    so names such as "Drinks (Soft)" or "C++" match themselves.

    Args:
        df: Market index (needs ``category``, ``clean_name``, ``price`` and
            ``is_offer``).
        category_name: Category (or product keyword) to summarise.

    Returns:
        ``{}`` when the index is empty, the name is blank, or nothing
        matches; otherwise a dict with ``category``, ``min_price``,
        ``max_price``, ``avg_price`` and ``sample_size``.
    """
    if df.empty or not isinstance(category_name, str) or not category_name.strip():
        return {}

    needle = category_name.lower()
    is_offer = df["is_offer"]

    by_category = df["category"].str.lower().str.contains(needle, regex=False, na=False)
    cat_df = df[by_category & is_offer]
    if cat_df.empty:
        by_name = df["clean_name"].str.contains(needle, regex=False, na=False)
        cat_df = df[by_name & is_offer]
    if cat_df.empty:
        return {}

    prices = cat_df["price"]
    return {
        "category": category_name,
        "min_price": float(prices.min()),
        "max_price": float(prices.max()),
        "avg_price": float(prices.mean()),
        "sample_size": int(len(cat_df)),
    }
def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
    """Price a basket of free-text items against the cached market index.

    Each query is resolved to its top fuzzy match among priced offers. Two
    strategies are then computed over the matched products: the best single
    retailer (highest coverage, then lowest total) and a split basket that
    takes the cheapest offer for every product.

    Args:
        item_names: Product queries; ``None`` is treated as an empty list.

    Returns:
        ``{"actionable": False, "error": "No data"}`` when the index is
        empty. Otherwise a dict with ``basket_items``,
        ``found_items_details``, ``global_missing``, ``best_store``,
        ``split_strategy`` and ``all_stores`` (the top three retailers).
        When no query matched, the list fields are empty and
        ``best_store`` / ``split_strategy`` are ``None``.
    """
    df = get_market_index()
    if df.empty:
        return {"actionable": False, "error": "No data"}

    offers = df[df["is_offer"]]  # loop-invariant: filter once, not per query

    found_items: List[Dict[str, Any]] = []
    missing_global: List[Any] = []
    for item in item_names or []:
        hits = search_products_fuzzy(offers, item, limit=5)
        if hits.empty:
            missing_global.append(item)
            continue
        best_prod = hits.iloc[0]
        found_items.append({
            "query": str(item),
            "product_id": int(best_prod["product_id"]),
            "name": str(best_prod["product_name"]),
            "category": str(best_prod["category"]),
            "retailer": str(best_prod["retailer"]),  # exposed for prompt access
            "price": float(best_prod["price"]),  # exposed for prompt access
            "category_stats": get_category_stats(df, str(best_prod["category"])),
        })

    if not found_items:
        return {
            "actionable": True,
            "basket_items": [],
            "found_items_details": [],
            "global_missing": missing_global,
            "best_store": None,
            "split_strategy": None,
            "all_stores": [],
        }

    target_pids = [x["product_id"] for x in found_items]
    relevant_offers = offers[offers["product_id"].isin(target_pids)]

    # --- Single-store strategy -------------------------------------------
    retailer_stats = []
    for retailer in relevant_offers["retailer"].unique():
        r_df = relevant_offers[relevant_offers["retailer"] == retailer]
        # A retailer may list the same product more than once; count each
        # product once, at its cheapest price, so totals are not inflated
        # and coverage cannot exceed 100%.
        price_by_pid = r_df.groupby("product_id")["price"].min().to_dict()
        covered = [x for x in found_items if x["product_id"] in price_by_pid]
        total_price = sum(price_by_pid[x["product_id"]] for x in covered)
        retailer_stats.append({
            "retailer": str(retailer),
            "total_price": float(total_price),
            "item_count": len(covered),
            "coverage_percent": float(len(covered) / len(found_items) * 100),
            "found_items": [x["name"] for x in covered],
        })
    retailer_stats.sort(key=lambda x: (-x["coverage_percent"], x["total_price"]))
    best_single_store = retailer_stats[0] if retailer_stats else None

    # --- Split strategy: cheapest offer per product -----------------------
    split_basket = []
    split_total = 0.0
    for item in found_items:
        p_offers = relevant_offers[relevant_offers["product_id"] == item["product_id"]]
        if p_offers.empty:
            continue
        best_offer = p_offers.sort_values("price").iloc[0]
        split_total += float(best_offer["price"])
        split_basket.append({
            "item": item["name"],
            "retailer": str(best_offer["retailer"]),
            "price": float(best_offer["price"]),
        })
    split_strategy = {
        "total_price": float(split_total),
        "breakdown": split_basket,
        "store_count": len({x["retailer"] for x in split_basket}),
    }

    return {
        "actionable": True,
        "basket_items": [x["name"] for x in found_items],
        "found_items_details": found_items,
        "global_missing": missing_global,
        "best_store": best_single_store,
        "split_strategy": split_strategy,
        "all_stores": retailer_stats[:3],
    }
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate how many electricity units a purchase amount buys.

    The levy in ``ZIM_CONTEXT["zesa_levy"]`` is removed first; the rest is
    spent through the stepped tariff: tier 1 and tier 2 each sell ``limit``
    units at ``rate``, and whatever is left buys units at the tier 3 rate.

    Args:
        amount_usd: Purchase amount. Missing, non-numeric, non-finite or
            negative values are treated as 0.

    Returns:
        Dict with ``amount_usd``, ``est_units_kwh`` (rounded to 0.1),
        ``breakdown`` (one line per tier used) and ``note``.
    """
    try:
        amount = float(amount_usd)
    except (TypeError, ValueError):
        amount = 0.0
    if not math.isfinite(amount) or amount < 0:
        amount = 0.0

    levy = ZIM_CONTEXT["zesa_levy"]
    remaining = amount / (1 + levy)
    units = 0.0
    breakdown = []

    t1 = ZIM_CONTEXT["zesa_step_1"]
    t2 = ZIM_CONTEXT["zesa_step_2"]
    t3 = ZIM_CONTEXT["zesa_step_3"]
    cost_t1 = t1["limit"] * t1["rate"]  # cost of buying out all of tier 1
    cost_t2 = t2["limit"] * t2["rate"]  # cost of buying out all of tier 2

    if remaining <= cost_t1:
        bought = remaining / t1["rate"]
        units += bought
        breakdown.append(f"All {bought:.1f}u @ ${t1['rate']}")
    else:
        units += t1["limit"]
        remaining -= cost_t1
        breakdown.append(f"First {t1['limit']}u @ ${t1['rate']}")
        if remaining <= cost_t2:
            bought = remaining / t2["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t2['rate']}")
        else:
            units += t2["limit"]
            remaining -= cost_t2
            breakdown.append(f"Next {t2['limit']}u @ ${t2['rate']}")
            bought = remaining / t3["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t3['rate']}")

    return {
        "amount_usd": amount,
        "est_units_kwh": float(round(units, 1)),
        "breakdown": breakdown,
        "note": f"Includes approx {levy * 100:.0f}% REA levy deduction.",
    }
# =========================
# 3. Gemini Helpers
# =========================
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """Classify a user transcript and extract the items it mentions.

    Args:
        transcript: Raw user message or call transcript.

    Returns:
        The model's JSON object (keys per the schema in the prompt:
        ``actionable``, ``intent``, ``items``, ``utility_amount``).
        ``{"actionable": False}`` when the Gemini client is unavailable, and
        ``{"actionable": False, "intent": "CASUAL_CHAT"}`` when the call
        fails or the reply is not a JSON object.
    """
    if not _gemini_client:
        return {"actionable": False}
    PROMPT = """
Analyze transcript. Return STRICT JSON.
Classify intent:
- CASUAL_CHAT: Greetings, small talk, "hi", "thanks".
- SHOPPING_BASKET: Looking for prices, products, lists, or "cheapest X".
- UTILITY_CALC: Electricity/ZESA questions.
- STORE_DECISION: "Where should I buy?", "Which store is cheapest?".
- TRUST_CHECK: "Is this expensive?", "Is this a good deal?".
Extract:
- items: list of products found in the text.
- utility_amount: number
JSON Schema:
{
"actionable": boolean,
"intent": "string",
"items": ["string"],
"utility_amount": number
}
"""
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT + "\nTranscript: " + transcript,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        parsed = _safe_json_loads(resp.text, {"actionable": False, "intent": "CASUAL_CHAT"})
    except Exception as e:
        logger.error("Intent Detect Error: %s", e)
        return {"actionable": False, "intent": "CASUAL_CHAT"}

    if not isinstance(parsed, dict):
        # Callers use .get() on the result; a JSON array/scalar would crash them.
        logger.error("Intent Detect Error: expected JSON object, got %s", type(parsed).__name__)
        return {"actionable": False, "intent": "CASUAL_CHAT"}
    return parsed
def gemini_analyze_image(image_b64: str, caption: str = "") -> Dict[str, Any]:
    """Ask Gemini what a user-supplied image shows.

    Args:
        image_b64: Base64-encoded image, either bare or as a
            ``data:<mime>;base64,<payload>`` URL. The MIME type is taken
            from the data-URL header when present, else ``image/jpeg``.
        caption: Optional user caption added to the prompt as context.

    Returns:
        The model's JSON object (``type``, ``items``, ``description``).
        ``{"error": "AI Offline"}`` when the client is unavailable, and
        ``{"type": "IRRELEVANT", "items": []}`` when decoding, the call, or
        parsing fails.
    """
    if not _gemini_client:
        return {"error": "AI Offline"}
    PROMPT = f"""
Analyze this image. Context: {caption}
1. SHOPPING LIST? -> Extract items.
2. SINGLE PRODUCT? -> Extract the BRAND and PRODUCT NAME into 'items'. (e.g. "Pepsi 500ml")
3. MEAL/DISH? -> Identify the dish and ingredients.
4. IRRELEVANT (Pet, Person, Nature)? -> Return type "IRRELEVANT".
IMPORTANT: If type is 'PRODUCT', the 'items' list MUST contain the product name. Do not leave it empty.
Return STRICT JSON:
{{
"type": "LIST" | "PRODUCT" | "MEAL" | "IRRELEVANT",
"items": ["item1"],
"description": "Short description of what is seen"
}}
"""
    try:
        mime_type = "image/jpeg"
        payload = image_b64
        if isinstance(payload, str) and payload.startswith("data:"):
            # Strip the data-URL header; otherwise its characters would be
            # base64-decoded as if they were image bytes.
            header, _, payload = payload.partition(",")
            declared = header[len("data:"):].split(";", 1)[0].strip()
            if declared:
                mime_type = declared
        image_bytes = base64.b64decode(payload)
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                PROMPT,
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
            ],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        result = _safe_json_loads(resp.text, {"type": "IRRELEVANT", "items": []})
        if not isinstance(result, dict):
            # Callers use .get() on the result.
            result = {"type": "IRRELEVANT", "items": []}
        logger.info("🔮 VISION RAW: %s", json.dumps(result))
        return result
    except Exception as e:
        logger.error("Vision Error: %s", e)
        return {"type": "IRRELEVANT", "items": []}
def gemini_chat_response(transcript: str, intent: Dict, analyst_data: Dict, chat_history: str = "") -> str:
    """Generate Jessica's reply for a user message.

    Args:
        transcript: The user's message (or a simulated one for images).
        intent: Intent dict; only its ``intent`` value is placed in the prompt.
        analyst_data: Analyst-engine output, serialised into the prompt
            when non-empty.
        chat_history: Recent conversation, already formatted as text.

    Returns:
        The reply text. A fixed apology string is returned when the client
        is unavailable, the call fails, or the model returns no text.
    """
    if not _gemini_client:
        return "I'm having trouble connecting to my brain right now."
    context_str = f"RECENT CHAT HISTORY (Last 6 messages):\n{chat_history}\n" if chat_history else ""
    context_str += f"ZIMBABWE CONTEXT: Fuel={ZIM_CONTEXT['fuel_petrol']}, ZESA Rate={ZIM_CONTEXT['zesa_step_1']['rate']}\n"
    if analyst_data:
        context_str += f"ANALYST DATA (Prices/Availability): {json.dumps(analyst_data, default=str)}\n"
    # Tolerate a non-dict intent instead of raising before the guarded call.
    intent_label = intent.get("intent") if isinstance(intent, dict) else None
    PROMPT = f"""
You are Jessica, Pricelyst's Shopping Advisor (Zimbabwe).
Role: Intelligent Shopping Companion.
Goal: Shortest path to value. Give answers, not promises.
INPUT: "{transcript}"
INTENT: {intent_label}
CONTEXT:
{context_str}
CRITICAL INSTRUCTIONS (Shortest Path Rule):
1. **CHECK ANALYST DATA FIRST**:
- If `ANALYST DATA` contains `found_items_details` or `split_strategy` with prices: **REPORT THEM IMMEDIATELY**.
- Say: "I found [Product] at [Retailer] for $[Price]."
- Do NOT say "I will add this to your list."
- Do NOT say "I will check for you." (You have already checked!)
2. **MISSING ITEMS**:
- If `global_missing` has items: Say "I checked, but we don't have [Item] in our current catalogue."
- Don't fake it. Be honest about catalogue gaps.
3. **CASUAL CHAT**:
- Only if no products are mentioned. "Makadii! How can I help?"
- Reset topic if user says "Hi" or changes subject.
TONE: Helpful, direct, Zimbabwean. Use Markdown for prices (e.g. **$3.50**).
"""
    failure_reply = "I checked the prices, but I'm having trouble displaying them right now."
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
    except Exception as e:
        logger.error("Chat Gen Error: %s", e)
        return failure_reply
    if not resp.text:
        # The model returned no text; never hand None back to the caller.
        logger.error("Chat Gen Error: empty response text")
        return failure_reply
    return resp.text
def gemini_generate_4step_plan(transcript: str, analyst_result: Dict) -> str:
    """Render the analyst result as a four-section Markdown shopping plan.

    Args:
        transcript: Call transcript (accepted for interface compatibility;
            not used in the prompt).
        analyst_result: Basket-optimisation output, embedded as JSON.

    Returns:
        Markdown text, or a short ``# Error`` document when the client is
        unavailable, the call fails, or the model returns no text.
    """
    if not _gemini_client:
        return "# Error\nAI Offline."
    PROMPT = f"""
Generate a formatted Markdown Shopping Plan (Jessica Edition).
DATA: {json.dumps(analyst_result, indent=2, default=str)}
SECTIONS:
1. **In Our Catalogue ✅** (Table: Item | Store | Price)
2. **Not in Catalogue 😔** (Estimates)
3. **Recommendation 💡**
- "Best Single Store" vs "Split & Save".
4. **Budget Tips**
Make it look professional yet friendly.
"""
    try:
        resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=PROMPT)
    except Exception as e:
        # Previously swallowed silently; log so failures are diagnosable.
        logger.error("Plan Gen Error: %s", e)
        return "# Error\nCould not generate plan."
    return resp.text or "# Error\nCould not generate plan."
# =========================
# 4. Endpoints
# =========================
@app.get("/health")
def health():
    """Liveness probe: reports the size of the market index and the API source."""
    indexed_rows = len(get_market_index())
    status = {
        "ok": True,
        "offers_indexed": indexed_rows,
        "api_source": PRICE_API_BASE,
        "persona": "Jessica v2.5 (Immediate Price Check)",
    }
    return jsonify(status)
@app.post("/chat")
def chat():
    """
    Unified Text Chat Endpoint.
    Uses SHORT-TERM SLIDING WINDOW memory only.

    Request JSON: ``message`` (user text) and ``profile_id`` (required).
    Responds 400 when ``profile_id`` is missing; otherwise returns the reply
    under ``data.message`` plus ``data.analyst_debug`` (analyst output when
    items were extracted, else null). Firestore failures while reading
    history or writing the log are logged and never fail the request.
    """
    body = request.get_json(silent=True) or {}
    msg = body.get("message", "")
    pid = body.get("profile_id")
    if not pid:
        return jsonify({"ok": False, "error": "Missing profile_id"}), 400

    # 1. Fetch Short-Term History (Sliding Window)
    history_str = ""
    if db:
        try:
            # Get last 6 messages (newest first), then restore chronological order.
            docs = db.collection("pricelyst_profiles").document(pid).collection("chat_logs") \
                .order_by("ts", direction=firestore.Query.DESCENDING).limit(6).stream()
            msgs = []
            for d in docs:
                data = d.to_dict()
                msgs.append(f"User: {data.get('message')}\nJessica: {data.get('response')}")
            if msgs:
                history_str = "\n".join(reversed(msgs))
        except Exception as e:
            logger.error("History Fetch Error: %s", e)

    # 2. Intent Detection
    intent_data = gemini_detect_intent(msg)
    if not isinstance(intent_data, dict):
        intent_data = {}
    intent_type = intent_data.get("intent", "CASUAL_CHAT")

    # The model may return null, a bare string or non-string entries for
    # `items`; normalise to a clean list of non-empty strings.
    raw_items = intent_data.get("items") or []
    if not isinstance(raw_items, (list, tuple)):
        raw_items = [raw_items]
    items = [str(i).strip() for i in raw_items if i is not None and str(i).strip()]

    analyst_data = {}
    # 3. Data Processing (The Analyst)
    # Trigger Analyst if Items exist OR intent is specifically about shopping/decisions
    if items or intent_type in ["SHOPPING_BASKET", "STORE_DECISION", "TRUST_CHECK"]:
        analyst_data = calculate_basket_optimization(items)
    elif intent_type == "UTILITY_CALC":
        amount = intent_data.get("utility_amount")
        if amount is None:  # key missing or explicit null
            amount = 20
        analyst_data = calculate_zesa_units(amount)

    # 4. Response Generation (The Persona)
    reply = gemini_chat_response(msg, intent_data, analyst_data, history_str)

    # 5. Logging (best effort: the user still gets the reply if this fails)
    if db:
        try:
            db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
                "message": msg,
                "response": reply,
                "intent": intent_data,
                "ts": datetime.now(timezone.utc).isoformat()
            })
        except Exception as e:
            logger.error("Chat Log Error: %s", e)

    return jsonify({
        "ok": True,
        "data": {
            "message": reply,
            "analyst_debug": analyst_data if items else None
        }
    })
@app.post("/api/analyze-image")
def analyze_image():
    """
    Handles Image -> List/Product/Meal -> Shopping Data
    AUTO-RESOLVES intent with Context-Aware Simulation.

    Request JSON: ``image_data`` (base64) and ``profile_id`` are required
    (400 otherwise); ``caption`` is optional. The response carries the
    detected ``image_type``, ``items_identified``, the generated ``message``
    and the raw ``analyst_data``.
    """
    body = request.get_json(silent=True) or {}
    image_b64 = body.get("image_data")
    caption = body.get("caption", "")
    pid = body.get("profile_id")
    if not image_b64 or not pid:
        return jsonify({"ok": False}), 400

    # 1. Vision Analysis
    vision_result = gemini_analyze_image(image_b64, caption)
    if not isinstance(vision_result, dict):
        vision_result = {}
    img_type = vision_result.get("type", "IRRELEVANT")
    description = vision_result.get("description", "an image")

    # The model may return null, a bare string or non-string entries for
    # `items`; normalise so the ', '.join(...) calls below cannot raise.
    raw_items = vision_result.get("items") or []
    if not isinstance(raw_items, (list, tuple)):
        raw_items = [raw_items]
    items = [str(i).strip() for i in raw_items if i is not None and str(i).strip()]

    # Fallback: If type is PRODUCT/MEAL but items is empty, try to use description as search item
    if (img_type in ["PRODUCT", "MEAL"]) and not items and description:
        items = [description]
        logger.info(f"🔮 Fallback: Used description '{description}' as item.")

    response_text = ""
    analyst_data = {}

    # 2. Logic Branching
    if img_type == "IRRELEVANT" and not items:
        # Graceful Rejection
        prompt = f"User uploaded a photo of: {description}. If it is a pet/flower/view, compliment it warmly! Then effectively explain you are a shopping bot and can't price check that."
        response_text = gemini_chat_response(prompt, {"intent": "CASUAL_CHAT"}, {}, "")
    elif items:
        # Run the Analyst Engine
        analyst_data = calculate_basket_optimization(items)
        # 3. DYNAMIC SIMULATED INTENT (Force immediate answer)
        if img_type == "MEAL":
            simulated_user_msg = f"I want to cook {description}. I need {', '.join(items)}. How much does it cost?"
            intent_sim = {"intent": "SHOPPING_BASKET"}
        elif img_type == "LIST":
            simulated_user_msg = f"Here is my list: {', '.join(items)}. What are the prices?"
            intent_sim = {"intent": "STORE_DECISION"}
        else:  # PRODUCT
            simulated_user_msg = f"I see {description}. What is the price for {', '.join(items)}?"
            intent_sim = {"intent": "STORE_DECISION"}
        # Generate Response
        response_text = gemini_chat_response(
            simulated_user_msg,
            intent_sim,
            analyst_data,
            chat_history=""
        )
    else:
        response_text = "I couldn't quite identify the product in that image. Could you type the name for me?"

    return jsonify({
        "ok": True,
        "image_type": img_type,
        "items_identified": items,
        "message": response_text,
        "analyst_data": analyst_data
    })
@app.post("/api/call-briefing")
def call_briefing():
    """
    Injects LONG-TERM Memory + Context for Voice Bot.

    Loads (or creates) the caller's profile, stores a changed username, and
    returns the stored memory summary together with a JSON-encoded snapshot
    of market rates and the most-viewed priced products.
    """
    payload = request.get_json(silent=True) or {}
    pid = payload.get("profile_id")
    username = payload.get("username")
    if not pid:
        return jsonify({"ok": False}), 400

    # Load the profile, creating a stub document on first contact.
    prof = {}
    if db:
        profile_ref = db.collection("pricelyst_profiles").document(pid)
        snapshot = profile_ref.get()
        if not snapshot.exists:
            profile_ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
        else:
            prof = snapshot.to_dict()

    username_changed = bool(username) and username != prof.get("username")
    if username_changed and db:
        db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)

    # Mini-Catalogue: up to 60 distinct product names, most viewed first.
    market = get_market_index()
    catalogue_str = ""
    if not market.empty:
        priced = market[market['is_offer']]
        by_views = priced.sort_values('views', ascending=False)
        top = by_views.drop_duplicates('product_name').head(60)
        catalogue_str = ", ".join(
            f"{name} (~${price:.2f})"
            for name, price in zip(top['product_name'], top['price'])
        )

    kpi_snapshot = {
        "market_rates": ZIM_CONTEXT,
        "popular_products": catalogue_str
    }
    return jsonify({
        "ok": True,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": json.dumps(kpi_snapshot)
    })
@app.post("/api/log-call-usage")
def log_call_usage():
    """
    Post-Call Orchestrator.
    Generates Plans & Updates Long-Term Memory.

    Request JSON: ``profile_id`` (required, 400 otherwise) and
    ``transcript``. Returns the generated shopping plan under
    ``shopping_plan`` (null when the call was not actionable). Firestore
    and memory-update failures are logged and never fail the request.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    transcript = body.get("transcript") or ""  # tolerate an explicit null
    if not pid:
        return jsonify({"ok": False}), 400

    # 1. Update Long-Term Memory
    if len(transcript) > 20 and db and _gemini_client:
        try:
            profile_ref = db.collection("pricelyst_profiles").document(pid)
            # to_dict() is None for a profile that has no document yet.
            curr_mem = (profile_ref.get().to_dict() or {}).get("memory_summary", "")
            mem_prompt = f"Update user memory (budget, family size, favorite stores) based on this transcript:\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
            mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
            if mem_resp.text:
                profile_ref.set({"memory_summary": mem_resp.text}, merge=True)
            else:
                # Never overwrite existing memory with an empty model reply.
                logger.error("Memory Update Error: empty response text")
        except Exception as e:
            logger.error("Memory Update Error: %s", e)

    # 2. Plan Generation
    intent_data = gemini_detect_intent(transcript)
    if not isinstance(intent_data, dict):
        intent_data = {}
    plan_data = {}
    if intent_data.get("actionable") and intent_data.get("items"):
        analyst_result = calculate_basket_optimization(intent_data["items"])
        if analyst_result.get("actionable"):
            md_content = gemini_generate_4step_plan(transcript, analyst_result)
            plan_data = {
                "is_actionable": True,
                "title": f"Shopping Plan ({datetime.now().strftime('%d %b')})",
                "markdown_content": md_content,
                "items": intent_data["items"],
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            if db:
                try:
                    doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                    plan_data["id"] = doc_ref.id
                    doc_ref.set(plan_data)
                except Exception as e:
                    # The plan is still returned to the caller, just not stored.
                    logger.error("Plan Save Error: %s", e)

    if db:
        try:
            db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
                "transcript": transcript,
                "intent": intent_data,
                "plan_generated": bool(plan_data),
                "ts": datetime.now(timezone.utc).isoformat()
            })
        except Exception as e:
            logger.error("Call Log Error: %s", e)

    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
@app.get("/api/shopping-plans")
def list_plans():
    """Return the 10 most recent shopping plans for ``profile_id``.

    Responds 400 when ``profile_id`` is missing or persistence is disabled,
    and 500 (after logging) when the Firestore query fails.
    """
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        plans = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "plans": plans})
    except Exception as e:
        # Was a bare `except:`; log the cause and let system-exiting
        # exceptions propagate.
        logger.error("List Plans Error: %s", e)
        return jsonify({"ok": False}), 500
@app.delete("/api/shopping-plans/<plan_id>")
def delete_plan(plan_id):
    """Delete one shopping plan of ``profile_id``.

    Responds 400 when ``profile_id`` is missing or persistence is disabled,
    and 500 (after logging) when the Firestore delete fails.
    """
    pid = request.args.get("profile_id")
    if not pid or not db:
        return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:
        # Was a bare `except:`; log the cause and let system-exiting
        # exceptions propagate.
        logger.error("Delete Plan Error (plan_id=%s): %s", plan_id, e)
        return jsonify({"ok": False}), 500
# =========================
# Main
# =========================
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    # Warm the market index before serving. A failure here is not fatal
    # (the index is fetched lazily on first use) but should be visible.
    try:
        get_market_index(force_refresh=True)
    except Exception as e:
        logger.warning("ETL: Initial market index warm-up failed: %s", e)
    app.run(host="0.0.0.0", port=port)