PriceLystAI-API / main.py
rairo's picture
Update main.py
9e7ded1 verified
raw
history blame
22.1 kB
"""
main.py — Pricelyst Shopping Advisor (Analyst Edition - Full Context)
✅ Flask API
✅ Firebase Admin Persistence
✅ Gemini via google-genai SDK (Robust)
✅ "Analyst Engine": Python Math for Baskets, ZESA, & Fuel
✅ Ground Truth Data: Uses /api/v1/product-listing
✅ Jessica Context: Injects Top 60 Real Products into Voice Agent
✅ Intent Detection: Strict Casual vs Actionable separation
ENV VARS:
- GOOGLE_API_KEY=...
- FIREBASE='{"type":"service_account", ...}'
- PRICE_API_BASE=https://api.pricelyst.co.zw
- GEMINI_MODEL=gemini-2.0-flash
- PORT=5000
"""
import os
import re
import json
import time
import math
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import requests
import pandas as pd
from flask import Flask, request, jsonify
from flask_cors import CORS
# ––––– Logging –––––
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("pricelyst-advisor")
# ––––– Gemini SDK –––––
# The SDK is optional at import time: if it is missing the app still boots and
# every Gemini-backed feature degrades gracefully (all call sites check
# _gemini_client for None before using it).
try:
    from google import genai
    from google.genai import types
except Exception as e:
    genai = None
    logger.error("google-genai not installed. pip install google-genai. Error=%s", e)
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.0-flash")
# Shared module-level client; remains None when the SDK or API key is unavailable.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ––––– Firebase Admin –––––
import firebase_admin
from firebase_admin import credentials, firestore
# Entire service-account credentials JSON passed inline via the FIREBASE env var.
FIREBASE_ENV = os.environ.get("FIREBASE", "")
def init_firestore_from_env() -> Optional[firestore.Client]:
    """Initialize firebase-admin from the FIREBASE env var and return a client.

    Idempotent: if an app is already initialized, simply returns a client for
    it. Returns None (persistence disabled) when the env var is absent or the
    credentials JSON is invalid.
    """
    if firebase_admin._apps:
        # An app was already initialized elsewhere — reuse it.
        return firestore.client()
    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None
    try:
        service_account = json.loads(FIREBASE_ENV)
        firebase_admin.initialize_app(credentials.Certificate(service_account))
        logger.info("Firebase initialized.")
        return firestore.client()
    except Exception as exc:
        logger.critical("Failed to initialize Firebase: %s", exc)
        return None
db = init_firestore_from_env()  # None when persistence is disabled
# ––––– External API –––––
PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
HTTP_TIMEOUT = 30  # seconds, applied to every upstream request
# ––––– Static Data (Zim Context) –––––
# Hard-coded USD reference rates. The zesa_step_* entries model a stepped
# prepaid electricity tariff: up to "limit" units at "rate" $/unit per tier.
ZIM_UTILITIES = {
    "fuel_petrol": 1.58,
    "fuel_diesel": 1.65,
    "gas_lpg": 2.00,
    "bread": 1.00,
    "zesa_step_1": {"limit": 50, "rate": 0.04},
    "zesa_step_2": {"limit": 150, "rate": 0.09},
    "zesa_step_3": {"limit": 9999, "rate": 0.14},
    "zesa_levy": 0.06
}
# ––––– Cache –––––
PRODUCT_CACHE_TTL = 60 * 20  # 20 mins
# In-process cache of the flattened market DataFrame (see get_market_index).
_data_cache: Dict[str, Any] = {
    "ts": 0,  # epoch seconds of the last successful refresh
    "df": pd.DataFrame(),
    "raw_count": 0
}
app = Flask(__name__)
CORS(app)
# =========================
# 1. ETL Layer (Ingestion)
# =========================
def _norm(s: Any) -> str:
if not s: return ""
return str(s).strip().lower()
def _coerce_price(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except:
return 0.0
def _safe_json_loads(s: str, fallback: Any):
try:
if "```json" in s:
s = s.split("```json")[1].split("```")[0]
elif "```" in s:
s = s.split("```")[0]
return json.loads(s)
except Exception as e:
logger.error(f"JSON Parse Error: {e}")
return fallback
def fetch_and_flatten_data() -> pd.DataFrame:
    """Fetches from /api/v1/product-listing and flattens into an analytical DF.

    Pages through the listing endpoint (50 items per page), stopping at the
    advertised totalPages (capped at 99 when absent) or on the first error.
    Emits one row per (product, priced offer). Products with no priced offers
    still get a single placeholder row (price 0.0, is_offer=False) so they
    remain searchable.

    Returns: DataFrame with columns product_id, product_name, clean_name,
    brand, category, retailer, price, views, image, is_offer.
    """
    all_products = []
    page = 1
    logger.info("ETL: Starting fetch from /api/v1/product-listing")
    url = f"{PRICE_API_BASE}/api/v1/product-listing"
    while True:
        try:
            r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            payload = r.json()
            data = payload.get("data") or []
            if not data:
                break
            all_products.extend(data)
            # Stop at the advertised page count; cap at 99 pages if missing.
            if page >= (payload.get("totalPages") or 99):
                break
            page += 1
        except Exception as e:
            logger.error(f"ETL Error on page {page}: {e}")
            break
    # Flattening Logic: one row per offer (or one placeholder row when none).
    rows = []
    for p in all_products:
        try:
            p_id = p.get("id")
            p_name = p.get("name") or "Unknown"
            clean_name = _norm(p_name)
            cat_name = (p.get("category") or {}).get("name") or "General"
            brand_name = (p.get("brand") or {}).get("brand_name") or ""
            views = int(p.get("view_count") or 0)
            image = p.get("thumbnail") or p.get("image")
            prices = p.get("prices") or []
            if not prices:
                rows.append({
                    "product_id": p_id,
                    "product_name": p_name,
                    "clean_name": clean_name,
                    "brand": brand_name,
                    "category": cat_name,
                    "retailer": "Listing",
                    "price": 0.0,
                    "views": views,
                    "image": image,
                    "is_offer": False
                })
                continue
            for offer in prices:
                r_name = (offer.get("retailer") or {}).get("name") or "Unknown Store"
                price_val = _coerce_price(offer.get("price"))
                # Zero/negative prices are treated as "no real offer" and dropped.
                if price_val > 0:
                    rows.append({
                        "product_id": p_id,
                        "product_name": p_name,
                        "clean_name": clean_name,
                        "brand": brand_name,
                        "category": cat_name,
                        "retailer": r_name,
                        "price": price_val,
                        "views": views,
                        "image": image,
                        "is_offer": True
                    })
        except Exception as e:
            # Previously a silent bare except; at least record the bad record.
            logger.error(f"ETL: Skipping malformed product record: {e}")
            continue
    df = pd.DataFrame(rows)
    logger.info(f"ETL: Flattened into {len(df)} rows.")
    return df
def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
    """Return the cached market DataFrame, refetching when stale, empty or forced."""
    global _data_cache
    expired = (time.time() - _data_cache["ts"]) > PRODUCT_CACHE_TTL
    if force_refresh or _data_cache["df"].empty or expired:
        logger.info("ETL: Refreshing Market Index...")
        fresh = fetch_and_flatten_data()
        _data_cache["df"] = fresh
        _data_cache["ts"] = time.time()
        _data_cache["raw_count"] = len(fresh)
    return _data_cache["df"]
# =========================
# 2. Analyst Engine (Math Logic)
# =========================
def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
    """Find rows of *df* matching *query*, most popular (then cheapest) first.

    Two passes: a literal substring match on the normalized name, then — only
    if that finds nothing — a whitespace-token-overlap match. Returns *df*
    unchanged when it is empty or the query is blank; returns an empty frame
    when nothing matches at all.
    """
    if df.empty or not query:
        return df
    needle = str(query).strip().lower()
    # Pass 1: plain substring containment (no regex).
    hits = df[df['clean_name'].str.contains(needle, regex=False)].copy()
    if hits.empty:
        # Pass 2: score rows by how many query tokens they share.
        wanted = set(needle.split())
        def _overlap(text):
            if not isinstance(text, str):
                return 0
            have = set(text.split())
            if not have:
                return 0
            return len(wanted & have)
        scored = df.copy()
        scored['score'] = scored['clean_name'].apply(_overlap)
        hits = scored[scored['score'] > 0]
    if hits.empty:
        return hits
    # Rank: most-viewed first, cheapest first among equals.
    ranked = hits.sort_values(by=['views', 'price'], ascending=[False, True])
    return ranked.head(limit)
def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
    """
    Determines the best store for a list of items.

    Resolves each free-text item to the most popular matching product, then
    totals each retailer's prices for those products and ranks retailers by
    coverage (desc) then total price (asc).

    Returns a dict with actionable=True plus basket_items/global_missing/
    best_store/all_stores, or actionable=False when no data or no matches.

    FIX: offers are now deduplicated to the cheapest price per
    (retailer, product). Previously a product listed twice at one retailer was
    summed twice and counted twice, inflating total_price and pushing
    coverage_percent past 100%.
    """
    df = get_market_index()
    if df.empty:
        logger.warning("Basket Engine: DF is empty.")
        return {"actionable": False, "error": "No data"}
    logger.info(f"Basket Engine: Optimizing for {len(item_names)} items: {item_names}")
    found_items = []
    missing_global = []
    # 1. Resolve Items to Real Products (only rows with real offers).
    for item in item_names:
        hits = search_products_fuzzy(df[df['is_offer'] == True], item, limit=5)
        if hits.empty:
            missing_global.append(item)
            continue
        # Best match first: hits are pre-sorted by views then price.
        best_prod = hits.iloc[0]
        found_items.append({
            "query": item,
            "product_id": best_prod['product_id'],
            "name": best_prod['product_name']
        })
    if not found_items:
        logger.info("Basket Engine: No items matched in DB.")
        return {"actionable": False, "missing": missing_global}
    # 2. Calculate Totals Per Retailer, using the cheapest offer per
    #    (retailer, product) so duplicates cannot skew the numbers.
    target_pids = [x['product_id'] for x in found_items]
    relevant_offers = df[df['product_id'].isin(target_pids) & df['is_offer']]
    relevant_offers = (relevant_offers
                       .sort_values('price')
                       .drop_duplicates(subset=['retailer', 'product_id'], keep='first'))
    retailer_stats = []
    for retailer in relevant_offers['retailer'].unique():
        r_df = relevant_offers[relevant_offers['retailer'] == retailer]
        found_count = len(r_df)
        total_price = r_df['price'].sum()
        # Map this retailer's product ids back to display names.
        retailer_pids = r_df['product_id'].tolist()
        found_names = [x['name'] for x in found_items if x['product_id'] in retailer_pids]
        retailer_stats.append({
            "retailer": retailer,
            "total_price": float(total_price),
            "item_count": found_count,
            "coverage_percent": (found_count / len(found_items)) * 100,
            "found_items": found_names
        })
    # 3. Sort: Coverage Desc, Price Asc
    retailer_stats.sort(key=lambda x: (-x['coverage_percent'], x['total_price']))
    if not retailer_stats:
        return {"actionable": False}
    best_option = retailer_stats[0]
    logger.info(f"Basket Engine: Best Store = {best_option['retailer']} (${best_option['total_price']})")
    return {
        "actionable": True,
        "basket_items": [x['name'] for x in found_items],
        "global_missing": missing_global,
        "best_store": best_option,
        "all_stores": retailer_stats[:3]
    }
def calculate_zesa_units(amount_usd: float, tariffs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Estimate prepaid ZESA electricity units (kWh) for a USD purchase.

    Strips the ~6% levy off the top, then fills the stepped tariff: tier-1
    units first, then tier-2, with any remaining money buying tier-3 units.

    Args:
        amount_usd: money spent on the token.
        tariffs: optional tariff table shaped like ZIM_UTILITIES (zesa_step_1/2/3
            dicts with "limit"/"rate", plus zesa_levy). Defaults to the module
            constants, so existing callers are unchanged.

    Returns:
        {"amount_usd", "est_units_kwh", "breakdown"} where breakdown lists
        human-readable strings per tier consumed.

    FIX: the tier-3 breakdown line previously reported the balance as
    remaining + tier-1 cost + tier-2 cost, overstating it by the money already
    spent; it now reports only the money actually left for tier 3.
    """
    t = tariffs if tariffs is not None else ZIM_UTILITIES
    # Only the post-levy remainder buys actual units.
    remaining = amount_usd / (1.0 + t.get("zesa_levy", 0.06))
    units = 0.0
    breakdown = []
    t1 = t["zesa_step_1"]
    cost_t1 = t1["limit"] * t1["rate"]
    if remaining > cost_t1:
        units += t1["limit"]
        remaining -= cost_t1
        breakdown.append(f"First {t1['limit']}u @ ${t1['rate']}")
        t2 = t["zesa_step_2"]
        cost_t2 = t2["limit"] * t2["rate"]
        if remaining > cost_t2:
            units += t2["limit"]
            remaining -= cost_t2
            breakdown.append(f"Next {t2['limit']}u @ ${t2['rate']}")
            t3 = t["zesa_step_3"]
            bought = remaining / t3["rate"]
            units += bought
            breakdown.append(f"Balance ${remaining:.2f} -> {bought:.1f}u @ ${t3['rate']}")
        else:
            bought = remaining / t2["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t2['rate']}")
    else:
        bought = remaining / t1["rate"]
        units += bought
        breakdown.append(f"All {bought:.1f}u @ ${t1['rate']}")
    return {
        "amount_usd": amount_usd,
        "est_units_kwh": round(units, 1),
        "breakdown": breakdown
    }
# =========================
# 3. Gemini Helpers (Strict)
# =========================
def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
    """
    Classifies if the conversation needs an Analyst action.

    Returns a dict matching the schema in the prompt below; falls back to a
    non-actionable CASUAL_CHAT result when the client is missing or the call
    or JSON parse fails.
    """
    if not _gemini_client:
        return {"actionable": False}
    PROMPT = """
    Analyze this transcript. Return STRICT JSON.
    Is the user asking for shopping help (prices, basket, store advice, ZESA/Fuel)?
    Output Schema:
    {
    "actionable": boolean,
    "intent": "SHOPPING_BASKET" | "UTILITY_CALC" | "PRODUCT_SEARCH" | "CASUAL_CHAT",
    "items": ["item1", "item2"] (if applicable),
    "utility_amount": number (if applicable for ZESA/Fuel)
    }
    """
    fallback = {"actionable": False, "intent": "CASUAL_CHAT"}
    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT + "\nTranscript: " + transcript,
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
    except Exception as e:
        logger.error(f"Intent Detect Error: {e}")
        return fallback
    return _safe_json_loads(response.text, fallback)
def gemini_chat_response(transcript: str, analyst_data: Dict) -> str:
    """Generate Jessica's natural-language reply grounded in analyst_data.

    Returns a canned string when the Gemini client is unavailable or the
    generation call fails. FIX: the failure path previously used a silent bare
    except; it now catches Exception and logs the error.
    """
    if not _gemini_client: return "System offline."
    PROMPT = f"""
    You are Jessica, Pricelyst Analyst.
    User asked: "{transcript}"
    DATA (Use this strictly):
    {json.dumps(analyst_data, indent=2)}
    If 'actionable' is true, summarize the Best Store and Total Cost.
    If ZESA data is present, give the units estimate.
    Keep it short, helpful, and Zimbabwean.
    """
    try:
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=PROMPT
        )
        return resp.text
    except Exception as e:
        logger.error(f"Chat Response Error: {e}")
        return "I have the data but couldn't summarize it."
# =========================
# 4. Endpoints
# =========================
@app.get("/health")
def health():
    """Liveness probe: reports how many offer rows are currently indexed."""
    index = get_market_index()
    payload = {
        "ok": True,
        "offers_indexed": len(index),
        "api_source": PRICE_API_BASE
    }
    return jsonify(payload)
@app.post("/chat")
def chat():
    """Text Chat Interface.

    Body: {"message": str, "profile_id": str}. Detects intent, optionally runs
    the analyst engine, generates a Gemini reply, and best-effort logs the
    exchange to Firestore.
    """
    body = request.get_json(silent=True) or {}
    msg = body.get("message", "")
    pid = body.get("profile_id")
    if not pid: return jsonify({"ok": False}), 400
    # 1. Detect Intent
    intent_data = gemini_detect_intent(msg)
    analyst_data = {}
    # 2. Run Analyst (if actionable). FIX: use .get("intent") so a malformed
    #    LLM response ("actionable": true with no "intent" key) cannot raise
    #    KeyError; guard utility_amount against an explicit null as well.
    if intent_data.get("actionable"):
        intent = intent_data.get("intent")
        items = intent_data.get("items")
        if intent == "SHOPPING_BASKET" and items:
            analyst_data = calculate_basket_optimization(items)
        elif intent == "UTILITY_CALC":
            analyst_data = calculate_zesa_units(intent_data.get("utility_amount") or 20)
        elif intent == "PRODUCT_SEARCH" and items:
            # Reuse basket logic for single item search to get best store
            analyst_data = calculate_basket_optimization(items)
    # 3. Generate Reply
    reply = gemini_chat_response(msg, analyst_data)
    # Log (skipped when persistence is disabled)
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
            "message": msg,
            "response_text": reply,
            "intent": intent_data,
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({"ok": True, "data": {"message": reply, "analyst": analyst_data}})
@app.post("/api/call-briefing")
def call_briefing():
    """
    Injects Memory + Top Products Catalogue for the Voice Agent.

    Loads (or lazily creates) the caller's profile, records a changed
    username, and builds a compact catalogue of the 60 most-viewed offered
    products for the ElevenLabs data variables.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    username = body.get("username")
    if not pid:
        return jsonify({"ok": False}), 400
    prof = {}
    if db:
        ref = db.collection("pricelyst_profiles").document(pid)
        snapshot = ref.get()
        if snapshot.exists:
            prof = snapshot.to_dict()
        else:
            # First contact: create the profile shell.
            ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
    if username and username != prof.get("username"):
        if db:
            db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)
    # --- Generate Mini-Catalogue (Top 60 popular items) ---
    df = get_market_index()
    top_products_str = ""
    if not df.empty:
        # Most-viewed first, one row per product name, capped at 60.
        top_offers = (df[df['is_offer']]
                      .sort_values('views', ascending=False)
                      .drop_duplicates('product_name')
                      .head(60))
        # Render as "Name (~$Price)" entries joined by commas.
        top_products_str = ", ".join(
            f"{row['product_name']} (~${row['price']:.2f})"
            for _, row in top_offers.iterrows()
        )
    # Payload for ElevenLabs (Data Variables Only)
    kpi_snapshot = {
        "market_rates": ZIM_UTILITIES,
        "popular_products_catalogue": top_products_str
    }
    return jsonify({
        "ok": True,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": json.dumps(kpi_snapshot)
    })
@app.post("/api/log-call-usage")
def log_call_usage():
    """
    Post-Call Processor.
    1. Intent Check (Strict).
    2. Analyst Optimization.
    3. Plan Gen & Persistence.

    Body: {"profile_id": str, "transcript": str}. Returns the generated
    shopping plan (or null) so the caller can surface it immediately.
    """
    body = request.get_json(silent=True) or {}
    pid = body.get("profile_id")
    transcript = body.get("transcript", "")
    if not pid: return jsonify({"ok": False}), 400
    logger.info(f"Log Call: Processing {pid}. Transcript Len: {len(transcript)}")
    # 1. Update Memory (Async-like)
    # Best-effort: any failure here (missing profile doc -> to_dict() is None,
    # Gemini client unavailable, Firestore error) is logged and swallowed so
    # the rest of the pipeline still runs.
    if len(transcript) > 20 and db:
        try:
            curr_mem = db.collection("pricelyst_profiles").document(pid).get().to_dict().get("memory_summary", "")
            mem_prompt = f"Update user memory (concise) with new details:\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
            mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
            db.collection("pricelyst_profiles").document(pid).set({"memory_summary": mem_resp.text}, merge=True)
        except Exception as e:
            logger.error(f"Memory Update Failed: {e}")
    # 2. Intent Detection (The Gatekeeper)
    intent_data = gemini_detect_intent(transcript)
    logger.info(f"Log Call: Intent detected: {intent_data.get('intent')}")
    plan_data = {}
    # 3. Actionable Logic
    # NOTE(review): only the shopping-list branch produces a plan; an
    # actionable UTILITY_CALC intent is logged below but generates no plan here.
    if intent_data.get("actionable"):
        # Handle Shopping List
        if intent_data.get("items"):
            analyst_result = calculate_basket_optimization(intent_data["items"])
            if analyst_result.get("actionable"):
                best = analyst_result["best_store"]
                # Markdown Generation: recommended store, per-item status table,
                # and a list of items no store stocks.
                md = f"# Shopping Plan\n\n"
                md += f"**Recommended Store:** {best['retailer']}\n"
                md += f"**Estimated Total:** ${best['total_price']:.2f}\n\n"
                md += "## Your Basket\n\n"
                md += "| Item | Found? |\n|---|---|\n"
                for it in analyst_result["basket_items"]:
                    status = "✅ In Stock" if it in best["found_items"] else "❌ Not Found"
                    md += f"| {it} | {status} |\n"
                if analyst_result["global_missing"]:
                    md += "\n### Missing Items (Estimate Required)\n"
                    for m in analyst_result["global_missing"]:
                        md += f"- {m}\n"
                plan_data = {
                    "is_actionable": True,
                    "title": f"Plan: {best['retailer']} (${best['total_price']:.2f})",
                    "markdown_content": md,
                    "items": intent_data["items"],
                    "created_at": datetime.now(timezone.utc).isoformat()
                }
                # Persist Plan (pre-allocate the doc id so it can be embedded
                # in the stored payload)
                if db:
                    doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
                    plan_data["id"] = doc_ref.id
                    doc_ref.set(plan_data)
                    logger.info(f"Log Call: Plan Saved {doc_ref.id}")
    # 4. Log Call
    if db:
        db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
            "transcript": transcript,
            "intent_data": intent_data,
            "plan_generated": bool(plan_data),
            "ts": datetime.now(timezone.utc).isoformat()
        })
    return jsonify({
        "ok": True,
        "shopping_plan": plan_data if plan_data.get("is_actionable") else None
    })
# ––––– CRUD: Shopping Plans –––––
@app.get("/api/shopping-plans")
def list_plans():
    """Return the 10 most recent shopping plans for ?profile_id=...

    400 when profile_id is missing or persistence is disabled; 500 on any
    Firestore failure. FIX: the failure path previously used a silent bare
    except; it now catches Exception and logs the error.
    """
    pid = request.args.get("profile_id")
    if not pid or not db: return jsonify({"ok": False}), 400
    try:
        docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
            .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
        plans = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "plans": plans})
    except Exception as e:
        logger.error(f"List Plans Error: {e}")
        return jsonify({"ok": False}), 500
@app.delete("/api/shopping-plans/<plan_id>")
def delete_plan(plan_id):
    """Delete one shopping plan for ?profile_id=...

    400 when profile_id is missing or persistence is disabled; 500 on any
    Firestore failure. FIX: the failure path previously used a silent bare
    except; it now catches Exception and logs the error.
    """
    pid = request.args.get("profile_id")
    if not pid or not db: return jsonify({"ok": False}), 400
    try:
        db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document(plan_id).delete()
        return jsonify({"ok": True})
    except Exception as e:
        logger.error(f"Delete Plan Error: {e}")
        return jsonify({"ok": False}), 500
# =========================
# Main
# =========================
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    # Pre-warm the market-index cache so the first request is fast. Failure is
    # non-fatal (the cache refreshes lazily), but FIX: it is now logged instead
    # of being swallowed by a bare except.
    try:
        get_market_index(force_refresh=True)
    except Exception as e:
        logger.warning("Cache pre-warm failed: %s", e)
    app.run(host="0.0.0.0", port=port)