rairo commited on
Commit
0bffbcb
·
verified ·
1 Parent(s): f357ad5

Update utility.py

Browse files
Files changed (1) hide show
  1. utility.py +35 -88
utility.py CHANGED
@@ -6,10 +6,11 @@ from datetime import datetime, timezone
6
  from typing import List, Dict, Union, Optional, Any
7
  from google.cloud import firestore
8
  import pandas as pd
9
- import inflect # <-- ADDED for robust pluralization
 
10
  from pandasai import SmartDatalake
11
  from pandasai.responses.response_parser import ResponseParser
12
- from pandasai.exceptions import NoCodeFoundError # <-- ADDED for specific error handling
13
  from langchain_google_genai import ChatGoogleGenerativeAI
14
  import google.generativeai as genai
15
  import re
@@ -128,51 +129,6 @@ Each transaction object MUST have the following keys:
128
  - **Rule for Queries:** For "read" intents or general questions, set `transaction_type` to "query" and the `details` object MUST contain a single key `"query"` with the user's full, original question as the value.
129
  - **Rule for Multiple Items:** If the user's request contains multiple distinct transactions (e.g., recording an expense AND an asset), create a separate JSON object for each one within the main list.
130
  - **Rule for Expense Normalization:** For "create" intents with `transaction_type` "expense", analyze the `description`. If it contains common keywords, normalize it to a single word. For example, if the description is "paid for fuel for the delivery truck", the normalized `description` in the JSON should be "fuel". If it's "office electricity bill", normalize it to "electricity".
131
-
132
- **5. Examples:**
133
-
134
- **Example 1: Simple Query**
135
- - **Input:** "what are my assets?"
136
- - **Output:**
137
- [
138
- {
139
- "intent": "read",
140
- "transaction_type": "query",
141
- "details": {
142
- "query": "what are my assets?"
143
- }
144
- }
145
- ]
146
-
147
- **Example 2: Creating a Normalized Expense**
148
- - **Input:** "I paid R250 for fuel for work"
149
- - **Output:**
150
- [
151
- {
152
- "intent": "create",
153
- "transaction_type": "expense",
154
- "details": {
155
- "description": "fuel",
156
- "amount": 250,
157
- "currency": "R"
158
- }
159
- }
160
- ]
161
-
162
- **Example 3: Creating an Asset**
163
- - **Input:** "just bought a new company laptop for $1500"
164
- - **Output:**
165
- [
166
- {
167
- "intent": "create",
168
- "transaction_type": "asset",
169
- "details": {
170
- "name": "new company laptop",
171
- "value": 1500,
172
- "currency": "$"
173
- }
174
- }
175
- ]
176
  """
177
  try:
178
  full_prompt = [system_prompt, prompt]
@@ -213,39 +169,36 @@ def add_timestamp(transaction: Dict) -> Dict:
213
 
214
  def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
215
  """
216
- Finds the canonical version of an item, handling plurals robustly.
 
217
  """
 
218
  inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
219
  name_lower = item_name.lower().strip()
220
 
221
- # --- CHANGE 3: Use inflect for robust singularization ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  singular = p.singular_noun(name_lower)
223
- if not singular: # If inflect returns False (e.g., for already singular words)
224
  singular = name_lower
225
 
226
- plural = p.plural(singular)
227
- # --- END OF CHANGE 3 ---
228
-
229
- doc_singular = inventory_ref.document(singular).get()
230
- doc_plural = inventory_ref.document(plural).get()
231
-
232
- exists_singular = doc_singular.exists
233
- exists_plural = doc_plural.exists
234
-
235
- if exists_singular and exists_plural:
236
- data_s = doc_singular.to_dict()
237
- data_p = doc_plural.to_dict()
238
- time_s = data_s.get('last_updated', '')
239
- time_p = data_p.get('last_updated', '')
240
- if time_p > time_s:
241
- return {'doc': doc_plural, 'name': plural}
242
- return {'doc': doc_singular, 'name': singular}
243
- elif exists_singular:
244
- return {'doc': doc_singular, 'name': singular}
245
- elif exists_plural:
246
- return {'doc': doc_plural, 'name': plural}
247
- else:
248
- return {'doc': None, 'name': singular}
249
 
250
 
251
  def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
@@ -300,7 +253,7 @@ def create_or_update_inventory_or_service_offering(user_phone: str, transaction_
300
 
301
  def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
302
  """
303
- Process sales with user price override, name normalization, and service bypass.
304
  """
305
  feedback_messages = []
306
  any_success = False
@@ -332,7 +285,6 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
332
 
333
  @firestore.transactional
334
  def process_one_sale(transaction, sale_details):
335
- # --- CHANGE 2: Implement Price Override Logic ---
336
  user_price = sale_details.get('price') or sale_details.get('unit_price')
337
 
338
  if user_price is not None:
@@ -343,7 +295,6 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
343
  logger.info(f"Using last known price for '{canonical_name}': {selling_price}")
344
  else:
345
  return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
346
- # --- END OF CHANGE 2 ---
347
 
348
  if not isinstance(selling_price, (int, float)): selling_price = 0
349
 
@@ -518,10 +469,12 @@ def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
518
  if df.empty:
519
  return df
520
 
521
- # 1. Validate and convert timestamp columns
 
522
  for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
523
  if col in df.columns:
524
- df[col] = pd.to_datetime(df[col], errors='coerce')
 
525
 
526
  # 2. Validate and convert numeric columns
527
  numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
@@ -534,13 +487,13 @@ def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
534
  df[col] = df[col].fillna('Unknown')
535
 
536
  return df
 
537
 
538
 
539
  def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
540
  """
541
  Fetches all user data, splits inventory/services, validates, and returns DataFrames.
542
  """
543
- # --- CHANGE 4 & 5: Split inventory/services and validate DataFrames ---
544
  collections = ['sales', 'expenses', 'assets', 'liabilities']
545
  all_dfs = []
546
 
@@ -560,12 +513,12 @@ def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
560
 
561
  if inventory_data:
562
  inventory_df = pd.DataFrame(inventory_data)
563
- inventory_df.name = "inventory" # Name the dataframe for PandasAI
564
  all_dfs.append(_validate_dataframe(inventory_df))
565
 
566
  if services_data:
567
  services_df = pd.DataFrame(services_data)
568
- services_df.name = "services" # Name the dataframe for PandasAI
569
  all_dfs.append(_validate_dataframe(services_df))
570
 
571
  # Handle other collections
@@ -583,18 +536,16 @@ def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
583
 
584
  if data:
585
  df = pd.DataFrame(data)
586
- df.name = coll_name # Name the dataframe for PandasAI
587
  all_dfs.append(_validate_dataframe(df))
588
 
589
  return all_dfs
590
- # --- END OF CHANGE 4 & 5 ---
591
 
592
 
593
  def read_datalake(user_phone: str, query: str) -> str:
594
  """
595
  Handles queries with temporal awareness, robust error handling, and recall logic.
596
  """
597
- # --- CHANGE 1 & 6: Temporal Awareness and Advanced Error Handling ---
598
  try:
599
  all_dfs = _fetch_all_collections_as_dfs(user_phone)
600
  if not all_dfs:
@@ -605,7 +556,6 @@ def read_datalake(user_phone: str, query: str) -> str:
605
  "save_charts_path": user_defined_path, "enable_cache": False,
606
  })
607
 
608
- # 1. Add temporal context to the query
609
  today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
610
  contextual_query = (
611
  f"For context, today's date is {today_str}. "
@@ -614,12 +564,10 @@ def read_datalake(user_phone: str, query: str) -> str:
614
  logger.info(f"Contextual query for PandasAI: {contextual_query}")
615
 
616
  try:
617
- # First attempt
618
  response = lake.chat(contextual_query)
619
  return str(response)
620
  except NoCodeFoundError:
621
  logger.warning(f"PandasAI failed on first attempt (NoCodeFoundError) for query: '{query}'. Retrying with simplification.")
622
- # 6. Recall with a simplified prompt
623
  simplified_query = (
624
  f"The previous attempt to answer the user's query failed. "
625
  f"Try again with a simpler approach. Instead of complex analysis, "
@@ -636,7 +584,6 @@ def read_datalake(user_phone: str, query: str) -> str:
636
  except Exception as e:
637
  logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
638
  return "Sorry, I encountered an error while analyzing your data. There might be an issue with the records. Please check your recent transactions."
639
- # --- END OF CHANGE 1 & 6 ---
640
 
641
 
642
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
 
6
  from typing import List, Dict, Union, Optional, Any
7
  from google.cloud import firestore
8
  import pandas as pd
9
+ import inflect # For robust pluralization
10
+ from thefuzz import process as fuzzy_process # For fuzzy string matching
11
  from pandasai import SmartDatalake
12
  from pandasai.responses.response_parser import ResponseParser
13
+ from pandasai.exceptions import NoCodeFoundError # For specific error handling
14
  from langchain_google_genai import ChatGoogleGenerativeAI
15
  import google.generativeai as genai
16
  import re
 
129
  - **Rule for Queries:** For "read" intents or general questions, set `transaction_type` to "query" and the `details` object MUST contain a single key `"query"` with the user's full, original question as the value.
130
  - **Rule for Multiple Items:** If the user's request contains multiple distinct transactions (e.g., recording an expense AND an asset), create a separate JSON object for each one within the main list.
131
  - **Rule for Expense Normalization:** For "create" intents with `transaction_type` "expense", analyze the `description`. If it contains common keywords, normalize it to a single word. For example, if the description is "paid for fuel for the delivery truck", the normalized `description` in the JSON should be "fuel". If it's "office electricity bill", normalize it to "electricity".
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  """
133
  try:
134
  full_prompt = [system_prompt, prompt]
 
169
 
170
  def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
171
  """
172
+ Finds the canonical version of an item using fuzzy matching for existing items
173
+ and inflect for new ones.
174
  """
175
+ # --- CHANGE 1: Fuzzy Search and Robust Pluralization ---
176
  inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
177
  name_lower = item_name.lower().strip()
178
 
179
+ # 1. Fetch all existing item names for fuzzy matching
180
+ all_item_docs = list(inventory_ref.stream())
181
+ all_item_names = [doc.id for doc in all_item_docs]
182
+
183
+ if all_item_names:
184
+ # 2. Find the best match using fuzzy logic
185
+ best_match = fuzzy_process.extractOne(name_lower, all_item_names)
186
+
187
+ # 3. Apply a strict threshold
188
+ if best_match and best_match[1] >= 90:
189
+ matched_name = best_match[0]
190
+ # Find the corresponding document
191
+ for doc in all_item_docs:
192
+ if doc.id == matched_name:
193
+ return {'doc': doc, 'name': matched_name}
194
+
195
+ # 4. If no good match is found, create a clean singular name for a new item
196
  singular = p.singular_noun(name_lower)
197
+ if not singular:
198
  singular = name_lower
199
 
200
+ return {'doc': None, 'name': singular}
201
+ # --- END OF CHANGE 1 ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
 
203
 
204
  def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
 
253
 
254
  def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
255
  """
256
+ Process sales with fuzzy name matching, user price override, and service bypass.
257
  """
258
  feedback_messages = []
259
  any_success = False
 
285
 
286
  @firestore.transactional
287
  def process_one_sale(transaction, sale_details):
 
288
  user_price = sale_details.get('price') or sale_details.get('unit_price')
289
 
290
  if user_price is not None:
 
295
  logger.info(f"Using last known price for '{canonical_name}': {selling_price}")
296
  else:
297
  return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
 
298
 
299
  if not isinstance(selling_price, (int, float)): selling_price = 0
300
 
 
469
  if df.empty:
470
  return df
471
 
472
+ # --- CHANGE 2: Robust Data Validation ---
473
+ # 1. Validate and convert timestamp columns to a consistent UTC format
474
  for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
475
  if col in df.columns:
476
+ # The key fix: utc=True handles mixed timezone-aware/naive data
477
+ df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
478
 
479
  # 2. Validate and convert numeric columns
480
  numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
 
487
  df[col] = df[col].fillna('Unknown')
488
 
489
  return df
490
+ # --- END OF CHANGE 2 ---
491
 
492
 
493
  def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
494
  """
495
  Fetches all user data, splits inventory/services, validates, and returns DataFrames.
496
  """
 
497
  collections = ['sales', 'expenses', 'assets', 'liabilities']
498
  all_dfs = []
499
 
 
513
 
514
  if inventory_data:
515
  inventory_df = pd.DataFrame(inventory_data)
516
+ inventory_df.name = "inventory"
517
  all_dfs.append(_validate_dataframe(inventory_df))
518
 
519
  if services_data:
520
  services_df = pd.DataFrame(services_data)
521
+ services_df.name = "services"
522
  all_dfs.append(_validate_dataframe(services_df))
523
 
524
  # Handle other collections
 
536
 
537
  if data:
538
  df = pd.DataFrame(data)
539
+ df.name = coll_name
540
  all_dfs.append(_validate_dataframe(df))
541
 
542
  return all_dfs
 
543
 
544
 
545
  def read_datalake(user_phone: str, query: str) -> str:
546
  """
547
  Handles queries with temporal awareness, robust error handling, and recall logic.
548
  """
 
549
  try:
550
  all_dfs = _fetch_all_collections_as_dfs(user_phone)
551
  if not all_dfs:
 
556
  "save_charts_path": user_defined_path, "enable_cache": False,
557
  })
558
 
 
559
  today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
560
  contextual_query = (
561
  f"For context, today's date is {today_str}. "
 
564
  logger.info(f"Contextual query for PandasAI: {contextual_query}")
565
 
566
  try:
 
567
  response = lake.chat(contextual_query)
568
  return str(response)
569
  except NoCodeFoundError:
570
  logger.warning(f"PandasAI failed on first attempt (NoCodeFoundError) for query: '{query}'. Retrying with simplification.")
 
571
  simplified_query = (
572
  f"The previous attempt to answer the user's query failed. "
573
  f"Try again with a simpler approach. Instead of complex analysis, "
 
584
  except Exception as e:
585
  logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
586
  return "Sorry, I encountered an error while analyzing your data. There might be an issue with the records. Please check your recent transactions."
 
587
 
588
 
589
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]: