rairo committed on
Commit
9e7ded1
·
verified ·
1 Parent(s): c83774d

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +214 -247
main.py CHANGED
@@ -1,12 +1,13 @@
1
  """
2
- main.py — Pricelyst Shopping Advisor (Analyst Edition)
3
 
4
  ✅ Flask API
5
  ✅ Firebase Admin Persistence
6
  ✅ Gemini via google-genai SDK (Robust)
7
  ✅ "Analyst Engine": Python Math for Baskets, ZESA, & Fuel
8
  ✅ Ground Truth Data: Uses /api/v1/product-listing
9
- ✅ Real-Time Basket Optimization
 
10
 
11
  ENV VARS:
12
  - GOOGLE_API_KEY=...
@@ -23,7 +24,7 @@ import time
23
  import math
24
  import logging
25
  from datetime import datetime, timezone
26
- from typing import Any, Dict, List, Optional, Tuple
27
 
28
  import requests
29
  import pandas as pd
@@ -91,15 +92,14 @@ HTTP_TIMEOUT = 30
91
  # ––––– Static Data (Zim Context) –––––
92
 
93
  ZIM_UTILITIES = {
94
- "fuel_petrol": 1.58, # USD per Litre
95
- "fuel_diesel": 1.65, # USD per Litre
96
- "gas_lpg": 2.00, # USD per kg
97
- "bread": 1.00, # USD fixed
98
- # ZESA Estimates (Simplified Stepped Tariff)
99
- "zesa_step_1": {"limit": 50, "rate": 0.04}, # First 50 units (Life line)
100
- "zesa_step_2": {"limit": 150, "rate": 0.09}, # Next 150
101
- "zesa_step_3": {"limit": 9999, "rate": 0.14}, # Balance
102
- "zesa_levy": 0.06 # 6% REA levy approx
103
  }
104
 
105
  # ––––– Cache –––––
@@ -107,7 +107,7 @@ ZIM_UTILITIES = {
107
  PRODUCT_CACHE_TTL = 60 * 20 # 20 mins
108
  _data_cache: Dict[str, Any] = {
109
  "ts": 0,
110
- "df": pd.DataFrame(), # Columns: [id, name, clean_name, brand, category, retailer, price, views, image]
111
  "raw_count": 0
112
  }
113
 
@@ -119,7 +119,6 @@ CORS(app)
119
  # =========================
120
 
121
  def _norm(s: Any) -> str:
122
- """Normalize string for fuzzy search."""
123
  if not s: return ""
124
  return str(s).strip().lower()
125
 
@@ -129,17 +128,26 @@ def _coerce_price(v: Any) -> float:
129
  except:
130
  return 0.0
131
 
 
 
 
 
 
 
 
 
 
 
 
132
  def fetch_and_flatten_data() -> pd.DataFrame:
133
- """
134
- Fetches from /api/v1/product-listing and flattens into an analytical DF.
135
- Each row represents a single 'Offer' (Product X at Retailer Y).
136
- """
137
  all_products = []
138
  page = 1
139
 
 
 
140
  while True:
141
  try:
142
- # New Endpoint Structure
143
  url = f"{PRICE_API_BASE}/api/v1/product-listing"
144
  r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
145
  r.raise_for_status()
@@ -165,8 +173,6 @@ def fetch_and_flatten_data() -> pd.DataFrame:
165
  p_name = p.get("name") or "Unknown"
166
  clean_name = _norm(p_name)
167
 
168
- # Category & Brand extraction
169
- # Based on user JSON: 'category' is an object inside product
170
  cat_obj = p.get("category") or {}
171
  cat_name = cat_obj.get("name") or "General"
172
 
@@ -176,10 +182,8 @@ def fetch_and_flatten_data() -> pd.DataFrame:
176
  views = int(p.get("view_count") or 0)
177
  image = p.get("thumbnail") or p.get("image")
178
 
179
- # Prices array
180
  prices = p.get("prices") or []
181
 
182
- # If no prices, we still index product for "Knowledge" but with price=0
183
  if not prices:
184
  rows.append({
185
  "product_id": p_id,
@@ -217,10 +221,10 @@ def fetch_and_flatten_data() -> pd.DataFrame:
217
  continue
218
 
219
  df = pd.DataFrame(rows)
 
220
  return df
221
 
222
  def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
223
- """Singleton access to the Dataframe."""
224
  global _data_cache
225
  if force_refresh or _data_cache["df"].empty or (time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL):
226
  logger.info("ETL: Refreshing Market Index...")
@@ -228,7 +232,6 @@ def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
228
  _data_cache["df"] = df
229
  _data_cache["ts"] = time.time()
230
  _data_cache["raw_count"] = len(df)
231
- logger.info(f"ETL: Loaded {len(df)} market offers.")
232
  return _data_cache["df"]
233
 
234
  # =========================
@@ -236,57 +239,58 @@ def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
236
  # =========================
237
 
238
  def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
239
- """Finds products matching query (Name, Brand, or Category)."""
240
  if df.empty or not query: return df
241
 
242
  q_norm = _norm(query)
243
- q_tokens = set(q_norm.split())
244
 
245
- # Quick filter: String contains
246
  mask_name = df['clean_name'].str.contains(q_norm, regex=False)
247
- mask_brand = df['brand'].str.lower().str.contains(q_norm, regex=False)
248
- mask_cat = df['category'].str.lower().str.contains(q_norm, regex=False)
249
-
250
- matches = df[mask_name | mask_brand | mask_cat].copy()
251
-
252
- # Simple Scoring
253
- def scorer(row):
254
- score = 0
255
- if q_norm in row['clean_name']: score += 10
256
- if q_norm == row['clean_name']: score += 20
257
- # Popularity boost
258
- score += math.log(row['views'] + 1) * 0.5
259
- return score
260
-
261
- if not matches.empty:
262
- matches['score'] = matches.apply(scorer, axis=1)
263
- return matches.sort_values('score', ascending=False).head(limit)
264
-
265
- return matches
 
 
 
266
 
267
  def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
268
  """
269
- Killer Question: 'Where should I buy this list?'
270
- Returns: Best Store, Missing Items, Total Cost.
271
  """
272
  df = get_market_index()
273
- if df.empty: return {"error": "No data"}
 
 
 
 
274
 
275
- basket_results = []
276
  missing_global = []
277
 
278
  # 1. Resolve Items to Real Products
279
- found_items = [] # list of (item_query, product_id, product_name)
280
-
281
  for item in item_names:
282
- # Find best matching product (using popularity tie-breaker)
283
  hits = search_products_fuzzy(df[df['is_offer']==True], item, limit=5)
284
  if hits.empty:
285
  missing_global.append(item)
286
  continue
287
 
288
- # Pick the most popular product that matches this query
289
- best_prod = hits.sort_values('views', ascending=False).iloc[0]
290
  found_items.append({
291
  "query": item,
292
  "product_id": best_prod['product_id'],
@@ -294,16 +298,13 @@ def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
294
  })
295
 
296
  if not found_items:
297
- return {"actionable": False, "reason": "No items found in database."}
 
298
 
299
  # 2. Calculate Totals Per Retailer
300
- # We only care about retailers that stock these products
301
  target_pids = [x['product_id'] for x in found_items]
302
-
303
- # Filter DF to only relevant products
304
  relevant_offers = df[df['product_id'].isin(target_pids) & df['is_offer']]
305
 
306
- # Group by Retailer
307
  retailer_stats = []
308
  all_retailers = relevant_offers['retailer'].unique()
309
 
@@ -313,9 +314,8 @@ def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
313
  found_count = len(r_df)
314
  total_price = r_df['price'].sum()
315
 
316
- # Identify what this retailer has vs misses
317
  retailer_pids = r_df['product_id'].tolist()
318
- missing_in_store = [x['name'] for x in found_items if x['product_id'] not in retailer_pids]
319
  found_names = [x['name'] for x in found_items if x['product_id'] in retailer_pids]
320
 
321
  retailer_stats.append({
@@ -323,191 +323,123 @@ def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
323
  "total_price": float(total_price),
324
  "item_count": found_count,
325
  "coverage_percent": (found_count / len(found_items)) * 100,
326
- "missing": missing_in_store,
327
  "found_items": found_names
328
  })
329
 
330
- # 3. Sort by: Coverage (Desc), then Price (Asc)
331
  retailer_stats.sort(key=lambda x: (-x['coverage_percent'], x['total_price']))
332
 
333
- best_option = retailer_stats[0] if retailer_stats else None
334
-
 
 
 
 
335
  return {
336
  "actionable": True,
337
  "basket_items": [x['name'] for x in found_items],
338
  "global_missing": missing_global,
339
  "best_store": best_option,
340
- "all_stores": retailer_stats[:3] # Return top 3 for comparison
341
  }
342
 
343
  def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
344
- """
345
- Killer Question: 'How much ZESA do I get for $20?'
346
- Uses a simplified tiered logic (Approximation of ZESA tariff).
347
- """
348
  remaining = amount_usd / 1.06 # Remove 6% levy approx
349
  units = 0.0
350
  breakdown = []
351
 
352
- # Tier 1: First 50 units (Cheap)
353
  t1 = ZIM_UTILITIES["zesa_step_1"]
354
  cost_t1 = t1["limit"] * t1["rate"]
355
 
356
  if remaining > cost_t1:
357
  units += t1["limit"]
358
  remaining -= cost_t1
359
- breakdown.append(f"First {t1['limit']} units @ ${t1['rate']}")
360
 
361
- # Tier 2: Next 150
362
  t2 = ZIM_UTILITIES["zesa_step_2"]
363
  cost_t2 = t2["limit"] * t2["rate"]
364
 
365
  if remaining > cost_t2:
366
  units += t2["limit"]
367
  remaining -= cost_t2
368
- breakdown.append(f"Next {t2['limit']} units @ ${t2['rate']}")
369
 
370
- # Tier 3: Balance (Expensive)
371
  t3 = ZIM_UTILITIES["zesa_step_3"]
372
  bought = remaining / t3["rate"]
373
  units += bought
374
- breakdown.append(f"Remaining ${(remaining + cost_t1 + cost_t2):.2f} bought {bought:.1f} units @ ${t3['rate']}")
375
  else:
376
  bought = remaining / t2["rate"]
377
  units += bought
378
- breakdown.append(f"Balance bought {bought:.1f} units @ ${t2['rate']}")
379
  else:
380
  bought = remaining / t1["rate"]
381
  units += bought
382
- breakdown.append(f"All {bought:.1f} units @ ${t1['rate']}")
383
 
384
  return {
385
  "amount_usd": amount_usd,
386
  "est_units_kwh": round(units, 1),
387
- "breakdown": breakdown,
388
- "note": "Estimates include ~6% REA levy. Actual units depend on your last purchase date."
389
- }
390
-
391
- def get_product_intelligence(query: str) -> Dict[str, Any]:
392
- """
393
- Killer Question: 'Is this price reasonable?' / 'Most Popular?'
394
- """
395
- df = get_market_index()
396
- hits = search_products_fuzzy(df[df['is_offer']], query, limit=10)
397
-
398
- if hits.empty: return {"found": False}
399
-
400
- # Group by product ID to find the specific product stats
401
- best_match_pid = hits.iloc[0]['product_id']
402
- product_rows = df[(df['product_id'] == best_match_pid) & (df['is_offer'])]
403
-
404
- if product_rows.empty: return {"found": False}
405
-
406
- min_price = product_rows['price'].min()
407
- max_price = product_rows['price'].max()
408
- avg_price = product_rows['price'].mean()
409
- cheapest_row = product_rows.loc[product_rows['price'].idxmin()]
410
-
411
- return {
412
- "found": True,
413
- "name": cheapest_row['product_name'],
414
- "brand": cheapest_row['brand'],
415
- "category": cheapest_row['category'],
416
- "view_count": int(cheapest_row['views']),
417
- "price_stats": {
418
- "min": float(min_price),
419
- "max": float(max_price),
420
- "avg": float(avg_price),
421
- "spread": float(max_price - min_price)
422
- },
423
- "best_deal": {
424
- "retailer": cheapest_row['retailer'],
425
- "price": float(min_price)
426
- },
427
- "all_offers": product_rows[['retailer', 'price']].to_dict('records')
428
  }
429
 
430
  # =========================
431
- # 3. Gemini Context Layer
432
  # =========================
433
 
434
- def generate_analyst_response(transcript: str) -> Dict[str, Any]:
435
  """
436
- 1. Detect Intent (Basket? Utility? Single Item?)
437
- 2. Run Python Analyst Function.
438
- 3. Generate Text Response.
439
  """
440
- if not _gemini_client: return {"message": "AI Brain offline."}
441
-
442
- # Step A: Intent Classification
443
- INTENT_PROMPT = """
444
- Analyze the user input. Return JSON.
445
- Intents:
446
- - "BASKET": User has a list of items (e.g. "Oil, bread and rice").
447
- - "UTILITY": User asks about ZESA, Fuel, Gas prices or units.
448
- - "PRODUCT_INTEL": User asks for "Cheapest X", "Price of X", "Popular X".
449
- - "CHAT": General conversation.
450
-
451
- Output: { "intent": "...", "items": ["..."], "utility_type": "zesa/fuel/gas", "amount": number }
 
452
  """
453
 
454
  try:
455
  resp = _gemini_client.models.generate_content(
456
  model=GEMINI_MODEL,
457
- contents=INTENT_PROMPT + "\nInput: " + transcript,
458
  config=types.GenerateContentConfig(response_mime_type="application/json")
459
  )
460
- parsed = json.loads(resp.text)
461
- except:
462
- parsed = {"intent": "CHAT"}
 
463
 
464
- intent = parsed.get("intent")
465
- data_context = {}
466
 
467
- # Step B: Execute Analyst Logic
468
- if intent == "BASKET":
469
- items = parsed.get("items", [])
470
- if items:
471
- data_context = calculate_basket_optimization(items)
472
-
473
- elif intent == "UTILITY":
474
- u_type = parsed.get("utility_type", "")
475
- amt = parsed.get("amount") or 0
476
- if "zesa" in u_type and amt > 0:
477
- data_context = calculate_zesa_units(float(amt))
478
- elif "fuel" in u_type or "petrol" in u_type:
479
- rate = ZIM_UTILITIES["fuel_petrol"]
480
- data_context = {"type": "Petrol", "rate": rate, "units": amt / rate}
481
-
482
- elif intent == "PRODUCT_INTEL":
483
- items = parsed.get("items", [])
484
- if items:
485
- data_context = get_product_intelligence(items[0])
486
-
487
- # Step C: Synthesis (Speak based on Data)
488
- SYNTHESIS_PROMPT = f"""
489
- You are Jessica, the Pricelyst Analyst.
490
- User Input: "{transcript}"
491
 
492
- ANALYST DATA (Strictly use this):
493
- {json.dumps(data_context, indent=2)}
494
 
495
- If 'actionable' is false or data is empty, suggest what data you need.
496
- If basket data exists, summarize: "The best store for your basket is [Retailer] at $[Total]."
497
- If ZESA data exists, be precise about units.
498
- Keep it helpful and Zimbabwean.
499
  """
500
 
501
- final_resp = _gemini_client.models.generate_content(
502
- model=GEMINI_MODEL,
503
- contents=SYNTHESIS_PROMPT
504
- )
505
-
506
- return {
507
- "intent": intent,
508
- "analyst_data": data_context,
509
- "message": final_resp.text
510
- }
511
 
512
  # =========================
513
  # 4. Endpoints
@@ -531,23 +463,38 @@ def chat():
531
 
532
  if not pid: return jsonify({"ok": False}), 400
533
 
534
- response_data = generate_analyst_response(msg)
 
 
535
 
536
- # Log interaction
 
 
 
 
 
 
 
 
 
 
 
 
 
537
  if db:
538
  db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
539
  "message": msg,
540
- "response": response_data,
 
541
  "ts": datetime.now(timezone.utc).isoformat()
542
  })
543
 
544
- return jsonify({"ok": True, "data": response_data})
545
 
546
  @app.post("/api/call-briefing")
547
  def call_briefing():
548
  """
549
- Context for ElevenLabs.
550
- Crucially: We DO NOT send the whole database. We send Memory + Utilities.
551
  """
552
  body = request.get_json(silent=True) or {}
553
  pid = body.get("profile_id")
@@ -564,11 +511,25 @@ def call_briefing():
564
  else:
565
  ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
566
 
567
- # Simple snapshot
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
568
  kpi_snapshot = {
569
- "username": username or prof.get("username", "Friend"),
570
- "utilities": ZIM_UTILITIES,
571
- "instructions": "You are Jessica. If asked for prices, say you can check the live system. For ZESA/Fuel, use the 'utilities' variable."
572
  }
573
 
574
  return jsonify({
@@ -581,8 +542,9 @@ def call_briefing():
581
  def log_call_usage():
582
  """
583
  Post-Call Processor.
584
- 1. Update Memory.
585
- 2. Generate Grounded Shopping Plan.
 
586
  """
587
  body = request.get_json(silent=True) or {}
588
  pid = body.get("profile_id")
@@ -590,66 +552,71 @@ def log_call_usage():
590
 
591
  if not pid: return jsonify({"ok": False}), 400
592
 
593
- logger.info(f"Processing Call {pid}. Len: {len(transcript)}")
594
-
595
- # 1. Update Memory (Gemini)
596
  if len(transcript) > 20 and db:
597
  try:
598
- prof_ref = db.collection("pricelyst_profiles").document(pid)
599
- curr_mem = prof_ref.get().to_dict().get("memory_summary", "")
600
-
601
- mem_prompt = f"Update this memory summary with new details from the transcript (names, preferences, budget):\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
602
-
603
- resp = _gemini_client.models.generate_content(
604
- model=GEMINI_MODEL,
605
- contents=mem_prompt
606
- )
607
- prof_ref.set({"memory_summary": resp.text}, merge=True)
608
  except Exception as e:
609
  logger.error(f"Memory Update Failed: {e}")
610
 
611
- # 2. Generate Plan (Analyst Engine Integration)
612
- # We re-run the Analyst logic specifically for the plan
613
- analyst_result = generate_analyst_response(transcript)
 
614
  plan_data = {}
615
 
616
- if analyst_result.get("intent") == "BASKET" and analyst_result.get("analyst_data", {}).get("actionable"):
617
- # We have a valid basket!
618
- data = analyst_result["analyst_data"]
619
- best = data["best_store"]
620
-
621
- # Markdown Generation
622
- md = f"# Your Shopping Plan\n\n"
623
- md += f"**Best Store:** {best['retailer']}\n"
624
- md += f"**Total Cost:** ${best['total_price']:.2f} (for {best['item_count']} items)\n\n"
625
 
626
- md += "| Item | Found? |\n|---|---|\n"
627
- for item in data['basket_items']:
628
- found = "" if item in best['found_items'] else "❌"
629
- md += f"| {item} | {found} |\n"
630
 
631
- if data['global_missing']:
632
- md += f"\n**Missing from Market:** {', '.join(data['global_missing'])}"
633
-
634
- plan_data = {
635
- "is_actionable": True,
636
- "title": f"Plan: {best['retailer']} (${best['total_price']:.2f})",
637
- "markdown_content": md,
638
- "items": data['basket_items']
639
- }
640
-
641
- # Save Plan
642
- if db:
643
- db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").add({
644
- **plan_data,
645
- "created_at": datetime.now(timezone.utc).isoformat()
646
- })
647
-
648
- # 3. Log Call
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
649
  if db:
650
  db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
651
  "transcript": transcript,
652
- "analyst_result": analyst_result,
 
653
  "ts": datetime.now(timezone.utc).isoformat()
654
  })
655
 
@@ -658,7 +625,7 @@ def log_call_usage():
658
  "shopping_plan": plan_data if plan_data.get("is_actionable") else None
659
  })
660
 
661
- # ––––– Shopping Plan CRUD (Standard) –––––
662
 
663
  @app.get("/api/shopping-plans")
664
  def list_plans():
@@ -669,8 +636,8 @@ def list_plans():
669
  .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
670
  plans = [{"id": d.id, **d.to_dict()} for d in docs]
671
  return jsonify({"ok": True, "plans": plans})
672
- except Exception as e:
673
- return jsonify({"ok": False, "error": str(e)}), 500
674
 
675
  @app.delete("/api/shopping-plans/<plan_id>")
676
  def delete_plan(plan_id):
 
1
  """
2
+ main.py — Pricelyst Shopping Advisor (Analyst Edition - Full Context)
3
 
4
  ✅ Flask API
5
  ✅ Firebase Admin Persistence
6
  ✅ Gemini via google-genai SDK (Robust)
7
  ✅ "Analyst Engine": Python Math for Baskets, ZESA, & Fuel
8
  ✅ Ground Truth Data: Uses /api/v1/product-listing
9
+ Jessica Context: Injects Top 60 Real Products into Voice Agent
10
+ ✅ Intent Detection: Strict Casual vs Actionable separation
11
 
12
  ENV VARS:
13
  - GOOGLE_API_KEY=...
 
24
  import math
25
  import logging
26
  from datetime import datetime, timezone
27
+ from typing import Any, Dict, List, Optional
28
 
29
  import requests
30
  import pandas as pd
 
92
  # ––––– Static Data (Zim Context) –––––
93
 
94
  ZIM_UTILITIES = {
95
+ "fuel_petrol": 1.58,
96
+ "fuel_diesel": 1.65,
97
+ "gas_lpg": 2.00,
98
+ "bread": 1.00,
99
+ "zesa_step_1": {"limit": 50, "rate": 0.04},
100
+ "zesa_step_2": {"limit": 150, "rate": 0.09},
101
+ "zesa_step_3": {"limit": 9999, "rate": 0.14},
102
+ "zesa_levy": 0.06
 
103
  }
104
 
105
  # ––––– Cache –––––
 
107
  PRODUCT_CACHE_TTL = 60 * 20 # 20 mins
108
  _data_cache: Dict[str, Any] = {
109
  "ts": 0,
110
+ "df": pd.DataFrame(),
111
  "raw_count": 0
112
  }
113
 
 
119
  # =========================
120
 
121
  def _norm(s: Any) -> str:
 
122
  if not s: return ""
123
  return str(s).strip().lower()
124
 
 
128
  except:
129
  return 0.0
130
 
131
+ def _safe_json_loads(s: str, fallback: Any):
132
+ try:
133
+ if "```json" in s:
134
+ s = s.split("```json")[1].split("```")[0]
135
+ elif "```" in s:
136
+ s = s.split("```")[0]
137
+ return json.loads(s)
138
+ except Exception as e:
139
+ logger.error(f"JSON Parse Error: {e}")
140
+ return fallback
141
+
142
  def fetch_and_flatten_data() -> pd.DataFrame:
143
+ """Fetches from /api/v1/product-listing and flattens into an analytical DF."""
 
 
 
144
  all_products = []
145
  page = 1
146
 
147
+ logger.info("ETL: Starting fetch from /api/v1/product-listing")
148
+
149
  while True:
150
  try:
 
151
  url = f"{PRICE_API_BASE}/api/v1/product-listing"
152
  r = requests.get(url, params={"page": page, "perPage": 50}, timeout=HTTP_TIMEOUT)
153
  r.raise_for_status()
 
173
  p_name = p.get("name") or "Unknown"
174
  clean_name = _norm(p_name)
175
 
 
 
176
  cat_obj = p.get("category") or {}
177
  cat_name = cat_obj.get("name") or "General"
178
 
 
182
  views = int(p.get("view_count") or 0)
183
  image = p.get("thumbnail") or p.get("image")
184
 
 
185
  prices = p.get("prices") or []
186
 
 
187
  if not prices:
188
  rows.append({
189
  "product_id": p_id,
 
221
  continue
222
 
223
  df = pd.DataFrame(rows)
224
+ logger.info(f"ETL: Flattened into {len(df)} rows.")
225
  return df
226
 
227
  def get_market_index(force_refresh: bool = False) -> pd.DataFrame:
 
228
  global _data_cache
229
  if force_refresh or _data_cache["df"].empty or (time.time() - _data_cache["ts"] > PRODUCT_CACHE_TTL):
230
  logger.info("ETL: Refreshing Market Index...")
 
232
  _data_cache["df"] = df
233
  _data_cache["ts"] = time.time()
234
  _data_cache["raw_count"] = len(df)
 
235
  return _data_cache["df"]
236
 
237
  # =========================
 
239
  # =========================
240
 
241
def search_products_fuzzy(df: pd.DataFrame, query: str, limit: int = 10) -> pd.DataFrame:
    """Find offers matching ``query`` by substring, with a token-overlap fallback.

    Args:
        df: Flattened offers frame; must have `clean_name`, `views`, `price` columns.
        query: Free-text search string (normalized via `_norm` before matching).
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` rows ranked by popularity (`views` desc) then `price` asc.
        Returns ``df`` unchanged when it is empty or the query is blank. Rows
        found via the token-overlap fallback carry an extra `score` column.
    """
    if df.empty or not query:
        return df

    q_norm = _norm(query)

    # 1. Broad filter: substring containment on the normalized name.
    #    na=False guards against NaN entries — str.contains would otherwise
    #    yield NaN in the mask and break boolean indexing.
    mask_name = df['clean_name'].str.contains(q_norm, regex=False, na=False)
    matches = df[mask_name].copy()

    # 2. Fallback: rank by count of shared whitespace-delimited tokens,
    #    so "mazoe drink" can still hit "mazoe orange crush".
    if matches.empty:
        q_tokens = set(q_norm.split())

        def token_score(text):
            if not isinstance(text, str):
                return 0
            text_tokens = set(text.split())
            if not text_tokens:
                return 0
            return len(q_tokens.intersection(text_tokens))

        df_scored = df.copy()
        df_scored['score'] = df_scored['clean_name'].apply(token_score)
        matches = df_scored[df_scored['score'] > 0]

    if matches.empty:
        return matches

    # 3. Rank: most-viewed first; cheapest first on view ties.
    matches = matches.sort_values(by=['views', 'price'], ascending=[False, True])
    return matches.head(limit)
270
 
271
  def calculate_basket_optimization(item_names: List[str]) -> Dict[str, Any]:
272
  """
273
+ Determines the best store for a list of items.
 
274
  """
275
  df = get_market_index()
276
+ if df.empty:
277
+ logger.warning("Basket Engine: DF is empty.")
278
+ return {"actionable": False, "error": "No data"}
279
+
280
+ logger.info(f"Basket Engine: Optimizing for {len(item_names)} items: {item_names}")
281
 
282
+ found_items = []
283
  missing_global = []
284
 
285
  # 1. Resolve Items to Real Products
 
 
286
  for item in item_names:
 
287
  hits = search_products_fuzzy(df[df['is_offer']==True], item, limit=5)
288
  if hits.empty:
289
  missing_global.append(item)
290
  continue
291
 
292
+ # Pick best match (First one is sorted by Views/Price)
293
+ best_prod = hits.iloc[0]
294
  found_items.append({
295
  "query": item,
296
  "product_id": best_prod['product_id'],
 
298
  })
299
 
300
  if not found_items:
301
+ logger.info("Basket Engine: No items matched in DB.")
302
+ return {"actionable": False, "missing": missing_global}
303
 
304
  # 2. Calculate Totals Per Retailer
 
305
  target_pids = [x['product_id'] for x in found_items]
 
 
306
  relevant_offers = df[df['product_id'].isin(target_pids) & df['is_offer']]
307
 
 
308
  retailer_stats = []
309
  all_retailers = relevant_offers['retailer'].unique()
310
 
 
314
  found_count = len(r_df)
315
  total_price = r_df['price'].sum()
316
 
317
+ # Identify misses
318
  retailer_pids = r_df['product_id'].tolist()
 
319
  found_names = [x['name'] for x in found_items if x['product_id'] in retailer_pids]
320
 
321
  retailer_stats.append({
 
323
  "total_price": float(total_price),
324
  "item_count": found_count,
325
  "coverage_percent": (found_count / len(found_items)) * 100,
 
326
  "found_items": found_names
327
  })
328
 
329
+ # 3. Sort: Coverage Desc, Price Asc
330
  retailer_stats.sort(key=lambda x: (-x['coverage_percent'], x['total_price']))
331
 
332
+ if not retailer_stats:
333
+ return {"actionable": False}
334
+
335
+ best_option = retailer_stats[0]
336
+ logger.info(f"Basket Engine: Best Store = {best_option['retailer']} (${best_option['total_price']})")
337
+
338
  return {
339
  "actionable": True,
340
  "basket_items": [x['name'] for x in found_items],
341
  "global_missing": missing_global,
342
  "best_store": best_option,
343
+ "all_stores": retailer_stats[:3]
344
  }
345
 
346
def calculate_zesa_units(amount_usd: float) -> Dict[str, Any]:
    """Estimate prepaid ZESA (electricity) units bought for ``amount_usd``.

    Removes the REA levy, then walks the simplified stepped tariff defined
    in ``ZIM_UTILITIES`` (lifeline tier, mid tier, balance tier), recording a
    human-readable breakdown entry per tier consumed.

    Args:
        amount_usd: Purchase amount in USD.

    Returns:
        Dict with `amount_usd`, `est_units_kwh` (kWh, rounded to 0.1) and
        `breakdown` (list of per-tier strings).
    """
    # Use the configured levy instead of a hard-coded 1.06 divisor so this
    # stays consistent if ZIM_UTILITIES["zesa_levy"] is ever updated.
    remaining = amount_usd / (1.0 + ZIM_UTILITIES["zesa_levy"])
    units = 0.0
    breakdown = []

    t1 = ZIM_UTILITIES["zesa_step_1"]
    cost_t1 = t1["limit"] * t1["rate"]

    if remaining > cost_t1:
        # Entire lifeline tier consumed.
        units += t1["limit"]
        remaining -= cost_t1
        breakdown.append(f"First {t1['limit']}u @ ${t1['rate']}")

        t2 = ZIM_UTILITIES["zesa_step_2"]
        cost_t2 = t2["limit"] * t2["rate"]

        if remaining > cost_t2:
            # Entire mid tier consumed; balance buys at the top rate.
            units += t2["limit"]
            remaining -= cost_t2
            breakdown.append(f"Next {t2['limit']}u @ ${t2['rate']}")

            t3 = ZIM_UTILITIES["zesa_step_3"]
            bought = remaining / t3["rate"]
            units += bought
            breakdown.append(f"Balance ${(remaining + cost_t1 + cost_t2):.2f} -> {bought:.1f}u @ ${t3['rate']}")
        else:
            bought = remaining / t2["rate"]
            units += bought
            breakdown.append(f"Balance -> {bought:.1f}u @ ${t2['rate']}")
    else:
        # Whole purchase fits inside the lifeline tier.
        bought = remaining / t1["rate"]
        units += bought
        breakdown.append(f"All {bought:.1f}u @ ${t1['rate']}")

    return {
        "amount_usd": amount_usd,
        "est_units_kwh": round(units, 1),
        "breakdown": breakdown
    }
385
 
386
  # =========================
387
+ # 3. Gemini Helpers (Strict)
388
  # =========================
389
 
390
+ def gemini_detect_intent(transcript: str) -> Dict[str, Any]:
391
  """
392
+ Classifies if the conversation needs an Analyst action.
 
 
393
  """
394
+ if not _gemini_client: return {"actionable": False}
395
+
396
+ PROMPT = """
397
+ Analyze this transcript. Return STRICT JSON.
398
+ Is the user asking for shopping help (prices, basket, store advice, ZESA/Fuel)?
399
+
400
+ Output Schema:
401
+ {
402
+ "actionable": boolean,
403
+ "intent": "SHOPPING_BASKET" | "UTILITY_CALC" | "PRODUCT_SEARCH" | "CASUAL_CHAT",
404
+ "items": ["item1", "item2"] (if applicable),
405
+ "utility_amount": number (if applicable for ZESA/Fuel)
406
+ }
407
  """
408
 
409
  try:
410
  resp = _gemini_client.models.generate_content(
411
  model=GEMINI_MODEL,
412
+ contents=PROMPT + "\nTranscript: " + transcript,
413
  config=types.GenerateContentConfig(response_mime_type="application/json")
414
  )
415
+ return _safe_json_loads(resp.text, {"actionable": False, "intent": "CASUAL_CHAT"})
416
+ except Exception as e:
417
+ logger.error(f"Intent Detect Error: {e}")
418
+ return {"actionable": False, "intent": "CASUAL_CHAT"}
419
 
420
+ def gemini_chat_response(transcript: str, analyst_data: Dict) -> str:
421
+ if not _gemini_client: return "System offline."
422
 
423
+ PROMPT = f"""
424
+ You are Jessica, Pricelyst Analyst.
425
+ User asked: "{transcript}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426
 
427
+ DATA (Use this strictly):
428
+ {json.dumps(analyst_data, indent=2)}
429
 
430
+ If 'actionable' is true, summarize the Best Store and Total Cost.
431
+ If ZESA data is present, give the units estimate.
432
+ Keep it short, helpful, and Zimbabwean.
 
433
  """
434
 
435
+ try:
436
+ resp = _gemini_client.models.generate_content(
437
+ model=GEMINI_MODEL,
438
+ contents=PROMPT
439
+ )
440
+ return resp.text
441
+ except:
442
+ return "I have the data but couldn't summarize it."
 
 
443
 
444
  # =========================
445
  # 4. Endpoints
 
463
 
464
  if not pid: return jsonify({"ok": False}), 400
465
 
466
+ # 1. Detect Intent
467
+ intent_data = gemini_detect_intent(msg)
468
+ analyst_data = {}
469
 
470
+ # 2. Run Analyst (if actionable)
471
+ if intent_data.get("actionable"):
472
+ if intent_data["intent"] == "SHOPPING_BASKET" and intent_data.get("items"):
473
+ analyst_data = calculate_basket_optimization(intent_data["items"])
474
+ elif intent_data["intent"] == "UTILITY_CALC":
475
+ analyst_data = calculate_zesa_units(intent_data.get("utility_amount", 20))
476
+ elif intent_data["intent"] == "PRODUCT_SEARCH" and intent_data.get("items"):
477
+ # Reuse basket logic for single item search to get best store
478
+ analyst_data = calculate_basket_optimization(intent_data["items"])
479
+
480
+ # 3. Generate Reply
481
+ reply = gemini_chat_response(msg, analyst_data)
482
+
483
+ # Log
484
  if db:
485
  db.collection("pricelyst_profiles").document(pid).collection("chat_logs").add({
486
  "message": msg,
487
+ "response_text": reply,
488
+ "intent": intent_data,
489
  "ts": datetime.now(timezone.utc).isoformat()
490
  })
491
 
492
+ return jsonify({"ok": True, "data": {"message": reply, "analyst": analyst_data}})
493
 
494
  @app.post("/api/call-briefing")
495
  def call_briefing():
496
  """
497
+ Injects Memory + Top Products Catalogue for the Voice Agent.
 
498
  """
499
  body = request.get_json(silent=True) or {}
500
  pid = body.get("profile_id")
 
511
  else:
512
  ref.set({"created_at": datetime.now(timezone.utc).isoformat()})
513
 
514
+ if username and username != prof.get("username"):
515
+ if db: db.collection("pricelyst_profiles").document(pid).set({"username": username}, merge=True)
516
+
517
+ # --- Generate Mini-Catalogue (Top 60 popular items) ---
518
+ df = get_market_index()
519
+ top_products_str = ""
520
+ if not df.empty:
521
+ # Sort by views desc, take top 60 unique product names
522
+ top_offers = df[df['is_offer']].sort_values('views', ascending=False).drop_duplicates('product_name').head(60)
523
+ # Format: "Name ($AvgPrice)"
524
+ items_list = []
525
+ for _, r in top_offers.iterrows():
526
+ items_list.append(f"{r['product_name']} (~${r['price']:.2f})")
527
+ top_products_str = ", ".join(items_list)
528
+
529
+ # Payload for ElevenLabs (Data Variables Only)
530
  kpi_snapshot = {
531
+ "market_rates": ZIM_UTILITIES,
532
+ "popular_products_catalogue": top_products_str
 
533
  }
534
 
535
  return jsonify({
 
542
  def log_call_usage():
543
  """
544
  Post-Call Processor.
545
+ 1. Intent Check (Strict).
546
+ 2. Analyst Optimization.
547
+ 3. Plan Gen & Persistence.
548
  """
549
  body = request.get_json(silent=True) or {}
550
  pid = body.get("profile_id")
 
552
 
553
  if not pid: return jsonify({"ok": False}), 400
554
 
555
+ logger.info(f"Log Call: Processing {pid}. Transcript Len: {len(transcript)}")
556
+
557
+ # 1. Update Memory (Async-like)
558
  if len(transcript) > 20 and db:
559
  try:
560
+ curr_mem = db.collection("pricelyst_profiles").document(pid).get().to_dict().get("memory_summary", "")
561
+ mem_prompt = f"Update user memory (concise) with new details:\nOLD: {curr_mem}\nTRANSCRIPT: {transcript}"
562
+ mem_resp = _gemini_client.models.generate_content(model=GEMINI_MODEL, contents=mem_prompt)
563
+ db.collection("pricelyst_profiles").document(pid).set({"memory_summary": mem_resp.text}, merge=True)
 
 
 
 
 
 
564
  except Exception as e:
565
  logger.error(f"Memory Update Failed: {e}")
566
 
567
+ # 2. Intent Detection (The Gatekeeper)
568
+ intent_data = gemini_detect_intent(transcript)
569
+ logger.info(f"Log Call: Intent detected: {intent_data.get('intent')}")
570
+
571
  plan_data = {}
572
 
573
+ # 3. Actionable Logic
574
+ if intent_data.get("actionable"):
 
 
 
 
 
 
 
575
 
576
+ # Handle Shopping List
577
+ if intent_data.get("items"):
578
+ analyst_result = calculate_basket_optimization(intent_data["items"])
 
579
 
580
+ if analyst_result.get("actionable"):
581
+ best = analyst_result["best_store"]
582
+
583
+ # Markdown Generation
584
+ md = f"# Shopping Plan\n\n"
585
+ md += f"**Recommended Store:** {best['retailer']}\n"
586
+ md += f"**Estimated Total:** ${best['total_price']:.2f}\n\n"
587
+
588
+ md += "## Your Basket\n\n"
589
+ md += "| Item | Found? |\n|---|---|\n"
590
+ for it in analyst_result["basket_items"]:
591
+ status = "✅ In Stock" if it in best["found_items"] else "❌ Not Found"
592
+ md += f"| {it} | {status} |\n"
593
+
594
+ if analyst_result["global_missing"]:
595
+ md += "\n### Missing Items (Estimate Required)\n"
596
+ for m in analyst_result["global_missing"]:
597
+ md += f"- {m}\n"
598
+
599
+ plan_data = {
600
+ "is_actionable": True,
601
+ "title": f"Plan: {best['retailer']} (${best['total_price']:.2f})",
602
+ "markdown_content": md,
603
+ "items": intent_data["items"],
604
+ "created_at": datetime.now(timezone.utc).isoformat()
605
+ }
606
+
607
+ # Persist Plan
608
+ if db:
609
+ doc_ref = db.collection("pricelyst_profiles").document(pid).collection("shopping_plans").document()
610
+ plan_data["id"] = doc_ref.id
611
+ doc_ref.set(plan_data)
612
+ logger.info(f"Log Call: Plan Saved {doc_ref.id}")
613
+
614
+ # 4. Log Call
615
  if db:
616
  db.collection("pricelyst_profiles").document(pid).collection("call_logs").add({
617
  "transcript": transcript,
618
+ "intent_data": intent_data,
619
+ "plan_generated": bool(plan_data),
620
  "ts": datetime.now(timezone.utc).isoformat()
621
  })
622
 
 
625
  "shopping_plan": plan_data if plan_data.get("is_actionable") else None
626
  })
627
 
628
+ # ––––– CRUD: Shopping Plans –––––
629
 
630
  @app.get("/api/shopping-plans")
631
  def list_plans():
 
636
  .order_by("created_at", direction=firestore.Query.DESCENDING).limit(10).stream()
637
  plans = [{"id": d.id, **d.to_dict()} for d in docs]
638
  return jsonify({"ok": True, "plans": plans})
639
+ except:
640
+ return jsonify({"ok": False}), 500
641
 
642
  @app.delete("/api/shopping-plans/<plan_id>")
643
  def delete_plan(plan_id):