rairo commited on
Commit
ccbfc8e
·
verified ·
1 Parent(s): 3d25258

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +374 -877
main.py CHANGED
@@ -1,32 +1,29 @@
1
  """
2
- main.py — Pricelyst Shopping Advisor (Jessica Edition)
3
 
4
  ✅ Flask API
5
- ✅ Firebase Admin persistence (service account JSON via env var)
6
- ✅ Gemini via NEW google-genai SDK (text + multimodal + JSON Mode)
7
- Product intelligence from Pricelyst API
8
- Graceful conversational handling (Backwards Compatible)
9
- Call briefing (Zim Essentials Injection)
10
- ✅ Post-call Shopping Plan Generation (PDF-ready)
11
-
12
- ENV VARS YOU NEED:
13
  - GOOGLE_API_KEY=...
14
- - FIREBASE='{"type":"service_account", ...}' # full JSON string
15
- - PRICE_API_BASE=https://api.pricelyst.co.zw # optional
16
- - GEMINI_MODEL=gemini-2.0-flash # optional
17
- - PORT=5000 # optional
18
  """
19
 
20
  import os
21
  import re
22
  import json
23
  import time
24
- import math
25
- import uuid
26
  import base64
27
  import logging
28
  from datetime import datetime, timezone
29
- from typing import Any, Dict, List, Optional, Tuple
30
 
31
  import requests
32
  import pandas as pd
@@ -43,8 +40,6 @@ logger = logging.getLogger("pricelyst-advisor")
43
 
44
  # ––––– Gemini (NEW SDK) –––––
45
 
46
- # pip install google-genai
47
-
48
  try:
49
  from google import genai
50
  from google.genai import types
@@ -65,22 +60,19 @@ if genai and GOOGLE_API_KEY:
65
 
66
  # ––––– Firebase Admin –––––
67
 
68
- # pip install firebase-admin
69
-
70
  import firebase_admin
71
  from firebase_admin import credentials, firestore
72
 
73
  FIREBASE_ENV = os.environ.get("FIREBASE", "")
74
 
75
  def init_firestore_from_env() -> firestore.Client:
76
- # 1. Check if already initialized
77
  if firebase_admin._apps:
78
  return firestore.client()
79
 
80
- # 2. Check for Creds
81
  if not FIREBASE_ENV:
82
- logger.critical("FIREBASE env var missing. Persistence will fail.")
83
- raise RuntimeError("FIREBASE env var missing. Provide full service account JSON string.")
 
84
 
85
  try:
86
  sa_info = json.loads(FIREBASE_ENV)
@@ -90,18 +82,14 @@ def init_firestore_from_env() -> firestore.Client:
90
  return firestore.client()
91
  except Exception as e:
92
  logger.critical("Failed to initialize Firebase: %s", e)
93
- raise e
94
 
95
- try:
96
- db = init_firestore_from_env()
97
- except Exception as e:
98
- logger.error("DB Init failed: %s", e)
99
- db = None
100
 
101
  # ––––– External API (Pricelyst) –––––
102
 
103
  PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
104
- HTTP_TIMEOUT = 20
105
 
106
  # ––––– Flask –––––
107
 
@@ -110,42 +98,38 @@ CORS(app)
110
 
111
  # ––––– In-memory product cache –––––
112
 
113
- PRODUCT_CACHE_TTL_SEC = 60 * 10 # 10 minutes
114
  _product_cache: Dict[str, Any] = {
115
  "ts": 0,
116
  "df_offers": pd.DataFrame(),
117
  "raw_count": 0,
118
  }
119
 
120
- # ––––– Static Data (New Feature) –––––
121
 
122
  ZIM_ESSENTIALS = {
123
- "fuel_petrol": "$1.58/L (Blend)",
124
- "fuel_diesel": "$1.65/L (Diesel 50)",
125
- "zesa_electricity": "Tiered: First 50 units cheap, then ~14c/kWh",
126
- "bread_standard": "$1.00/loaf (Fixed)",
127
- "gas_lpg": "$1.90 - $2.10 per kg"
 
 
128
  }
129
 
130
  # =========================
131
- # Helpers: time / strings
132
  # =========================
133
 
134
  def now_utc_iso() -> str:
135
  return datetime.now(timezone.utc).isoformat()
136
 
137
- def _coerce_float(v: Any) -> Optional[float]:
138
  try:
139
- if v is None:
140
- return None
141
- if isinstance(v, (int, float)):
142
- return float(v)
143
- s = str(v).strip()
144
- if not s:
145
- return None
146
- return float(s)
147
  except Exception:
148
- return None
149
 
150
  def _norm_str(s: Any) -> str:
151
  s = "" if s is None else str(s)
@@ -155,12 +139,17 @@ def _norm_str(s: Any) -> str:
155
 
156
  def _safe_json_loads(s: str, fallback: Any):
157
  try:
 
 
 
 
 
158
  return json.loads(s)
159
  except Exception:
160
  return fallback
161
 
162
  # =========================
163
- # Firestore profile storage
164
  # =========================
165
 
166
  def profile_ref(profile_id: str):
@@ -168,14 +157,13 @@ def profile_ref(profile_id: str):
168
  return db.collection("pricelyst_profiles").document(profile_id)
169
 
170
  def get_profile(profile_id: str) -> Dict[str, Any]:
171
- if not db:
172
- return {}
173
  try:
174
  ref = profile_ref(profile_id)
175
  doc = ref.get()
176
  if doc.exists:
177
  return doc.to_dict() or {}
178
- # create default
179
  data = {
180
  "profile_id": profile_id,
181
  "created_at": now_utc_iso(),
@@ -183,956 +171,472 @@ def get_profile(profile_id: str) -> Dict[str, Any]:
183
  "username": None,
184
  "memory_summary": "",
185
  "preferences": {},
186
- "last_actions": [],
187
- "counters": {
188
- "chats": 0,
189
- "calls": 0,
190
- }
191
  }
192
  ref.set(data)
193
  return data
194
  except Exception as e:
195
- logger.error("get_profile error for %s: %s", profile_id, e)
196
  return {}
197
 
198
  def update_profile(profile_id: str, patch: Dict[str, Any]) -> None:
199
  if not db: return
200
  try:
201
- patch = dict(patch or {})
202
  patch["updated_at"] = now_utc_iso()
203
  profile_ref(profile_id).set(patch, merge=True)
204
  except Exception as e:
205
- logger.error("update_profile error: %s", e)
206
 
207
  def log_chat(profile_id: str, payload: Dict[str, Any]) -> None:
208
- if not db:
209
- logger.warning("DB not connected, skipping log_chat")
210
- return
211
  try:
212
- logger.info("Logging chat for %s. Type: %s", profile_id, payload.get("response_type"))
213
  db.collection("pricelyst_profiles").document(profile_id).collection("chat_logs").add({
214
  **payload,
215
  "ts": now_utc_iso()
216
  })
217
  except Exception as e:
218
- logger.error("Failed to log chat: %s", e)
219
 
220
  def log_call(profile_id: str, payload: Dict[str, Any]) -> str:
221
- if not db:
222
- logger.warning("DB not connected, skipping log_call")
223
- return ""
224
  try:
225
- logger.info("Logging call for %s. Transcript len: %s", profile_id, len(payload.get("transcript", "")))
226
- doc_ref = db.collection("pricelyst_profiles").document(profile_id).collection("call_logs").document()
227
- doc_ref.set({
228
  **payload,
229
  "ts": now_utc_iso()
230
  })
231
- logger.info("Call logged successfully. ID: %s", doc_ref.id)
232
- return doc_ref.id
233
  except Exception as e:
234
- logger.error("Failed to log call: %s", e)
235
  return ""
236
 
237
  # =========================
238
- # Multimodal image handling
239
  # =========================
240
 
241
- def parse_images(images: List[str]) -> List[Dict[str, Any]]:
242
- """
243
- Accepts:
244
- - data URLs: data:image/png;base64,....
245
- - raw base64 strings
246
- - http(s) URLs
247
- Returns: list of { "mime": "...", "bytes": b"..." } or { "url": "..." }
248
- """
249
- out = []
250
- for item in images or []:
251
- if not item:
252
- continue
253
- item = item.strip()
254
-
255
- # URL
256
- if item.startswith("http://") or item.startswith("https://"):
257
- out.append({"url": item})
258
- continue
259
-
260
- # data URL
261
- m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", item)
262
- if m:
263
- mime = m.group(1)
264
- b64 = m.group(2)
265
- try:
266
- out.append({"mime": mime, "bytes": base64.b64decode(b64)})
267
- except Exception:
268
- continue
269
- continue
270
-
271
- # raw base64
272
  try:
273
- out.append({"mime": "image/png", "bytes": base64.b64decode(item)})
274
- except Exception:
275
- continue
 
 
 
276
 
277
- return out
278
-
279
- # =========================
280
- # Product fetching + offers DF
281
- # =========================
282
-
283
- def fetch_products_page(page: int, per_page: int = 50) -> Dict[str, Any]:
284
- url = f"{PRICE_API_BASE}/api/v1/products"
285
- params = {"page": page, "perPage": per_page}
286
- r = requests.get(url, params=params, timeout=HTTP_TIMEOUT)
287
- r.raise_for_status()
288
- return r.json()
289
-
290
- def fetch_products(max_pages: int = 6, per_page: int = 50) -> List[Dict[str, Any]]:
291
- products: List[Dict[str, Any]] = []
292
- for p in range(1, max_pages + 1):
293
- payload = fetch_products_page(p, per_page=per_page)
294
- data = payload.get("data") or []
295
- if isinstance(data, list):
296
- products.extend(data)
297
- total_pages = payload.get("totalPages")
298
- if isinstance(total_pages, int) and p >= total_pages:
299
- break
300
- if not data:
301
  break
302
- return products
303
 
304
- def products_to_offers_df(products: List[Dict[str, Any]]) -> pd.DataFrame:
 
 
 
 
305
  rows = []
306
- for p in products or []:
307
  try:
308
- product_id = p.get("id")
309
- name = p.get("name") or ""
310
- clean_name = _norm_str(name)
311
-
312
- brand_name = ((p.get("brand") or {}).get("brand_name")) if isinstance(p.get("brand"), dict) else None
313
- categories = p.get("categories") or []
314
- cat_names = []
315
- for c in categories:
316
- if isinstance(c, dict) and c.get("name"):
317
- cat_names.append(c.get("name"))
318
- primary_category = cat_names[0] if cat_names else None
319
 
320
- stock_status = p.get("stock_status")
321
- on_promo = bool(p.get("on_promotion"))
322
- promo_badge = p.get("promo_badge")
323
- promo_name = p.get("promo_name")
324
- promo_price = _coerce_float(p.get("promo_price"))
325
- original_price = _coerce_float(p.get("original_price"))
326
-
327
- recommended_price = _coerce_float(p.get("recommended_price"))
328
- base_price = _coerce_float(p.get("price"))
329
- bulk_price = _coerce_float(p.get("bulk_price"))
330
- bulk_unit = p.get("bulk_unit")
331
-
332
- image = p.get("image")
333
- thumb = p.get("thumbnail")
334
 
335
- offers = p.get("prices") or []
336
- if not offers:
337
- rows.append({
338
- "product_id": product_id,
339
- "product_name": name,
340
- "clean_name": clean_name,
341
- "brand_name": brand_name,
342
- "primary_category": primary_category,
343
- "categories": cat_names,
344
- "stock_status": stock_status,
345
- "on_promotion": on_promo,
346
- "promo_badge": promo_badge,
347
- "promo_name": promo_name,
348
- "promo_price": promo_price,
349
- "original_price": original_price,
350
- "recommended_price": recommended_price,
351
- "base_price": base_price,
352
- "bulk_price": bulk_price,
353
- "bulk_unit": bulk_unit,
354
- "image": image,
355
- "thumbnail": thumb,
356
- "retailer_id": None,
357
- "retailer_name": None,
358
- "retailer_type": None,
359
- "retailer_logo": None,
360
- "offer_price": None,
361
- })
362
  continue
363
 
364
- for offer in offers:
365
- if not isinstance(offer, dict):
366
- continue
367
- retailer = offer.get("retailer") or {}
368
- rows.append({
369
- "product_id": product_id,
370
- "product_name": name,
371
- "clean_name": clean_name,
372
- "brand_name": brand_name,
373
- "primary_category": primary_category,
374
- "categories": cat_names,
375
- "stock_status": stock_status,
376
- "on_promotion": on_promo,
377
- "promo_badge": promo_badge,
378
- "promo_name": promo_name,
379
- "promo_price": promo_price,
380
- "original_price": original_price,
381
- "recommended_price": recommended_price,
382
- "base_price": base_price,
383
- "bulk_price": bulk_price,
384
- "bulk_unit": bulk_unit,
385
- "image": image,
386
- "thumbnail": thumb,
387
- "retailer_id": offer.get("retailer_id") or retailer.get("id"),
388
- "retailer_name": (retailer.get("name") if isinstance(retailer, dict) else None),
389
- "retailer_type": (retailer.get("type") if isinstance(retailer, dict) else None),
390
- "retailer_logo": (retailer.get("logo") if isinstance(retailer, dict) else None),
391
- "offer_price": _coerce_float(offer.get("price")),
392
- })
393
- except Exception:
394
  continue
395
 
396
  df = pd.DataFrame(rows)
397
- if df.empty:
398
- return df
399
-
400
- df["offer_price"] = df["offer_price"].apply(_coerce_float)
401
- df["clean_name"] = df["clean_name"].fillna("").astype(str)
402
- df["product_name"] = df["product_name"].fillna("").astype(str)
403
- df["retailer_name"] = df["retailer_name"].fillna("").astype(str)
404
  return df
405
 
406
- def get_offers_df(force_refresh: bool = False) -> pd.DataFrame:
407
- ts = _product_cache["ts"]
408
- if (not force_refresh) and (time.time() - ts < PRODUCT_CACHE_TTL_SEC) and isinstance(_product_cache["df_offers"], pd.DataFrame) and not _product_cache["df_offers"].empty:
409
- return _product_cache["df_offers"]
410
-
411
- try:
412
- products = fetch_products(max_pages=8, per_page=50)
413
- df = products_to_offers_df(products)
414
- _product_cache["ts"] = time.time()
415
- _product_cache["df_offers"] = df
416
- _product_cache["raw_count"] = len(products)
417
- logger.info("Loaded offers DF: products=%s offers_rows=%s", len(products), len(df))
418
- return df
419
- except Exception as e:
420
- logger.error("Failed to refresh product cache: %s", e)
421
- # fallback: return old cache (even if stale)
422
- if isinstance(_product_cache["df_offers"], pd.DataFrame):
423
- return _product_cache["df_offers"]
424
- return pd.DataFrame()
 
425
 
426
  # =========================
427
- # Gemini wrappers
428
  # =========================
429
 
430
- def gemini_generate_text(system: str, user: str, temperature: float = 0.4) -> str:
431
- if not _gemini_client:
432
- return ""
433
- try:
434
- resp = _gemini_client.models.generate_content(
435
- model=GEMINI_MODEL,
436
- contents=[
437
- {"role": "user", "parts": [{"text": system.strip() + "\n\n" + user.strip()}]}
438
- ],
439
- config={
440
- "temperature": temperature,
441
- "max_output_tokens": 900,
442
- }
443
- )
444
- return (resp.text or "").strip()
445
- except Exception as e:
446
- logger.error("Gemini text error: %s", e)
447
- return ""
448
-
449
- def gemini_generate_json(system: str, user: str, images: List = None) -> Dict[str, Any]:
450
- """NEW: Strict JSON generation for reliable Plan/Intent"""
451
- if not _gemini_client: return {}
452
- parts = [{"text": system + "\n\n" + user}]
453
- for img in images or []:
454
- if "bytes" in img:
455
- b64 = base64.b64encode(img["bytes"]).decode("utf-8")
456
- parts.append({"inline_data": {"mime_type": img["mime"], "data": b64}})
457
- elif "url" in img:
458
- parts.append({"text": f"Image URL: {img['url']}"})
459
- try:
460
- resp = _gemini_client.models.generate_content(
461
- model=GEMINI_MODEL,
462
- contents=[{"role": "user", "parts": parts}],
463
- config={"temperature": 0.2, "response_mime_type": "application/json", "max_output_tokens": 2000}
464
- )
465
- return json.loads(resp.text)
466
- except Exception as e:
467
- logger.error("Gemini JSON error: %s", e)
468
- return {}
469
-
470
- def gemini_generate_multimodal(system: str, user: str, images: List[Dict[str, Any]]) -> str:
471
  """
472
- Uses Gemini multimodal:
473
- - if we have bytes -> inline_data
474
- - if we have url -> just paste the URL (server-side fetch is unreliable w/o whitelisting),
475
- so we prefer bytes from the client.
476
  """
477
- if not _gemini_client:
478
- return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
479
 
480
- parts: List[Dict[str, Any]] = [{"text": system.strip() + "\n\n" + user.strip()}]
 
 
 
 
 
 
 
 
481
 
482
- for img in images or []:
483
- if "bytes" in img and img.get("mime"):
484
- b64 = base64.b64encode(img["bytes"]).decode("utf-8")
485
- parts.append({
486
- "inline_data": {
487
- "mime_type": img["mime"],
488
- "data": b64
489
- }
490
- })
491
- elif img.get("url"):
492
- # last resort
493
- parts.append({"text": f"[IMAGE_URL]\n{img['url']}"})
 
 
 
 
494
 
 
 
 
 
 
 
495
  try:
496
- resp = _gemini_client.models.generate_content(
497
  model=GEMINI_MODEL,
498
- contents=[{"role": "user", "parts": parts}],
499
- config={
500
- "temperature": 0.2,
501
- "max_output_tokens": 900,
502
- }
 
 
 
 
503
  )
504
- return (resp.text or "").strip()
505
  except Exception as e:
506
- logger.error("Gemini multimodal error: %s", e)
507
- return ""
508
 
509
  # =========================
510
- # Intent + actionability
511
  # =========================
512
 
513
- INTENT_SYSTEM = """
514
- You are Pricelyst AI. Your job: understand whether the user is asking for actionable shopping help.
515
- Return STRICT JSON only.
516
-
517
- Output schema:
518
- {
519
- "actionable": true|false,
520
- "intent": one of [
521
- "store_recommendation",
522
- "price_lookup",
523
- "price_compare",
524
- "basket_optimize",
525
- "basket_build",
526
- "product_discovery",
527
- "trust_check",
528
- "chit_chat",
529
- "lifestyle_lookup",
530
- "other"
531
- ],
532
- "items": [{"name": "...", "quantity": 1}],
533
- "constraints": {"budget": number|null, "location": "... "|null, "time_context": "mid-month|month-end|weekend|today|unknown"},
534
- "notes": "short reasoning"
535
- }
536
-
537
- Rules:
538
- - If user is chatting/social (hi, jokes, thanks, how are you, etc) => actionable=false, intent="chit_chat".
539
- - If user asks about prices/stores/basket/what to buy => actionable=true.
540
- - If user provided a list, extract items + quantities if obvious.
541
- - "How much is fuel/zesa/bread" -> lifestyle_lookup
542
- - Keep it conservative: if unclear, actionable=false.
543
  """
544
 
545
- def detect_intent(message: str, images_present: bool, context: Dict[str, Any]) -> Dict[str, Any]:
546
- # 1. Fast path for ZIM_ESSENTIALS (Optimization)
547
- msg_lower = message.lower()
548
- for k in ZIM_ESSENTIALS:
549
- clean_k = k.split('_')[-1] # fuel_petrol -> petrol
550
- if clean_k in msg_lower and "price" in msg_lower:
551
- return {"actionable": True, "intent": "lifestyle_lookup", "items": [{"name": k}]}
552
-
553
- # 2. Gemini Detection
554
- ctx_str = json.dumps(context or {}, ensure_ascii=False)
555
- user = f"Message: {message}\nImagesPresent: {images_present}\nContext: {ctx_str}"
556
-
557
- # Try using the strict JSON helper first for better reliability
558
- try:
559
- data = gemini_generate_json(INTENT_SYSTEM, user)
560
- if not isinstance(data, dict): raise ValueError("Invalid JSON")
561
- except:
562
- # Fallback to text parsing if JSON mode fails (Backward Compat)
563
- out = gemini_generate_text(INTENT_SYSTEM, user, temperature=0.1)
564
- data = _safe_json_loads(out, fallback={})
565
-
566
- if not isinstance(data, dict):
567
- return {"actionable": False, "intent": "other", "items": [], "constraints": {}, "notes": "bad_json"}
568
- # normalize
569
- data.setdefault("actionable", False)
570
- data.setdefault("intent", "other")
571
- data.setdefault("items", [])
572
- data.setdefault("constraints", {})
573
- return data
574
-
575
- # =========================
576
- # Shopping Plan Generator (NEW)
577
- # =========================
578
 
579
- PLAN_SYSTEM_PROMPT = """
580
- You are Jessica, the Pricelyst Shopping Advisor. Analyze the conversation transcript.
581
- If the user discussed a shopping list, budget plan, or event needs, create a structured plan.
 
582
 
583
- OUTPUT JSON SCHEMA:
584
  {
585
- "is_actionable": boolean,
586
- "title": "Short title (e.g. 'Weekend Braai List')",
587
- "summary": "1 sentence summary",
588
- "items": [{"name": "string", "qty": "string", "est_price": number|null}],
589
- "markdown_content": "A clean Markdown report for a PDF. Include headers (#), bullet points, and a budget summary table if applicable. Keep it professional."
590
  }
591
-
592
- If no shopping/planning occurred, set is_actionable=false.
593
  """
594
 
595
- def generate_shopping_plan(transcript: str) -> Dict[str, Any]:
596
- if not transcript or len(transcript) < 30:
597
- return {"is_actionable": False}
598
- return gemini_generate_json(PLAN_SYSTEM_PROMPT, f"TRANSCRIPT:\n{transcript}")
599
-
600
- # =========================
601
- # Matching + analytics
602
- # =========================
603
-
604
- def search_products(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
605
  """
606
- Simple search: contains on clean_name + fallback token overlap scoring.
 
 
 
607
  """
608
- if df.empty:
609
- return df
610
-
611
- q = _norm_str(query)
612
- if not q:
613
- return df.head(0)
614
-
615
- # direct contains
616
- hit = df[df["clean_name"].str.contains(re.escape(q), na=False)]
617
- if len(hit) >= limit:
618
- return hit.head(limit)
619
-
620
- # token overlap (cheap scoring)
621
- q_tokens = set(q.split())
622
- if not q_tokens:
623
- return hit.head(limit)
624
-
625
- tmp = df.copy()
626
- tmp["score"] = tmp["clean_name"].apply(lambda s: len(q_tokens.intersection(set(str(s).split()))))
627
- tmp = tmp[tmp["score"] > 0].sort_values(["score"], ascending=False)
628
- combined = pd.concat([hit, tmp], axis=0).drop_duplicates(subset=["product_id", "retailer_id"])
629
- return combined.head(limit)
630
-
631
- def summarize_offers(df_hits: pd.DataFrame) -> Dict[str, Any]:
632
- """
633
- For one product name, there can be multiple retailers (offers).
634
- We return:
635
- - cheapest offer
636
- - price range
637
- - top offers
638
- """
639
- if df_hits.empty:
640
- return {}
641
-
642
- # group by product_id (best is highest offer coverage)
643
- grp = df_hits.groupby("product_id").size().sort_values(ascending=False)
644
- best_pid = int(grp.index[0])
645
- prod_rows = df_hits[df_hits["product_id"] == best_pid].copy()
646
-
647
- prod_name = prod_rows["product_name"].iloc[0]
648
- brand = prod_rows["brand_name"].iloc[0]
649
- category = prod_rows["primary_category"].iloc[0]
650
- stock = prod_rows["stock_status"].iloc[0]
651
- on_promo = bool(prod_rows["on_promotion"].iloc[0])
652
- promo_badge = prod_rows["promo_badge"].iloc[0]
653
- image = prod_rows["thumbnail"].iloc[0] or prod_rows["image"].iloc[0]
654
-
655
- offers = prod_rows[prod_rows["offer_price"].notna()].copy()
656
- offers = offers.sort_values("offer_price", ascending=True)
657
-
658
- if offers.empty:
659
- return {
660
- "product_id": best_pid,
661
- "name": prod_name,
662
- "brand": brand,
663
- "category": category,
664
- "stock_status": stock,
665
- "on_promotion": on_promo,
666
- "promo_badge": promo_badge,
667
- "image": image,
668
- "offers": [],
669
- "cheapest": None,
670
- "price_range": None,
671
- }
672
 
673
- cheapest = {
674
- "retailer": offers.iloc[0]["retailer_name"],
675
- "price": float(offers.iloc[0]["offer_price"] or 0),
676
- "retailer_logo": offers.iloc[0]["retailer_logo"],
677
- }
678
- lo = float(offers["offer_price"].min())
679
- hi = float(offers["offer_price"].max())
680
 
681
- top_offers = []
682
- for _, r in offers.head(5).iterrows():
683
- top_offers.append({
684
- "retailer": r["retailer_name"],
685
- "price": float(r["offer_price"]),
686
- "retailer_logo": r["retailer_logo"],
687
- })
688
-
689
- return {
690
- "product_id": best_pid,
691
- "name": prod_name,
692
- "brand": brand,
693
- "category": category,
694
- "stock_status": stock,
695
- "on_promotion": on_promo,
696
- "promo_badge": promo_badge,
697
- "image": image,
698
- "offers": top_offers,
699
- "cheapest": cheapest,
700
- "price_range": {"min": lo, "max": hi, "spread": (hi - lo)},
701
- }
702
 
703
- def basket_store_choice(df: pd.DataFrame, items: List[Dict[str, Any]]) -> Dict[str, Any]:
704
- """
705
- Given items, pick:
706
- - best single store to cover most items and minimize total
707
- Very pragmatic MVP: for each item, match the best product and take cheapest offer.
708
- """
709
- if df.empty or not items:
710
- return {"items": [], "best_store": None, "missing": []}
711
 
712
- results = []
713
- missing = []
 
714
 
715
- for it in items:
716
- name = it.get("name") or ""
717
- qty = int(it.get("quantity") or 1)
718
- hits = search_products(df, name, limit=50)
719
- summary = summarize_offers(hits)
720
- if not summary or not summary.get("cheapest"):
721
- missing.append(name)
722
- continue
723
- cheapest = summary["cheapest"]
724
- results.append({
725
- "requested": name,
726
- "matched_product": summary["name"],
727
- "brand": summary.get("brand"),
728
- "qty": qty,
729
- "cheapest_retailer": cheapest["retailer"],
730
- "unit_price": cheapest["price"],
731
- "line_total": cheapest["price"] * qty,
732
- "offers": summary.get("offers", []),
733
- "image": summary.get("image"),
734
- })
735
-
736
- if not results:
737
- return {"items": [], "best_store": None, "missing": missing}
738
 
739
- # compute totals by retailer for "all cheapest per item"
740
- retailer_totals: Dict[str, float] = {}
741
- retailer_counts: Dict[str, int] = {}
742
- for r in results:
743
- k = r["cheapest_retailer"]
744
- retailer_totals[k] = retailer_totals.get(k, 0.0) + float(r["line_total"])
745
- retailer_counts[k] = retailer_counts.get(k, 0) + 1
746
 
747
- # Score: cover_count desc, then total asc
748
- best = sorted(retailer_totals.keys(), key=lambda k: (-retailer_counts.get(k, 0), retailer_totals.get(k, 0.0)))[0]
749
- return {
750
- "items": results,
751
- "best_store": {
752
- "name": best,
753
- "covered_items": retailer_counts.get(best, 0),
754
- "total_for_covered_items": round(retailer_totals.get(best, 0.0), 2),
755
- "total_items_requested": len(items),
756
- },
757
- "missing": missing
758
- }
759
-
760
- # =========================
761
- # Response rendering (informative)
762
- # =========================
763
-
764
- def render_price_answer(summary: Dict[str, Any]) -> Dict[str, Any]:
765
- """
766
- Returns structured payload for frontend to render nicely.
767
- """
768
- if not summary:
769
- return {
770
- "type": "not_found",
771
- "title": "I couldn't find that product.",
772
- "message": "Try a different wording (brand + size helps), or upload an image/receipt.",
773
- }
774
 
775
- name = summary.get("name")
776
- brand = summary.get("brand")
777
- category = summary.get("category")
778
- stock = summary.get("stock_status")
779
- on_promo = summary.get("on_promotion")
780
- promo_badge = summary.get("promo_badge")
781
- image = summary.get("image")
782
- cheapest = summary.get("cheapest")
783
- pr = summary.get("price_range")
784
-
785
- lines = []
786
- if cheapest:
787
- lines.append(f"Cheapest right now: {cheapest['retailer']} — ${cheapest['price']:.2f}")
788
- if pr and pr.get("min") is not None and pr.get("max") is not None and pr["max"] != pr["min"]:
789
- lines.append(f"Price range: ${pr['min']:.2f} → ${pr['max']:.2f} (spread ${pr['spread']:.2f})")
790
- if on_promo:
791
- lines.append(f"Promo: {promo_badge or 'On promotion'}")
792
 
793
- return {
794
- "type": "product_price",
795
- "title": name,
796
- "subtitle": " | ".join([x for x in [brand, category, stock] if x]),
797
- "image": image,
798
- "highlights": lines,
799
- "offers": summary.get("offers", []),
800
- "raw": summary,
801
- }
802
-
803
- def render_basket_answer(basket: Dict[str, Any]) -> Dict[str, Any]:
804
- if not basket.get("items"):
805
- return {
806
- "type": "basket_empty",
807
- "title": "I couldn't build a basket from that.",
808
- "message": "Send a clearer list (e.g., '2 bread, 1 cooking oil 2L') or upload a list/receipt photo."
809
- }
810
-
811
- best = basket.get("best_store")
812
- missing = basket.get("missing") or []
813
- return {
814
- "type": "basket_plan",
815
- "title": "Basket plan",
816
- "best_store": best,
817
- "items": basket["items"],
818
- "missing": missing,
819
- "notes": "If you want, tell me your budget and I'll suggest cheaper substitutes.",
820
- }
821
-
822
- # =========================
823
- # Multimodal extraction (lists / receipts)
824
- # =========================
825
-
826
- VISION_SYSTEM = """
827
- You are an expert shopping assistant. Extract actionable items and quantities from the user's image(s).
828
- Return STRICT JSON only.
829
-
830
- Output schema:
831
- {
832
- "actionable": true|false,
833
- "items": [{"name":"...", "quantity": 1}],
834
- "notes": "short"
835
- }
836
 
837
- Rules:
838
- - If it looks like a handwritten shopping list, extract items.
839
- - If it looks like a receipt, extract the purchased items (best-effort).
840
- - If it's random (selfie, meme, etc), actionable=false and items=[].
841
- - Keep it conservative: only include items you're confident about.
842
- """
843
 
844
- def extract_items_from_images(images: List[Dict[str, Any]]) -> Dict[str, Any]:
845
- if not images:
846
- return {"actionable": False, "items": [], "notes": "no_images"}
847
- user = "Extract items from the images."
848
- out = gemini_generate_multimodal(VISION_SYSTEM, user, images)
849
- data = _safe_json_loads(out, fallback={})
850
- if not isinstance(data, dict):
851
- return {"actionable": False, "items": [], "notes": "bad_json"}
852
- data.setdefault("actionable", False)
853
- data.setdefault("items", [])
854
- return data
855
 
856
  # =========================
857
- # Routes
858
  # =========================
859
 
860
  @app.get("/health")
861
  def health():
 
862
  return jsonify({
863
  "ok": True,
864
  "ts": now_utc_iso(),
865
- "gemini": bool(_gemini_client),
866
- "firestore": bool(db),
867
- "products_cached_rows": int(len(_product_cache["df_offers"])) if isinstance(_product_cache["df_offers"], pd.DataFrame) else 0,
868
- "products_raw_count": int(_product_cache.get("raw_count", 0)),
869
  })
870
 
871
  @app.post("/chat")
872
- def chat():
 
873
  body = request.get_json(silent=True) or {}
874
- profile_id = (body.get("profile_id") or "").strip()
 
 
875
  if not profile_id:
876
- return jsonify({"ok": False, "error": "profile_id is required"}), 400
877
-
878
- message = (body.get("message") or "").strip()
879
- username = body.get("username")
880
- context = body.get("context") or {}
881
- images_raw = body.get("images") or []
882
- images = parse_images(images_raw)
883
-
884
- prof = get_profile(profile_id)
885
- if username and not prof.get("username"):
886
- update_profile(profile_id, {"username": username})
887
-
888
- # 1) If images: try extract items (shopping list / receipt)
889
- extracted = {"actionable": False, "items": [], "notes": "skipped"}
890
- if images:
891
- extracted = extract_items_from_images(images)
892
-
893
- # 2) Detect intent from message (+ image presence)
894
- intent = detect_intent(message, images_present=bool(images), context=context)
895
-
896
- # If image extraction got items, treat as actionable unless the message is clearly chit-chat
897
- image_items = extracted.get("items") if isinstance(extracted, dict) else []
898
- if image_items and isinstance(image_items, list) and intent.get("intent") != "chit_chat":
899
- intent["actionable"] = True
900
- intent["intent"] = "basket_build" if len(image_items) > 1 else "price_lookup"
901
- intent["items"] = image_items
902
-
903
- # 3) Graceful conversational fallback
904
- if not intent.get("actionable"):
905
- reply = {
906
- "type": "chat",
907
- "message": (
908
- f"Hey{(' ' + (username or prof.get('username') or '')).strip()} 👋\n"
909
- "If you want shopping help, ask me something like:\n"
910
- "• “Where is cooking oil cheapest?”\n"
911
- "• “Which store is best for my basket: rice, chicken, oil?”\n"
912
- "• “Build me a budget basket under $20.”"
913
- )
914
- }
915
- # log + counters
916
- log_chat(profile_id, {"message": message, "intent": intent, "response_type": "chit_chat"})
917
- update_profile(profile_id, {"counters": {"chats": int((prof.get("counters") or {}).get("chats", 0)) + 1}})
918
- return jsonify({"ok": True, "intent": intent, "data": reply})
919
 
920
- # 4) Actionable: execute
921
- df = get_offers_df(force_refresh=False)
 
922
 
923
- response_payload: Dict[str, Any] = {"type": "unknown", "message": "No result."}
924
-
925
- # --- NEW: Check for Lifestyle/Essentials (Fuel/ZESA) ---
926
- if intent["intent"] == "lifestyle_lookup":
927
- # Items are auto-detected in detect_intent
928
- key = intent["items"][0]["name"]
929
- # Fuzzy match dict key
930
- val = ZIM_ESSENTIALS.get(key) or ZIM_ESSENTIALS.get("fuel_petrol") # fallback
931
- response_payload = {
932
- "type": "info_card",
933
- "title": f"Market Rate: {key.replace('_', ' ').title()}",
934
- "message": str(val),
935
- "highlights": [f"Current: {val}"]
936
- }
937
-
938
- # --- Original Logic ---
939
- elif intent["intent"] in ("price_lookup", "trust_check", "product_discovery"):
940
- # pick first item or treat message as query
941
- query = ""
942
- if intent.get("items"):
943
- query = intent["items"][0].get("name") or ""
944
- if not query:
945
- query = message
946
- hits = search_products(df, query, limit=80)
947
- summary = summarize_offers(hits)
948
- response_payload = render_price_answer(summary)
949
-
950
- elif intent["intent"] in ("basket_build", "basket_optimize", "store_recommendation"):
951
- items = intent.get("items") or []
952
- # if user didn't provide items but asked store choice, we can try to extract nouns—too risky; keep conservative
953
- if not items:
954
- response_payload = {
955
- "type": "need_list",
956
- "title": "Send your list",
957
- "message": "I can recommend the best store once you send your basket (even 3–5 items)."
958
  }
959
  else:
960
- basket = basket_store_choice(df, items)
961
- response_payload = render_basket_answer(basket)
962
-
963
- elif intent["intent"] == "price_compare":
964
- items = intent.get("items") or []
965
- if len(items) < 2:
966
- response_payload = {
967
- "type": "need_two_items",
968
- "title": "Need two items",
969
- "message": "Tell me two items to compare, e.g., “Coke 2L vs Pepsi 2L”."
970
- }
971
- else:
972
- comparisons = []
973
- for it in items[:3]:
974
- hits = search_products(df, it.get("name") or "", limit=60)
975
- summary = summarize_offers(hits)
976
- comparisons.append(summary)
977
-
978
- # compute cheapest for each
979
- rows = []
980
- for s in comparisons:
981
- if not s or not s.get("cheapest"):
982
- continue
983
- rows.append({
984
- "name": s.get("name"),
985
- "cheapest_retailer": s["cheapest"]["retailer"],
986
- "price": s["cheapest"]["price"]
987
- })
988
- rows = sorted(rows, key=lambda x: x["price"])
989
- response_payload = {
990
- "type": "comparison",
991
- "title": "Comparison",
992
- "items": rows,
993
- "winner": rows[0] if rows else None
994
- }
995
 
996
- # 5) Persist + counters + light memory updates
997
- log_chat(profile_id, {
998
- "message": message,
999
- "intent": intent,
1000
- "response_type": response_payload.get("type"),
1001
- "images_present": bool(images),
1002
  })
1003
 
1004
- counters = prof.get("counters") or {}
1005
- update_profile(profile_id, {"counters": {"chats": int(counters.get("chats", 0)) + 1}})
1006
-
1007
- # minimal preference inference
1008
- if response_payload.get("type") == "basket_plan" and response_payload.get("best_store"):
1009
- update_profile(profile_id, {"preferences": {"last_best_store": response_payload["best_store"]["name"]}})
1010
-
1011
- return jsonify({"ok": True, "intent": intent, "data": response_payload})
1012
-
1013
  @app.post("/api/call-briefing")
1014
  def call_briefing():
 
 
 
 
1015
  body = request.get_json(silent=True) or {}
1016
- profile_id = (body.get("profile_id") or "").strip()
 
 
1017
  if not profile_id:
1018
- return jsonify({"ok": False, "error": "profile_id is required"}), 400
1019
 
1020
- username = body.get("username")
1021
  prof = get_profile(profile_id)
1022
-
1023
- if username and not prof.get("username"):
1024
  update_profile(profile_id, {"username": username})
1025
- prof["username"] = username
1026
 
1027
- # Build lightweight "shopping intelligence" variables for ElevenLabs agent
1028
- prefs = prof.get("preferences") or {}
1029
- last_store = (prefs.get("last_best_store") or "").strip() or None
1030
-
1031
- # quick stats from recent chats (last 25)
1032
- intent_counts: Dict[str, int] = {}
1033
- try:
1034
- logs = db.collection("pricelyst_profiles").document(profile_id).collection("chat_logs") \
1035
- .order_by("ts", direction=firestore.Query.DESCENDING).limit(25).stream()
1036
-
1037
- intents = []
1038
- for d in logs:
1039
- dd = d.to_dict() or {}
1040
- ii = (dd.get("intent") or {}).get("intent")
1041
- if ii:
1042
- intents.append(ii)
1043
-
1044
- for ii in intents:
1045
- intent_counts[ii] = intent_counts.get(ii, 0) + 1
1046
- except Exception as e:
1047
- logger.error("Error fetching call briefing chat history: %s", e)
1048
-
1049
- # --- KPI Snapshot Logic ---
1050
- # We construct a dictionary that the React client will pass as a JSON string
1051
- # We inject ZIM_ESSENTIALS here so the Agent has knowledge of fuel/zesa prices
1052
  kpi_data = {
1053
- "username": prof.get("username") or "there",
1054
- "last_best_store": last_store,
1055
- "top_intents_last_25": sorted(intent_counts.items(), key=lambda x: x[1], reverse=True)[:5],
1056
- "tone": "practical_zimbabwe",
1057
- "market_rates_essentials": ZIM_ESSENTIALS # <--- INJECTED KNOWLEDGE
1058
  }
1059
 
1060
  return jsonify({
1061
  "ok": True,
1062
- "profile_id": profile_id,
1063
  "memory_summary": prof.get("memory_summary", ""),
1064
- # This string is passed to ElevenLabs by the React Client
1065
- "kpi_snapshot": json.dumps(kpi_data)
1066
  })
1067
 
1068
  @app.post("/api/log-call-usage")
1069
  def log_call_usage():
 
 
 
 
 
 
1070
  body = request.get_json(silent=True) or {}
1071
- profile_id = (body.get("profile_id") or "").strip()
1072
- if not profile_id:
1073
- return jsonify({"ok": False, "error": "profile_id is required"}), 400
1074
-
1075
- transcript = (body.get("transcript") or "").strip()
1076
- call_id = body.get("call_id") or None
1077
- started_at = body.get("started_at") or None
1078
- ended_at = body.get("ended_at") or None
1079
- stats = body.get("stats") or {}
1080
 
1081
- logger.info("Received call usage for %s. Transcript len: %d", profile_id, len(transcript))
 
1082
 
1083
- prof = get_profile(profile_id)
1084
 
1085
- # --- UPGRADE: Use Shopping Plan Generator (JSON + Markdown) ---
1086
- plan_id = None
1087
- report_md = ""
1088
  plan_data = {}
1089
-
1090
- try:
1091
- if transcript:
1092
- logger.info("Generating shopping plan via Gemini...")
1093
- plan_data = generate_shopping_plan(transcript)
1094
- logger.info("Plan generated. actionable=%s, title=%s", plan_data.get("is_actionable"), plan_data.get("title"))
1095
-
1096
  if plan_data.get("is_actionable"):
1097
- # Save structured plan
1098
  plan_ref = db.collection("pricelyst_profiles").document(profile_id).collection("shopping_plans").document()
1099
  plan_data["id"] = plan_ref.id
1100
- plan_data["call_id"] = call_id
1101
  plan_data["created_at"] = now_utc_iso()
1102
  plan_ref.set(plan_data)
1103
  plan_id = plan_ref.id
1104
- report_md = plan_data.get("markdown_content", "")
1105
- logger.info("Shopping plan stored. ID=%s", plan_id)
1106
- else:
1107
- logger.info("No actionable shopping plan found in call.")
1108
- except Exception as e:
1109
- logger.error("Error generating/storing shopping plan: %s", e)
1110
-
1111
- # Log the call (link the plan_id)
1112
- doc_id = log_call(profile_id, {
1113
- "call_id": call_id,
1114
- "started_at": started_at,
1115
- "ended_at": ended_at,
1116
- "stats": stats,
1117
  "transcript": transcript,
1118
- "generated_plan_id": plan_id,
1119
- "report_markdown": report_md,
1120
  })
1121
-
1122
- # update counters
1123
- try:
1124
- counters = prof.get("counters") or {}
1125
- update_profile(profile_id, {"counters": {"calls": int(counters.get("calls", 0)) + 1}})
1126
- except Exception as e:
1127
- logger.error("Error updating profile counters: %s", e)
1128
 
1129
  return jsonify({
1130
  "ok": True,
1131
- "logged_call_doc_id": doc_id,
1132
- "shopping_plan": plan_data if plan_id else None # Frontend uses this for PDF
1133
  })
1134
 
1135
- # NEW: Shopping Plans CRUD —
1136
 
1137
  @app.get("/api/shopping-plans")
1138
  def list_plans():
@@ -1140,21 +644,9 @@ def list_plans():
1140
  if not pid: return jsonify({"ok": False}), 400
1141
  try:
1142
  docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
1143
- .order_by("created_at", direction=firestore.Query.DESCENDING).limit(20).stream()
1144
  plans = [{"id": d.id, **d.to_dict()} for d in docs]
1145
  return jsonify({"ok": True, "plans": plans})
1146
- except Exception as e:
1147
- logger.error("list_plans error: %s", e)
1148
- return jsonify({"ok": False, "error": str(e)}), 500
1149
-
1150
@app.get("/api/shopping-plans/<plan_id>")
def get_plan(plan_id):
    """Fetch one saved shopping plan belonging to the given profile."""
    pid = request.args.get("profile_id")
    if not pid:
        return jsonify({"ok": False}), 400
    try:
        snap = (db.collection("pricelyst_profiles")
                  .document(pid)
                  .collection("shopping_plans")
                  .document(plan_id)
                  .get())
        if snap.exists:
            return jsonify({"ok": True, "plan": snap.to_dict()})
        return jsonify({"ok": False, "error": "Not found"}), 404
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
1160
 
@@ -1169,9 +661,14 @@ def delete_plan(plan_id):
1169
  return jsonify({"ok": False, "error": str(e)}), 500
1170
 
1171
  # =========================
1172
- # Run
1173
  # =========================
1174
 
1175
  if __name__ == "__main__":
1176
- port = int(os.environ.get("PORT", "7860"))
1177
- app.run(host="0.0.0.0", port=port, debug=True)
 
 
 
 
 
 
1
  """
2
+ main.py — Pricelyst Shopping Advisor (Jessica Edition - Grounded Data Version)
3
 
4
  ✅ Flask API
5
+ ✅ Firebase Admin persistence
6
+ ✅ Gemini via google-genai SDK
7
+ RAG (Retrieval Augmented Generation) for Shopping Plans
8
+ Real Pricing Logic (No Hallucinations)
9
+ Backwards Compatible with React Client
10
+
11
+ ENV VARS:
 
12
  - GOOGLE_API_KEY=...
13
+ - FIREBASE='{"type":"service_account", ...}'
14
+ - PRICE_API_BASE=https://api.pricelyst.co.zw
15
+ - GEMINI_MODEL=gemini-2.0-flash
16
+ - PORT=5000
17
  """
18
 
19
  import os
20
  import re
21
  import json
22
  import time
 
 
23
  import base64
24
  import logging
25
  from datetime import datetime, timezone
26
+ from typing import Any, Dict, List, Optional
27
 
28
  import requests
29
  import pandas as pd
 
40
 
41
  # ––––– Gemini (NEW SDK) –––––
42
 
 
 
43
  try:
44
  from google import genai
45
  from google.genai import types
 
60
 
61
  # ––––– Firebase Admin –––––
62
 
 
 
63
  import firebase_admin
64
  from firebase_admin import credentials, firestore
65
 
66
  FIREBASE_ENV = os.environ.get("FIREBASE", "")
67
 
68
  def init_firestore_from_env() -> firestore.Client:
 
69
  if firebase_admin._apps:
70
  return firestore.client()
71
 
 
72
  if not FIREBASE_ENV:
73
+ # Fallback for local dev if needed, or raise error
74
+ logger.warning("FIREBASE env var missing. Persistence disabled.")
75
+ return None
76
 
77
  try:
78
  sa_info = json.loads(FIREBASE_ENV)
 
82
  return firestore.client()
83
  except Exception as e:
84
  logger.critical("Failed to initialize Firebase: %s", e)
85
+ return None
86
 
87
+ db = init_firestore_from_env()
 
 
 
 
88
 
89
  # ––––– External API (Pricelyst) –––––
90
 
91
  PRICE_API_BASE = os.environ.get("PRICE_API_BASE", "https://api.pricelyst.co.zw").rstrip("/")
92
+ HTTP_TIMEOUT = 25
93
 
94
  # ––––– Flask –––––
95
 
 
98
 
99
  # ––––– In-memory product cache –––––
100
 
101
+ PRODUCT_CACHE_TTL_SEC = 60 * 15 # 15 minutes
102
  _product_cache: Dict[str, Any] = {
103
  "ts": 0,
104
  "df_offers": pd.DataFrame(),
105
  "raw_count": 0,
106
  }
107
 
108
+ # ––––– Static Data (Fallbacks) –––––
109
 
110
  ZIM_ESSENTIALS = {
111
+ "fuel": {"price": 1.58, "unit": "L", "retailer": "Pump Price"},
112
+ "petrol": {"price": 1.58, "unit": "L", "retailer": "Pump Price"},
113
+ "diesel": {"price": 1.65, "unit": "L", "retailer": "Pump Price"},
114
+ "bread": {"price": 1.00, "unit": "loaf", "retailer": "Standard"},
115
+ "gas": {"price": 2.00, "unit": "kg", "retailer": "LPG Market"},
116
+ "electricity": {"price": 20.00, "unit": "est. month", "retailer": "ZESA"},
117
+ "zesa": {"price": 20.00, "unit": "est. month", "retailer": "ZESA"},
118
  }
119
 
120
  # =========================
121
+ # Helpers
122
  # =========================
123
 
124
def now_utc_iso() -> str:
    """Current UTC time as an ISO-8601 string (e.g. '2024-01-01T00:00:00+00:00')."""
    return datetime.now(tz=timezone.utc).isoformat()
126
 
127
+ def _coerce_float(v: Any) -> float:
128
  try:
129
+ if v is None: return 0.0
130
+ return float(v)
 
 
 
 
 
 
131
  except Exception:
132
+ return 0.0
133
 
134
  def _norm_str(s: Any) -> str:
135
  s = "" if s is None else str(s)
 
139
 
140
  def _safe_json_loads(s: str, fallback: Any):
141
  try:
142
+ # Strip markdown code blocks if present
143
+ if "```json" in s:
144
+ s = s.split("```json")[1].split("```")[0]
145
+ elif "```" in s:
146
+ s = s.split("```")[0]
147
  return json.loads(s)
148
  except Exception:
149
  return fallback
150
 
151
  # =========================
152
+ # Firestore
153
  # =========================
154
 
155
  def profile_ref(profile_id: str):
 
157
  return db.collection("pricelyst_profiles").document(profile_id)
158
 
159
  def get_profile(profile_id: str) -> Dict[str, Any]:
160
+ if not db: return {}
 
161
  try:
162
  ref = profile_ref(profile_id)
163
  doc = ref.get()
164
  if doc.exists:
165
  return doc.to_dict() or {}
166
+
167
  data = {
168
  "profile_id": profile_id,
169
  "created_at": now_utc_iso(),
 
171
  "username": None,
172
  "memory_summary": "",
173
  "preferences": {},
174
+ "counters": {"chats": 0, "calls": 0}
 
 
 
 
175
  }
176
  ref.set(data)
177
  return data
178
  except Exception as e:
179
+ logger.error("DB Error get_profile: %s", e)
180
  return {}
181
 
182
def update_profile(profile_id: str, patch: Dict[str, Any]) -> None:
    """Merge *patch* into the profile document, stamping updated_at.

    No-op when Firestore is unavailable; errors are logged, never raised.
    """
    if not db:
        return
    try:
        # Build a fresh dict so the caller's patch object is never mutated.
        merged = {**patch, "updated_at": now_utc_iso()}
        profile_ref(profile_id).set(merged, merge=True)
    except Exception as e:
        logger.error("DB Error update_profile: %s", e)
190
 
191
def log_chat(profile_id: str, payload: Dict[str, Any]) -> None:
    """Append one chat-turn record to the profile's chat_logs subcollection.

    No-op when Firestore is unavailable; errors are logged, never raised.
    """
    if not db:
        return
    try:
        entry = dict(payload)
        entry["ts"] = now_utc_iso()
        logs = db.collection("pricelyst_profiles").document(profile_id).collection("chat_logs")
        logs.add(entry)
    except Exception as e:
        logger.error("DB Error log_chat: %s", e)
200
 
201
def log_call(profile_id: str, payload: Dict[str, Any]) -> str:
    """Persist one call record and return the new document id ('' on failure).

    Without a DB connection, returns a timestamp pseudo-id so callers still
    receive a non-empty handle.
    """
    if not db:
        return str(int(time.time()))
    try:
        calls = db.collection("pricelyst_profiles").document(profile_id).collection("call_logs")
        ref = calls.document()
        ref.set({**payload, "ts": now_utc_iso()})
        return ref.id
    except Exception as e:
        logger.error("DB Error log_call: %s", e)
        return ""
213
 
214
  # =========================
215
+ # Data Ingestion (ETL)
216
  # =========================
217
 
218
def fetch_products(max_pages: int = 10, per_page: int = 50) -> List[Dict[str, Any]]:
    """Fetch raw products from the Pricelyst API, page by page.

    Stops early when a page comes back empty, when the API's reported
    totalPages is reached, or on the first request error (best-effort:
    whatever was fetched so far is returned).
    """
    all_products: List[Dict[str, Any]] = []
    url = f"{PRICE_API_BASE}/api/v1/products"
    for page in range(1, max_pages + 1):
        try:
            r = requests.get(url, params={"page": page, "perPage": per_page}, timeout=HTTP_TIMEOUT)
            r.raise_for_status()
            # FIX: parse the response body once; the original called r.json()
            # twice per page (once for data, once for pagination).
            body = r.json()
            data = body.get("data") or []
            if not data:
                break
            all_products.extend(data)

            # Pagination check — missing totalPages defaults high so the
            # max_pages bound is what stops the loop.
            if page >= (body.get("totalPages") or 999):
                break
        except Exception as e:
            logger.warning("Product fetch error page %s: %s", page, e)
            break
    return all_products
238
 
239
def flatten_products_to_df(products: List[Dict[str, Any]]) -> pd.DataFrame:
    """Flatten nested API products (product -> prices[] -> retailer) into a
    one-row-per-offer DataFrame used as the in-memory search index.

    Products with no `prices` entries fall back to the product-level `price`
    (retailer "Pricelyst Base") so they stay searchable; zero/invalid prices
    are dropped.
    """

    def make_row(p: Dict[str, Any], category: str, brand: str,
                 retailer: str, price: float) -> Dict[str, Any]:
        # One flat offer row; clean_name powers token search.
        # (FIX: the original duplicated this dict literal in two branches.)
        name = p.get("name") or "Unknown"
        return {
            "product_id": p.get("id"),
            "product_name": name,
            "clean_name": _norm_str(name),
            "description": p.get("description") or "",
            "category": category,
            "brand": brand,
            "retailer": retailer,
            "price": price,
            "image": p.get("thumbnail") or p.get("image"),
        }

    rows: List[Dict[str, Any]] = []
    for p in products:
        try:
            # Primary category (first entry) and brand, with safe fallbacks.
            cat_name = "General"
            cats = p.get("categories") or []
            if isinstance(cats, list) and cats:
                cat_name = cats[0].get("name") or "General"
            brand_name = (p.get("brand") or {}).get("brand_name") or ""

            offers = p.get("prices") or []
            if not offers:
                # No retailer offers: index the product's base price, if any,
                # so the product remains searchable.
                base_price = _coerce_float(p.get("price"))
                if base_price > 0:
                    rows.append(make_row(p, cat_name, brand_name, "Pricelyst Base", base_price))
                continue

            for offer in offers:
                retailer_name = (offer.get("retailer") or {}).get("name") or "Unknown Store"
                price_val = _coerce_float(offer.get("price"))
                if price_val > 0:
                    rows.append(make_row(p, cat_name, brand_name, retailer_name, price_val))
        except Exception as e:
            # FIX: the original bound the exception but never used it —
            # malformed products vanished silently. Leave a debug trace.
            logger.debug("Skipping malformed product: %s", e)
            continue

    return pd.DataFrame(rows)
305
 
306
def get_data_index(force_refresh: bool = False) -> pd.DataFrame:
    """Singleton accessor for the offers DataFrame; refreshes when forced,
    stale (past TTL), or empty. Returns the last good index on refresh failure."""
    global _product_cache

    age = time.time() - _product_cache["ts"]
    needs_refresh = (
        force_refresh
        or age > PRODUCT_CACHE_TTL_SEC
        or _product_cache["df_offers"].empty
    )
    if needs_refresh:
        logger.info("Refreshing Product Index...")
        try:
            raw = fetch_products(max_pages=15)  # roughly 750 products
            offers_df = flatten_products_to_df(raw)
            _product_cache.update({
                "ts": time.time(),
                "df_offers": offers_df,
                "raw_count": len(raw),
            })
            logger.info(f"Index Refreshed: {len(offers_df)} offers from {len(raw)} products.")
        except Exception as e:
            logger.error(f"Failed to refresh index: {e}")

    return _product_cache["df_offers"]
326
 
327
  # =========================
328
+ # Search & Matching Logic
329
  # =========================
330
 
331
def search_index(df: pd.DataFrame, query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Search the offers DataFrame by token overlap + substring matching.

    Returns up to *limit* distinct products, each represented by its
    best-ranked offer (highest relevance, then lowest price).

    FIX: guard against empty/whitespace-only queries, which previously hit a
    ZeroDivisionError inside the scoring function (len(q_tokens) == 0).
    """
    if df.empty:
        return []

    q_norm = _norm_str(query)
    q_tokens = set(q_norm.split())
    if not q_tokens:
        # Nothing searchable left after normalization.
        return []

    def score_text(text: Any) -> float:
        # Fraction of query tokens present in the product name (0..1).
        if not isinstance(text, str):
            return 0.0
        text_tokens = set(text.split())
        if not text_tokens:
            return 0.0
        return len(q_tokens.intersection(text_tokens)) / len(q_tokens)

    # In-memory scoring is acceptable for <10k rows; use a proper search
    # engine or vector index at scale.
    temp_df = df.copy()
    temp_df['score'] = temp_df['clean_name'].apply(score_text)

    # Keep rows with >40% token overlap OR a direct substring hit.
    # (FIX: the original comment claimed 50% but the code uses 0.4.)
    matches = temp_df[
        (temp_df['score'] > 0.4)
        | (temp_df['clean_name'].str.contains(q_norm, regex=False))
    ]

    if matches.empty:
        # Fallback: match against category names.
        matches = temp_df[temp_df['category'].str.lower().str.contains(q_norm, na=False)]

    if matches.empty:
        return []

    # Best relevance first; cheapest first within equal relevance.
    matches = matches.sort_values(by=['score', 'price'], ascending=[False, True])

    # De-duplicate by product_id, keeping each product's best offer.
    unique_products: List[Dict[str, Any]] = []
    seen_ids = set()
    for _, row in matches.iterrows():
        pid = row['product_id']
        if pid in seen_ids:
            continue
        seen_ids.add(pid)
        unique_products.append({
            "id": pid,
            "name": row['product_name'],
            "price": row['price'],
            "retailer": row['retailer'],
            "category": row['category'],
            "image": row['image'],
        })
        if len(unique_products) >= limit:
            break

    return unique_products
392
 
393
+ # =========================
394
+ # Gemini Functions
395
+ # =========================
396
+
397
def gemini_generate_json(system_prompt: str, user_prompt: str) -> Dict[str, Any]:
    """Ask Gemini for a strict-JSON response (JSON mode, low temperature).

    Returns {} when the client is unavailable or on any generation/parse error.
    """
    if not _gemini_client:
        return {}
    try:
        prompt = system_prompt + "\n\n" + user_prompt
        result = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                types.Content(role="user", parts=[types.Part.from_text(prompt)]),
            ],
            config=types.GenerateContentConfig(
                response_mime_type="application/json",
                temperature=0.2,
            ),
        )
        return json.loads(result.text)
    except Exception as e:
        logger.error(f"Gemini JSON Error: {e}")
        return {}
416
 
417
  # =========================
418
+ # Shopping Plan Engine (RAG)
419
  # =========================
420
 
421
+ EXTRACT_SYSTEM_PROMPT = """
422
+ You are a Shopping Assistant Data Extractor.
423
+ Analyze the transcript and extract a list of shopping items the user implicitly or explicitly wants.
424
+ Return JSON: { "items": [ { "name": "searchable term", "qty": "quantity string", "notes": "context" } ] }
425
+ If no items found, return { "items": [] }.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426
  """
427
 
428
+ SYNTHESIS_SYSTEM_PROMPT = """
429
+ You are Jessica, Pricelyst's Shopping Advisor.
430
+ Generate a shopping plan based on the USER TRANSCRIPT and the DATA CONTEXT provided.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
431
 
432
+ RULES:
433
+ 1. USE REAL DATA: Use the prices and retailers found in DATA CONTEXT. Do not hallucinate prices.
434
+ 2. MISSING DATA: If an item is in the transcript but has "FOUND: False" in context, explicitly estimate it and mark it as "(Est)".
435
+ 3. FORMAT: Return strict JSON with a 'markdown_content' field containing a professional, formatted report (Tables, Totals, Tips).
436
 
437
+ JSON SCHEMA:
438
  {
439
+ "is_actionable": true,
440
+ "title": "Shopping Plan Title",
441
+ "markdown_content": "# Title\n\n..."
 
 
442
  }
 
 
443
  """
444
 
445
def _ground_shopping_item(df: pd.DataFrame, item: Dict[str, Any]) -> str:
    """Resolve one extracted item against static rates / the price index.

    Returns a single 'DATA CONTEXT' line for the synthesis prompt.
    Static ZIM_ESSENTIALS rates (fuel, bread, ZESA...) take priority over
    the product index.
    """
    term = item.get("name", "")
    qty_str = item.get("qty", "1")

    # Check ZIM_ESSENTIALS first (substring match on the item name).
    ess_key = next((k for k in ZIM_ESSENTIALS if k in term.lower()), None)
    if ess_key:
        data = ZIM_ESSENTIALS[ess_key]
        price = data['price']
        return (f"- ITEM: {term} (Qty: {qty_str}) | FOUND: TRUE | SOURCE: Market Rate "
                f"| PRICE: ${price} | RETAILER: {data['retailer']}")

    # Search the live price index.
    hits = search_index(df, term, limit=1)
    if hits:
        best = hits[0]
        return (f"- ITEM: {term} (Qty: {qty_str}) | FOUND: TRUE | PRODUCT: {best['name']} "
                f"| PRICE: ${best['price']} | RETAILER: {best['retailer']}")

    return f"- ITEM: {term} (Qty: {qty_str}) | FOUND: FALSE | NOTE: Needs estimation."


def build_shopping_plan(transcript: str) -> Dict[str, Any]:
    """RAG pipeline: extract items from the transcript, ground each one in
    real price data, then synthesize a plan with Gemini.

    Returns the plan dict, or {"is_actionable": False} when there is nothing
    to plan (short transcript, no items extracted, or synthesis failure).

    FIXES vs. original: dead `total_est` accumulator removed; empty synthesis
    result now returns non-actionable instead of a bare metadata dict;
    per-item grounding extracted into _ground_shopping_item.
    """
    if len(transcript) < 10:
        return {"is_actionable": False}

    # Step 1: Extraction — pull the shopping list out of the conversation.
    extraction = gemini_generate_json(EXTRACT_SYSTEM_PROMPT, f"TRANSCRIPT:\n{transcript}")
    items_requested = extraction.get("items", [])
    if not items_requested:
        return {"is_actionable": False}

    # Step 2: Retrieval (the "grounding") — one context line per item.
    df = get_data_index()
    context_lines = [_ground_shopping_item(df, item) for item in items_requested]
    data_context = "\n".join(context_lines)
    logger.info(f"Plan Gen Context:\n{data_context}")

    # Step 3: Synthesis — Gemini writes the report from the grounded data only.
    final_prompt = f"TRANSCRIPT:\n{transcript}\n\nDATA CONTEXT (Real Prices Found):\n{data_context}"
    plan = gemini_generate_json(SYNTHESIS_SYSTEM_PROMPT, final_prompt)
    if not plan:
        # Synthesis failed; report non-actionable rather than an empty shell.
        return {"is_actionable": False}

    # Frontend metadata: how many items we could actually price.
    plan["items_found"] = sum(1 for line in context_lines if "FOUND: TRUE" in line)
    return plan
 
 
 
503
 
504
  # =========================
505
+ # API Endpoints
506
  # =========================
507
 
508
@app.get("/health")
def health():
    """Liveness probe: reports DB connectivity and the size of the price index."""
    offers = get_data_index()
    payload = {
        "ok": True,
        "ts": now_utc_iso(),
        "db_connected": bool(db),
        "products_indexed": len(offers),
    }
    return jsonify(payload)
517
 
518
@app.post("/chat")
def chat_endpoint():
    """Text chat endpoint - kept mostly for legacy/debug, similar logic to voice."""
    body = request.get_json(silent=True) or {}
    message = body.get("message", "")
    profile_id = body.get("profile_id")

    if not profile_id:
        return jsonify({"ok": False, "error": "No profile_id"}), 400

    # Crude intent routing: price keywords or very short messages go to search.
    intent = "chat"
    reply_data: Dict[str, Any] = {}

    lowered = message.lower()
    looks_like_search = "price" in lowered or "find" in lowered or len(message.split()) < 5
    if looks_like_search:
        hits = search_index(get_data_index(), message, limit=3)
        if hits:
            intent = "product_found"
            reply_data = {"type": "product_card", "products": hits}
        else:
            reply_data = {"message": "I couldn't find that product in our database."}
    else:
        # Gemini Chat fallback
        reply_data = {"message": "I can help you plan your shopping. Tell me what you need!"}

    log_chat(profile_id, {"message": message, "intent": intent, "reply": reply_data})

    return jsonify({
        "ok": True,
        "intent": {"actionable": intent == "product_found"},
        "data": reply_data,
    })
554
 
 
 
 
 
 
 
 
 
 
555
@app.post("/api/call-briefing")
def call_briefing():
    """
    Called by Frontend before ElevenLabs starts.
    Provides context (memory, user name, tone) to the AI Agent.
    """
    body = request.get_json(silent=True) or {}
    # FIX: normalize inputs — a whitespace-only profile_id previously passed
    # validation and became a junk Firestore document id (the pre-refactor
    # version stripped these; restore that behavior).
    profile_id = (body.get("profile_id") or "").strip()
    username = (body.get("username") or "").strip() or None

    if not profile_id:
        return jsonify({"ok": False, "error": "Missing profile_id"}), 400

    prof = get_profile(profile_id)
    if username:
        update_profile(profile_id, {"username": username})

    # Shopping Intelligence Payload, passed through to the ElevenLabs agent
    # by the React client as a JSON string.
    kpi_data = {
        "username": username or prof.get("username") or "there",
        "market_rates": ZIM_ESSENTIALS,
        "tone": "helpful_zimbabwean",
        "system_note": "You are Jessica. Use the 'market_rates' for fuel/bread if asked. For other items, ask them what they need and say you will generate a plan after the call."
    }

    return jsonify({
        "ok": True,
        "memory_summary": prof.get("memory_summary", ""),
        "kpi_snapshot": json.dumps(kpi_data)
    })
585
 
586
  @app.post("/api/log-call-usage")
587
  def log_call_usage():
588
+ """
589
+ Called by Frontend when call ends.
590
+ 1. Receives Transcript.
591
+ 2. Generates RAG-based Shopping Plan.
592
+ 3. Saves to Firestore.
593
+ """
594
  body = request.get_json(silent=True) or {}
595
+ profile_id = body.get("profile_id")
596
+ transcript = body.get("transcript", "")
 
 
 
 
 
 
 
597
 
598
+ if not profile_id:
599
+ return jsonify({"ok": False, "error": "Missing profile_id"}), 400
600
 
601
+ logger.info(f"Processing Call for {profile_id}. Transcript Len: {len(transcript)}")
602
 
603
+ # Generate Plan (Ground Truth)
 
 
604
  plan_data = {}
605
+ plan_id = None
606
+
607
+ if len(transcript) > 20:
608
+ try:
609
+ plan_data = build_shopping_plan(transcript)
610
+
 
611
  if plan_data.get("is_actionable"):
612
+ # Persist Plan
613
  plan_ref = db.collection("pricelyst_profiles").document(profile_id).collection("shopping_plans").document()
614
  plan_data["id"] = plan_ref.id
 
615
  plan_data["created_at"] = now_utc_iso()
616
  plan_ref.set(plan_data)
617
  plan_id = plan_ref.id
618
+ logger.info(f"Plan Created: {plan_id}")
619
+ except Exception as e:
620
+ logger.error(f"Plan Gen Error: {e}")
621
+
622
+ # Log Call
623
+ log_call(profile_id, {
 
 
 
 
 
 
 
624
  "transcript": transcript,
625
+ "duration": body.get("duration_seconds"),
626
+ "plan_id": plan_id
627
  })
628
+
629
+ # Update Counters
630
+ prof = get_profile(profile_id)
631
+ cnt = prof.get("counters", {})
632
+ update_profile(profile_id, {"counters": {"calls": int(cnt.get("calls", 0)) + 1}})
 
 
633
 
634
  return jsonify({
635
  "ok": True,
636
+ "shopping_plan": plan_data if plan_data.get("is_actionable") else None
 
637
  })
638
 
639
+ # ––––– CRUD: Shopping Plans –––––
640
 
641
  @app.get("/api/shopping-plans")
642
  def list_plans():
 
644
  if not pid: return jsonify({"ok": False}), 400
645
  try:
646
  docs = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans") \
647
+ .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
648
  plans = [{"id": d.id, **d.to_dict()} for d in docs]
649
  return jsonify({"ok": True, "plans": plans})
 
 
 
 
 
 
 
 
 
 
 
 
650
  except Exception as e:
651
  return jsonify({"ok": False, "error": str(e)}), 500
652
 
 
661
  return jsonify({"ok": False, "error": str(e)}), 500
662
 
663
  # =========================
664
+ # Main
665
  # =========================
666
 
667
  if __name__ == "__main__":
668
+ port = int(os.environ.get("PORT", 7860))
669
+ # Pre-warm cache on startup
670
+ try:
671
+ get_data_index(force_refresh=True)
672
+ except:
673
+ pass
674
+ app.run(host="0.0.0.0", port=port)