rairo committed on
Commit
7901ca2
·
verified ·
1 Parent(s): f7908f8

Update utility.py

Browse files
Files changed (1) hide show
  1. utility.py +39 -109
utility.py CHANGED
@@ -124,7 +124,7 @@ except Exception as e:
124
 
125
  # --- START: VISION PROCESSING FUNCTIONS ---
126
 
127
- def _transpile_vision_json_to_query(vision_json: List[Dict], caption: Optional[str]) -> str:
128
  """Converts the structured JSON list from the Vision AI into a natural language query."""
129
  if not vision_json:
130
  return "Error: Could not extract any transactions from the image."
@@ -156,10 +156,6 @@ def _transpile_vision_json_to_query(vision_json: List[Dict], caption: Optional[s
156
  query_parts.append(part)
157
 
158
  final_query = " and ".join(query_parts)
159
-
160
- # --- REMOVED --- The caption is now handled inside the vision prompt, not appended here.
161
- # if caption:
162
- # final_query += f" {caption}"
163
 
164
  return final_query.strip()
165
 
@@ -171,7 +167,6 @@ def _analyze_image_with_vision(image_bytes: bytes, caption: Optional[str]) -> Li
171
  try:
172
  image_pil = Image.open(io.BytesIO(image_bytes))
173
 
174
- # --- MODIFIED --- Added caption handling directly into the prompt.
175
  prompt = f"""
176
  You are a bookkeeping vision model. Analyze the image (receipt, invoice, handwritten note, *catalog/menu/price list*, product photo, shelf photo). Return ONLY a valid JSON array [] of transaction objects that our TEXT PIPELINE can consume directly.
177
 
@@ -315,8 +310,7 @@ def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str])
315
  return "Error: I couldn't find any actionable transactions in the image."
316
 
317
  logger.info(f"Vision AI analysis complete. Result: {vision_json_list}")
318
- # --- MODIFIED --- Caption is no longer passed here.
319
- return _transpile_vision_json_to_query(vision_json_list, None)
320
 
321
  # --- END: VISION PROCESSING FUNCTIONS ---
322
 
@@ -329,9 +323,7 @@ class ReportEngine:
329
  self.currency = self._get_user_currency()
330
 
331
  def _get_user_currency(self) -> str:
332
- """
333
- Determines the user's primary currency from their data.
334
- """
335
  for df_name in ['sales', 'expenses', 'assets', 'liabilities']:
336
  if df_name in self.dfs and 'currency' in self.dfs[df_name].columns:
337
  mode = self.dfs[df_name]['currency'].mode()
@@ -342,36 +334,31 @@ class ReportEngine:
342
  return "$"
343
 
344
  def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
345
- """
346
- REWRITTEN: A robust, ordered temporal filter to correctly interpret user requests.
347
- """
348
  if target_df is None or 'timestamp' not in target_df.columns or target_df.empty:
349
  return None
350
 
351
  # The order of these checks is critical: from most specific to most general.
352
 
353
- # 1. Check for "yesterday"
354
  if "yesterday" in self.query:
355
  yesterday = (self.now - timedelta(days=1)).date()
356
  start_of_yesterday = pd.Timestamp(yesterday, tz='UTC')
357
  end_of_yesterday = start_of_yesterday + timedelta(days=1)
358
  return (target_df['timestamp'] >= start_of_yesterday) & (target_df['timestamp'] < end_of_yesterday)
359
 
360
- # 2. Check for "today"
361
  if "today" in self.query:
362
  today = self.now.date()
363
  start_of_today = pd.Timestamp(today, tz='UTC')
364
  end_of_today = start_of_today + timedelta(days=1)
365
  return (target_df['timestamp'] >= start_of_today) & (target_df['timestamp'] < end_of_today)
366
 
367
- # 3. Check for "last month"
368
  if "last month" in self.query:
369
  first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
370
  last_day_last_month = first_day_current_month - timedelta(days=1)
371
  first_day_last_month = last_day_last_month.replace(day=1)
372
- return (target_df['timestamp'] >= pd.Timestamp(first_day_last_month, tz='UTC')) & (target_df['timestamp'] < pd.Timestamp(first_day_current_month, tz='UTC'))
 
373
 
374
- # 4. Check for a specific month name (e.g., "in july")
375
  month_match = re.search(r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\b", self.query)
376
  if month_match:
377
  month_name = month_match.group(1)
@@ -379,36 +366,30 @@ class ReportEngine:
379
  current_year = self.now.year
380
  return (target_df['timestamp'].dt.month == month_number) & (target_df['timestamp'].dt.year == current_year)
381
 
382
- # 5. Check for "this month" or a standalone "month"
383
  if "this month" in self.query or "month" in self.query:
384
  return (target_df['timestamp'].dt.month == self.now.month) & (target_df['timestamp'].dt.year == self.now.year)
385
 
386
- # 6. Check for "last week"
387
  if "last week" in self.query:
388
  start_of_this_week = self.now.date() - timedelta(days=self.now.weekday())
389
  start_of_last_week = start_of_this_week - timedelta(days=7)
390
  return (target_df['timestamp'].dt.date >= start_of_last_week) & (target_df['timestamp'].dt.date < start_of_this_week)
391
 
392
- # 7. Check for "this week" or a standalone "week"
393
  if "this week" in self.query or "week" in self.query:
394
  start_of_week = self.now.date() - timedelta(days=self.now.weekday())
395
  return target_df['timestamp'].dt.date >= start_of_week
396
 
397
- # 8. Check for a specific day of the week (e.g., "on monday")
398
  day_match = re.search(r"on (monday|tuesday|wednesday|thursday|friday|saturday|sunday)", self.query)
399
  if day_match and 'day_of_week' in target_df.columns:
400
  day_name = day_match.group(1).title()
401
  return target_df['day_of_week'] == day_name
402
 
403
- # 9. Check for "year"
404
  if "year" in self.query:
405
  return target_df['timestamp'].dt.year == self.now.year
406
 
407
- # Default: if no time filter is found, return all data
408
  return pd.Series(True, index=target_df.index)
409
 
410
  def generate_report(self) -> str:
411
- """RESTORED: Generates the original, simple Sales or Expenses report."""
412
  subject = "sales"
413
  if "expense" in self.query:
414
  subject = "expenses"
@@ -452,7 +433,7 @@ class ReportEngine:
452
  return json.dumps(self.results, indent=2)
453
 
454
  def generate_profit_report(self) -> str:
455
- """NEW: Generates a comprehensive profitability report."""
456
  sales_df = self.dfs.get('sales', pd.DataFrame())
457
  expenses_df = self.dfs.get('expenses', pd.DataFrame())
458
 
@@ -494,7 +475,7 @@ class ReportEngine:
494
  return json.dumps(self.results, indent=2)
495
 
496
  def generate_item_report(self, subject_item: str) -> str:
497
- """NEW: Generates a performance report for a specific item."""
498
  sales_df = self.dfs.get('sales', pd.DataFrame())
499
  if sales_df.empty: return json.dumps({"error": f"No sales data found for '{subject_item}'."})
500
 
@@ -521,7 +502,7 @@ class ReportEngine:
521
  return json.dumps(self.results, indent=2)
522
 
523
  def generate_day_of_week_report(self) -> str:
524
- """NEW: Generates a report analyzing sales by day of the week."""
525
  sales_df = self.dfs.get('sales', pd.DataFrame())
526
  if sales_df.empty or 'day_of_week' not in sales_df.columns: return json.dumps({"error": "No data available to analyze by day."})
527
  time_filter = self._get_time_filter(sales_df)
@@ -571,11 +552,8 @@ class ReportEngine:
571
  return json.dumps(self.results, indent=2)
572
 
573
  def generate_business_snapshot(self) -> Dict[str, Any]:
574
- """
575
- NEW: Creates a high-level summary of the entire business for contextual AI coaching.
576
- """
577
  snapshot = {}
578
- # Financial KPIs
579
  sales_df = self.dfs.get('sales', pd.DataFrame())
580
  expenses_df = self.dfs.get('expenses', pd.DataFrame())
581
  total_revenue = sales_df['sale_total'].sum() if not sales_df.empty else 0
@@ -588,7 +566,6 @@ class ReportEngine:
588
  "Net Profit": f"{self.currency}{net_profit:.2f}"
589
  }
590
 
591
- # Inventory Overview
592
  inventory_df = self.dfs.get('inventory', pd.DataFrame())
593
  if not inventory_df.empty and 'item' in inventory_df.columns and 'quantity' in inventory_df.columns:
594
  snapshot['inventory_overview'] = "\n".join(
@@ -597,7 +574,6 @@ class ReportEngine:
597
  else:
598
  snapshot['inventory_overview'] = "No inventory items recorded."
599
 
600
- # Asset Register
601
  assets_df = self.dfs.get('assets', pd.DataFrame())
602
  if not assets_df.empty and 'name' in assets_df.columns and 'value' in assets_df.columns:
603
  snapshot['asset_register'] = "\n".join(
@@ -606,7 +582,6 @@ class ReportEngine:
606
  else:
607
  snapshot['asset_register'] = "No assets recorded."
608
 
609
- # Liabilities Ledger
610
  liabilities_df = self.dfs.get('liabilities', pd.DataFrame())
611
  if not liabilities_df.empty and 'creditor' in liabilities_df.columns and 'amount' in liabilities_df.columns:
612
  snapshot['liabilities_ledger'] = "\n".join(
@@ -725,9 +700,7 @@ def add_timestamp(transaction: Dict) -> Dict:
725
  return transaction
726
 
727
  def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
728
- """
729
- Finds the canonical version of an item using an "exact match first" hybrid approach.
730
- """
731
  inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
732
  name_lower = item_name.lower().strip()
733
  all_item_docs = list(inventory_ref.stream())
@@ -789,9 +762,7 @@ def create_or_update_inventory_or_service_offering(user_phone: str, transaction_
789
  return False, f"An error occurred during inventory update: {e}"
790
 
791
  def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
792
- """
793
- Process sales with fuzzy name matching, user price override, and on-the-fly service creation.
794
- """
795
  feedback_messages = []
796
  any_success = False
797
  for t in transaction_data:
@@ -1012,21 +983,14 @@ def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFra
1012
  return all_dfs_with_names
1013
 
1014
  def _get_relative_date_context() -> str:
1015
- """
1016
- Generates a string of pre-calculated dates to inject into the PandasAI prompt
1017
- for improved temporal awareness.
1018
- """
1019
  today = datetime.now(timezone.utc)
1020
 
1021
- def fmt(d):
1022
- return d.strftime('%Y-%m-%d')
1023
 
1024
  yesterday = today - timedelta(days=1)
1025
  start_of_this_week = today - timedelta(days=today.weekday())
1026
- end_of_this_week = start_of_this_week + timedelta(days=6)
1027
  start_of_last_week = start_of_this_week - timedelta(days=7)
1028
- end_of_last_week = start_of_last_week + timedelta(days=6)
1029
-
1030
  last_monday = start_of_this_week - timedelta(days=7)
1031
 
1032
  context = [
@@ -1041,9 +1005,7 @@ def _get_relative_date_context() -> str:
1041
  return "\n".join(context)
1042
 
1043
  def read_datalake(user_phone: str, query: str) -> str:
1044
- """
1045
- Implements the final Unified Strategy for robust, intelligent data analysis.
1046
- """
1047
  def _to_text(resp) -> str:
1048
  try:
1049
  if resp is None: return ""
@@ -1065,13 +1027,11 @@ def read_datalake(user_phone: str, query: str) -> str:
1065
 
1066
  # --- REFACTORED ROUTING LOGIC ---
1067
 
1068
- # --- Tier 0: Simple Direct Lookups (Unchanged) ---
1069
  simple_lookup_map = {
1070
  "inventory": ["stock", "inventory", "in stock", "what do i have"],
1071
  "assets": ["asset", "assets", "my assets"],
1072
  "liabilities": ["liabilities", "i owe", "creditor", "my debts"],
1073
- "sales": ["show my sales", "list sales"],
1074
- "expenses": ["show my expenses", "list expenses"]
1075
  }
1076
  for df_name, keywords in simple_lookup_map.items():
1077
  if any(keyword in query_lower for keyword in keywords):
@@ -1081,7 +1041,7 @@ def read_datalake(user_phone: str, query: str) -> str:
1081
  return render_df_as_image(target_df_tuple[1])
1082
  return f"You don't have any {df_name} recorded yet."
1083
 
1084
- # --- Tier 1: Specific, Pre-canned Reports ---
1085
  item_report_match = re.search(r"(?:sales report for|report on|performance of)\s+([\w\s]+?)(?:\s+(?:this|last|on|in|for|today|yesterday)|$)", query_lower)
1086
  report_json = None
1087
 
@@ -1096,15 +1056,20 @@ def read_datalake(user_phone: str, query: str) -> str:
1096
  elif any(k in query_lower for k in ["best day", "busiest day", "sales by day"]):
1097
  logger.info(f"Handling '{query}' with the Day of Week Report Path.")
1098
  report_json = engine.generate_day_of_week_report()
 
 
 
 
1099
 
1100
  if report_json:
1101
  report_data = json.loads(report_json)
1102
  if "error" in report_data: return report_data["error"]
1103
  synthesis_prompt = f"""
1104
  Directly synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. Present only the data-driven report, formatted for WhatsApp (*bold*, _italic_, emojis).
1105
-
 
1106
  **IMPORTANT INSTRUCTIONS:**
1107
- - If `report_subject` is "Profitability", present a clear financial summary: start with Revenue, subtract COGS for Gross Profit, then subtract Expenses for Net Profit. Also mention other KPIs.
1108
  - If `report_subject` is "Item Report", state the item name and present its performance KPIs.
1109
  - If `report_subject` is "Day of Week Analysis", state the best day and list daily sales.
1110
 
@@ -1114,29 +1079,8 @@ def read_datalake(user_phone: str, query: str) -> str:
1114
  response = llm.invoke(synthesis_prompt)
1115
  return _to_text(response)
1116
 
1117
- # --- Tier 1.5: General Temporal Reports ---
1118
- subjects = ["sales", "expenses"]
1119
- # --- MODIFIED --- Expanded temporals list for better routing
1120
- temporals = [
1121
- "today", "yesterday", "week", "month", "year", "monday", "tuesday",
1122
- "wednesday", "thursday", "friday", "saturday", "sunday", "january",
1123
- "february", "march", "april", "may", "june", "july", "august",
1124
- "september", "october", "november", "december"
1125
- ]
1126
- if any(sub in query_lower for sub in subjects) and any(temp in query_lower for temp in temporals):
1127
- logger.info(f"Handling '{query}' with the General Temporal Report Path.")
1128
- report_json = engine.generate_report()
1129
- report_data = json.loads(report_json)
1130
- if "error" in report_data: return report_data["error"]
1131
- synthesis_prompt = f"""Synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. For sales reports, you MUST provide a creative and actionable "Insight" section at the end based on the best/worst selling items. Present only the data-driven report and the insight, formatted for WhatsApp (*bold*, _italic_, emojis).
1132
- Data: {report_json}"""
1133
- response = llm.invoke(synthesis_prompt)
1134
- return _to_text(response)
1135
-
1136
- # --- Tier 2: Predictive & Generic Summary Fallback ---
1137
  predictive_keywords = ["expect", "forecast", "predict"]
1138
- historical_report_keywords = ["sales report", "expense report", "performance summary", "how did i do", "overview"]
1139
-
1140
  if any(keyword in query_lower for keyword in predictive_keywords):
1141
  logger.info(f"Handling '{query}' with the Forecasting Path.")
1142
  forecast_json = engine.generate_forecast_data()
@@ -1145,16 +1089,6 @@ def read_datalake(user_phone: str, query: str) -> str:
1145
  synthesis_prompt = f"Synthesize a sales forecast from the following JSON data. Omit conversational introductions or summaries. Present only the forecast. Data: {forecast_json}"
1146
  response = llm.invoke(synthesis_prompt)
1147
  return _to_text(response)
1148
-
1149
- elif any(keyword in query_lower for keyword in historical_report_keywords):
1150
- logger.info(f"Handling '{query}' with the General Reporting Path (Sales/Expense).")
1151
- report_json = engine.generate_report()
1152
- report_data = json.loads(report_json)
1153
- if "error" in report_data: return report_data["error"]
1154
- synthesis_prompt = f"""Synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. For sales reports, you MUST provide a creative and actionable "Insight" section at the end based on the best/worst selling items. Present only the data-driven report and the insight, formatted for WhatsApp (*bold*, _italic_, emojis).
1155
- Data: {report_json}"""
1156
- response = llm.invoke(synthesis_prompt)
1157
- return _to_text(response)
1158
 
1159
  # --- Tier 3: Business Coach & Help Layer ---
1160
  help_keywords = ['help', 'tutorial', 'guide', 'how do you work', 'what can you do', 'how can', 'how would']
@@ -1165,16 +1099,15 @@ def read_datalake(user_phone: str, query: str) -> str:
1165
  snapshot_str = json.dumps(snapshot, indent=2)
1166
 
1167
  synthesis_prompt = f"""
1168
- You are Qx, a friendly and insightful business coach and financial expert. The user is asking a general question. Only perform any calculations when necessary. Your task is to provide a clear, helpful, and strategic answer based on their question, using your general knowledge and the business snapshot provided below for context.
1169
 
1170
  **IMPORTANT RULES:**
1171
- 1. **Use the Context:** Use the Business Snapshot as your internal knowledge to make your advice relevant and personalized. For example, if inventory is high for an item, you might suggest a promotion. If profit is low, you might suggest cost-cutting measures.
1172
- 2. **You can state the numbers, info or metrics from the data where helpful. Synthesize them into your advice. generate insight from the data if needed.
1173
- 3. **Stay in Character:** Act as a coach. Be encouraging and provide actionable advice.
1174
- 4. **Handle 'Help' Queries:** If asked about your capabilities, explain that you can record transactions (sales, expenses, etc.) via text or images, generate detailed reports (profit, sales by item), answer questions about their data, and provide business advice.
1175
- 5. **Format for WhatsApp:** Use *bold*, _italic_, and emojis to make your response clear and engaging.
1176
 
1177
- **BUSINESS SNAPSHOT (INTERNAL CONTEXT ONLY):**
1178
  {snapshot_str}
1179
 
1180
  **User's Question:**
@@ -1231,16 +1164,15 @@ def read_datalake(user_phone: str, query: str) -> str:
1231
  snapshot_str = json.dumps(snapshot, indent=2)
1232
 
1233
  synthesis_prompt = f"""
1234
- You are Qx, a friendly and insightful business coach and financial expert. The user is asking a general question. Only perform calculations when necessary. Your task is to provide a clear, helpful, and strategic answer based on their question, using your general knowledge and the business snapshot provided below for context.
1235
 
1236
  **IMPORTANT RULES:**
1237
- 1. **Use the Context:** Use the Business Snapshot as your internal knowledge to make your advice relevant and personalized. For example, if inventory is high for an item, you might suggest a promotion. If profit is low, you might suggest cost-cutting measures.
1238
- 2. **You can state the numbers, info or metrics from the data where helpful. Synthesize them into your advice. generate insight from data if needed.
1239
- 3. **Stay in Character:** Act as a coach. Be encouraging and provide actionable advice.
1240
- 4. **Handle 'Help' Queries:** If asked about your capabilities, explain that you can record transactions (sales, expenses, etc.) via text or images, generate detailed reports (profit, sales by item), answer questions about their data, and provide business advice.
1241
- 5. **Format for WhatsApp:** Use *bold*, _italic_, and emojis to make your response clear and engaging.
1242
 
1243
- **BUSINESS SNAPSHOT (INTERNAL CONTEXT ONLY):**
1244
  {snapshot_str}
1245
 
1246
  **User's Question:**
@@ -1396,9 +1328,7 @@ def fetch_transaction(user_phone: str, identifier: str, collection: str = "inven
1396
  return None
1397
 
1398
  def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
1399
- """
1400
- Groups transactions by intent and type, processes each group, and returns a consolidated feedback report.
1401
- """
1402
  if not parsed_trans_data:
1403
  return "I couldn't understand the transaction details. Could you please try again?"
1404
  grouped_transactions = {}
 
124
 
125
  # --- START: VISION PROCESSING FUNCTIONS ---
126
 
127
+ def _transpile_vision_json_to_query(vision_json: List[Dict]) -> str:
128
  """Converts the structured JSON list from the Vision AI into a natural language query."""
129
  if not vision_json:
130
  return "Error: Could not extract any transactions from the image."
 
156
  query_parts.append(part)
157
 
158
  final_query = " and ".join(query_parts)
 
 
 
 
159
 
160
  return final_query.strip()
161
 
 
167
  try:
168
  image_pil = Image.open(io.BytesIO(image_bytes))
169
 
 
170
  prompt = f"""
171
  You are a bookkeeping vision model. Analyze the image (receipt, invoice, handwritten note, *catalog/menu/price list*, product photo, shelf photo). Return ONLY a valid JSON array [] of transaction objects that our TEXT PIPELINE can consume directly.
172
 
 
310
  return "Error: I couldn't find any actionable transactions in the image."
311
 
312
  logger.info(f"Vision AI analysis complete. Result: {vision_json_list}")
313
+ return _transpile_vision_json_to_query(vision_json_list)
 
314
 
315
  # --- END: VISION PROCESSING FUNCTIONS ---
316
 
 
323
  self.currency = self._get_user_currency()
324
 
325
  def _get_user_currency(self) -> str:
326
+ """Determines the user's primary currency from their data."""
 
 
327
  for df_name in ['sales', 'expenses', 'assets', 'liabilities']:
328
  if df_name in self.dfs and 'currency' in self.dfs[df_name].columns:
329
  mode = self.dfs[df_name]['currency'].mode()
 
334
  return "$"
335
 
336
  def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
337
+ """A robust, ordered temporal filter to correctly interpret user requests."""
 
 
338
  if target_df is None or 'timestamp' not in target_df.columns or target_df.empty:
339
  return None
340
 
341
  # The order of these checks is critical: from most specific to most general.
342
 
 
343
  if "yesterday" in self.query:
344
  yesterday = (self.now - timedelta(days=1)).date()
345
  start_of_yesterday = pd.Timestamp(yesterday, tz='UTC')
346
  end_of_yesterday = start_of_yesterday + timedelta(days=1)
347
  return (target_df['timestamp'] >= start_of_yesterday) & (target_df['timestamp'] < end_of_yesterday)
348
 
 
349
  if "today" in self.query:
350
  today = self.now.date()
351
  start_of_today = pd.Timestamp(today, tz='UTC')
352
  end_of_today = start_of_today + timedelta(days=1)
353
  return (target_df['timestamp'] >= start_of_today) & (target_df['timestamp'] < end_of_today)
354
 
 
355
  if "last month" in self.query:
356
  first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
357
  last_day_last_month = first_day_current_month - timedelta(days=1)
358
  first_day_last_month = last_day_last_month.replace(day=1)
359
+ # --- FIX --- Removed redundant tz parameter to prevent ValueError
360
+ return (target_df['timestamp'] >= pd.Timestamp(first_day_last_month)) & (target_df['timestamp'] < pd.Timestamp(first_day_current_month))
361
 
 
362
  month_match = re.search(r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\b", self.query)
363
  if month_match:
364
  month_name = month_match.group(1)
 
366
  current_year = self.now.year
367
  return (target_df['timestamp'].dt.month == month_number) & (target_df['timestamp'].dt.year == current_year)
368
 
 
369
  if "this month" in self.query or "month" in self.query:
370
  return (target_df['timestamp'].dt.month == self.now.month) & (target_df['timestamp'].dt.year == self.now.year)
371
 
 
372
  if "last week" in self.query:
373
  start_of_this_week = self.now.date() - timedelta(days=self.now.weekday())
374
  start_of_last_week = start_of_this_week - timedelta(days=7)
375
  return (target_df['timestamp'].dt.date >= start_of_last_week) & (target_df['timestamp'].dt.date < start_of_this_week)
376
 
 
377
  if "this week" in self.query or "week" in self.query:
378
  start_of_week = self.now.date() - timedelta(days=self.now.weekday())
379
  return target_df['timestamp'].dt.date >= start_of_week
380
 
 
381
  day_match = re.search(r"on (monday|tuesday|wednesday|thursday|friday|saturday|sunday)", self.query)
382
  if day_match and 'day_of_week' in target_df.columns:
383
  day_name = day_match.group(1).title()
384
  return target_df['day_of_week'] == day_name
385
 
 
386
  if "year" in self.query:
387
  return target_df['timestamp'].dt.year == self.now.year
388
 
 
389
  return pd.Series(True, index=target_df.index)
390
 
391
  def generate_report(self) -> str:
392
+ """Generates a simple Sales or Expenses report."""
393
  subject = "sales"
394
  if "expense" in self.query:
395
  subject = "expenses"
 
433
  return json.dumps(self.results, indent=2)
434
 
435
  def generate_profit_report(self) -> str:
436
+ """Generates a comprehensive profitability report."""
437
  sales_df = self.dfs.get('sales', pd.DataFrame())
438
  expenses_df = self.dfs.get('expenses', pd.DataFrame())
439
 
 
475
  return json.dumps(self.results, indent=2)
476
 
477
  def generate_item_report(self, subject_item: str) -> str:
478
+ """Generates a performance report for a specific item."""
479
  sales_df = self.dfs.get('sales', pd.DataFrame())
480
  if sales_df.empty: return json.dumps({"error": f"No sales data found for '{subject_item}'."})
481
 
 
502
  return json.dumps(self.results, indent=2)
503
 
504
  def generate_day_of_week_report(self) -> str:
505
+ """Generates a report analyzing sales by day of the week."""
506
  sales_df = self.dfs.get('sales', pd.DataFrame())
507
  if sales_df.empty or 'day_of_week' not in sales_df.columns: return json.dumps({"error": "No data available to analyze by day."})
508
  time_filter = self._get_time_filter(sales_df)
 
552
  return json.dumps(self.results, indent=2)
553
 
554
  def generate_business_snapshot(self) -> Dict[str, Any]:
555
+ """Creates a high-level summary of the entire business for contextual AI coaching."""
 
 
556
  snapshot = {}
 
557
  sales_df = self.dfs.get('sales', pd.DataFrame())
558
  expenses_df = self.dfs.get('expenses', pd.DataFrame())
559
  total_revenue = sales_df['sale_total'].sum() if not sales_df.empty else 0
 
566
  "Net Profit": f"{self.currency}{net_profit:.2f}"
567
  }
568
 
 
569
  inventory_df = self.dfs.get('inventory', pd.DataFrame())
570
  if not inventory_df.empty and 'item' in inventory_df.columns and 'quantity' in inventory_df.columns:
571
  snapshot['inventory_overview'] = "\n".join(
 
574
  else:
575
  snapshot['inventory_overview'] = "No inventory items recorded."
576
 
 
577
  assets_df = self.dfs.get('assets', pd.DataFrame())
578
  if not assets_df.empty and 'name' in assets_df.columns and 'value' in assets_df.columns:
579
  snapshot['asset_register'] = "\n".join(
 
582
  else:
583
  snapshot['asset_register'] = "No assets recorded."
584
 
 
585
  liabilities_df = self.dfs.get('liabilities', pd.DataFrame())
586
  if not liabilities_df.empty and 'creditor' in liabilities_df.columns and 'amount' in liabilities_df.columns:
587
  snapshot['liabilities_ledger'] = "\n".join(
 
700
  return transaction
701
 
702
  def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
703
+ """Finds the canonical version of an item using an "exact match first" hybrid approach."""
 
 
704
  inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
705
  name_lower = item_name.lower().strip()
706
  all_item_docs = list(inventory_ref.stream())
 
762
  return False, f"An error occurred during inventory update: {e}"
763
 
764
  def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
765
+ """Process sales with fuzzy name matching, user price override, and on-the-fly service creation."""
 
 
766
  feedback_messages = []
767
  any_success = False
768
  for t in transaction_data:
 
983
  return all_dfs_with_names
984
 
985
  def _get_relative_date_context() -> str:
986
+ """Generates a string of pre-calculated dates for improved temporal awareness."""
 
 
 
987
  today = datetime.now(timezone.utc)
988
 
989
+ def fmt(d): return d.strftime('%Y-%m-%d')
 
990
 
991
  yesterday = today - timedelta(days=1)
992
  start_of_this_week = today - timedelta(days=today.weekday())
 
993
  start_of_last_week = start_of_this_week - timedelta(days=7)
 
 
994
  last_monday = start_of_this_week - timedelta(days=7)
995
 
996
  context = [
 
1005
  return "\n".join(context)
1006
 
1007
  def read_datalake(user_phone: str, query: str) -> str:
1008
+ """Implements the final Unified Strategy for robust, intelligent data analysis."""
 
 
1009
  def _to_text(resp) -> str:
1010
  try:
1011
  if resp is None: return ""
 
1027
 
1028
  # --- REFACTORED ROUTING LOGIC ---
1029
 
1030
+ # --- Tier 0: Simple Direct Lookups ---
1031
  simple_lookup_map = {
1032
  "inventory": ["stock", "inventory", "in stock", "what do i have"],
1033
  "assets": ["asset", "assets", "my assets"],
1034
  "liabilities": ["liabilities", "i owe", "creditor", "my debts"],
 
 
1035
  }
1036
  for df_name, keywords in simple_lookup_map.items():
1037
  if any(keyword in query_lower for keyword in keywords):
 
1041
  return render_df_as_image(target_df_tuple[1])
1042
  return f"You don't have any {df_name} recorded yet."
1043
 
1044
+ # --- Tier 1: Canned & Temporal Reports (NEW UNIFIED LOGIC) ---
1045
  item_report_match = re.search(r"(?:sales report for|report on|performance of)\s+([\w\s]+?)(?:\s+(?:this|last|on|in|for|today|yesterday)|$)", query_lower)
1046
  report_json = None
1047
 
 
1056
  elif any(k in query_lower for k in ["best day", "busiest day", "sales by day"]):
1057
  logger.info(f"Handling '{query}' with the Day of Week Report Path.")
1058
  report_json = engine.generate_day_of_week_report()
1059
+ # --- FIX --- This new, simple route handles all sales/expense queries directly.
1060
+ elif "sales" in query_lower or "expense" in query_lower:
1061
+ logger.info(f"Handling '{query}' with the General Sales/Expense Report Path.")
1062
+ report_json = engine.generate_report()
1063
 
1064
  if report_json:
1065
  report_data = json.loads(report_json)
1066
  if "error" in report_data: return report_data["error"]
1067
  synthesis_prompt = f"""
1068
  Directly synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. Present only the data-driven report, formatted for WhatsApp (*bold*, _italic_, emojis).
1069
+ For sales reports, if helpful, provide a creative and actionable "Insight" section at the end based on the best/worst selling items.
1070
+
1071
  **IMPORTANT INSTRUCTIONS:**
1072
+ - If `report_subject` is "Profitability", present a clear financial summary.
1073
  - If `report_subject` is "Item Report", state the item name and present its performance KPIs.
1074
  - If `report_subject` is "Day of Week Analysis", state the best day and list daily sales.
1075
 
 
1079
  response = llm.invoke(synthesis_prompt)
1080
  return _to_text(response)
1081
 
1082
+ # --- Tier 2: Predictive Queries ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1083
  predictive_keywords = ["expect", "forecast", "predict"]
 
 
1084
  if any(keyword in query_lower for keyword in predictive_keywords):
1085
  logger.info(f"Handling '{query}' with the Forecasting Path.")
1086
  forecast_json = engine.generate_forecast_data()
 
1089
  synthesis_prompt = f"Synthesize a sales forecast from the following JSON data. Omit conversational introductions or summaries. Present only the forecast. Data: {forecast_json}"
1090
  response = llm.invoke(synthesis_prompt)
1091
  return _to_text(response)
 
 
 
 
 
 
 
 
 
 
1092
 
1093
  # --- Tier 3: Business Coach & Help Layer ---
1094
  help_keywords = ['help', 'tutorial', 'guide', 'how do you work', 'what can you do', 'how can', 'how would']
 
1099
  snapshot_str = json.dumps(snapshot, indent=2)
1100
 
1101
  synthesis_prompt = f"""
1102
+ You are Qx, a friendly and insightful business coach and financial expert. Your task is to provide a clear, helpful, and strategic answer based on the user's question, using your general business knowledge combined with the business snapshot provided below for context.
1103
 
1104
  **IMPORTANT RULES:**
1105
+ 1. **Synthesize, Don't Just Report:** Use the Business Snapshot to make your advice relevant and personalized. For example, if inventory is high for an item, you might suggest a promotion. If profit is low, you might suggest cost-cutting measures.
1106
+ 2. **Act as a Coach:** Be encouraging and provide actionable advice.
1107
+ 3. **Handle 'Help' Queries:** If asked about your capabilities, explain that you can record transactions (sales, expenses, etc.) via text or images, generate detailed reports (profit, sales by item), answer questions about their data, and provide business advice.
1108
+ 4. **Format for WhatsApp:** Use *bold*, _italic_, and emojis to make your response clear and engaging.
 
1109
 
1110
+ **Business Snapshot for Context:**
1111
  {snapshot_str}
1112
 
1113
  **User's Question:**
 
1164
  snapshot_str = json.dumps(snapshot, indent=2)
1165
 
1166
  synthesis_prompt = f"""
1167
+ You are Qx, a friendly and insightful business coach and financial expert. Your task is to provide a clear, helpful, and strategic answer based on the user's question, using your general business knowledge combined with the business snapshot provided below for context.
1168
 
1169
  **IMPORTANT RULES:**
1170
+ 1. **Synthesize, Don't Just Report:** Use the Business Snapshot to make your advice relevant and personalized. For example, if inventory is high for an item, you might suggest a promotion. If profit is low, you might suggest cost-cutting measures.
1171
+ 2. **Act as a Coach:** Be encouraging and provide actionable advice.
1172
+ 3. **Handle 'Help' Queries:** If asked about your capabilities, explain that you can record transactions (sales, expenses, etc.) via text or images, generate detailed reports (profit, sales by item), answer questions about their data, and provide business advice.
1173
+ 4. **Format for WhatsApp:** Use *bold*, _italic_, and emojis to make your response clear and engaging.
 
1174
 
1175
+ **Business Snapshot for Context:**
1176
  {snapshot_str}
1177
 
1178
  **User's Question:**
 
1328
  return None
1329
 
1330
  def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
1331
+ """Groups transactions by intent and type, processes each group, and returns a consolidated feedback report."""
 
 
1332
  if not parsed_trans_data:
1333
  return "I couldn't understand the transaction details. Could you please try again?"
1334
  grouped_transactions = {}