Update utility.py
Browse files- utility.py +57 -82
utility.py
CHANGED
|
@@ -6,11 +6,11 @@ from datetime import datetime, timezone
|
|
| 6 |
from typing import List, Dict, Union, Optional, Any
|
| 7 |
from google.cloud import firestore
|
| 8 |
import pandas as pd
|
| 9 |
-
import inflect
|
| 10 |
-
from thefuzz import process as fuzzy_process
|
| 11 |
from pandasai import SmartDatalake
|
| 12 |
from pandasai.responses.response_parser import ResponseParser
|
| 13 |
-
from pandasai.exceptions import NoCodeFoundError
|
| 14 |
from langchain_google_genai import ChatGoogleGenerativeAI
|
| 15 |
import google.generativeai as genai
|
| 16 |
import re
|
|
@@ -108,27 +108,23 @@ def generateResponse(prompt: str) -> str:
|
|
| 108 |
|
| 109 |
system_prompt = """
|
| 110 |
Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.
|
| 111 |
-
|
| 112 |
-
|
| 113 |
-
|
| 114 |
-
|
| 115 |
-
|
| 116 |
-
|
| 117 |
-
- `"
|
| 118 |
-
|
| 119 |
-
- `"
|
| 120 |
-
|
| 121 |
-
**
|
| 122 |
-
- **For
|
| 123 |
-
- **For
|
| 124 |
-
|
| 125 |
-
- **
|
| 126 |
-
- **
|
| 127 |
-
|
| 128 |
-
**4. Important Rules:**
|
| 129 |
-
- **Rule for Queries:** For "read" intents or general questions, set `transaction_type` to "query" and the `details` object MUST contain a single key `"query"` with the user's full, original question as the value.
|
| 130 |
-
- **Rule for Multiple Items:** If the user's request contains multiple distinct transactions (e.g., recording an expense AND an asset), create a separate JSON object for each one within the main list.
|
| 131 |
-
- **Rule for Expense Normalization:** For "create" intents with `transaction_type` "expense", analyze the `description`. If it contains common keywords, normalize it to a single word. For example, if the description is "paid for fuel for the delivery truck", the normalized `description` in the JSON should be "fuel". If it's "office electricity bill", normalize it to "electricity".
|
| 132 |
"""
|
| 133 |
try:
|
| 134 |
full_prompt = [system_prompt, prompt]
|
|
@@ -172,33 +168,25 @@ def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
|
|
| 172 |
Finds the canonical version of an item using fuzzy matching for existing items
|
| 173 |
and inflect for new ones.
|
| 174 |
"""
|
| 175 |
-
# --- CHANGE 1: Fuzzy Search and Robust Pluralization ---
|
| 176 |
inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
|
| 177 |
name_lower = item_name.lower().strip()
|
| 178 |
|
| 179 |
-
# 1. Fetch all existing item names for fuzzy matching
|
| 180 |
all_item_docs = list(inventory_ref.stream())
|
| 181 |
all_item_names = [doc.id for doc in all_item_docs]
|
| 182 |
|
| 183 |
if all_item_names:
|
| 184 |
-
# 2. Find the best match using fuzzy logic
|
| 185 |
best_match = fuzzy_process.extractOne(name_lower, all_item_names)
|
| 186 |
-
|
| 187 |
-
# 3. Apply a strict threshold
|
| 188 |
if best_match and best_match[1] >= 90:
|
| 189 |
matched_name = best_match[0]
|
| 190 |
-
# Find the corresponding document
|
| 191 |
for doc in all_item_docs:
|
| 192 |
if doc.id == matched_name:
|
| 193 |
return {'doc': doc, 'name': matched_name}
|
| 194 |
|
| 195 |
-
# 4. If no good match is found, create a clean singular name for a new item
|
| 196 |
singular = p.singular_noun(name_lower)
|
| 197 |
if not singular:
|
| 198 |
singular = name_lower
|
| 199 |
|
| 200 |
return {'doc': None, 'name': singular}
|
| 201 |
-
# --- END OF CHANGE 1 ---
|
| 202 |
|
| 203 |
|
| 204 |
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
@@ -232,7 +220,7 @@ def create_or_update_inventory_or_service_offering(user_phone: str, transaction_
|
|
| 232 |
doc_ref = inventory_ref.document(canonical_name)
|
| 233 |
doc_data = {
|
| 234 |
'details': {**details, change_key: firestore.Increment(change_amount)},
|
| 235 |
-
'type': 'service' if 'service_name' in details else 'good',
|
| 236 |
'last_updated': datetime.now(timezone.utc).isoformat(),
|
| 237 |
}
|
| 238 |
|
|
@@ -253,7 +241,7 @@ def create_or_update_inventory_or_service_offering(user_phone: str, transaction_
|
|
| 253 |
|
| 254 |
def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 255 |
"""
|
| 256 |
-
Process sales with fuzzy name matching, user price override, and service
|
| 257 |
"""
|
| 258 |
feedback_messages = []
|
| 259 |
any_success = False
|
|
@@ -269,10 +257,7 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 269 |
try:
|
| 270 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 271 |
canonical_name = canonical_info['name']
|
| 272 |
-
|
| 273 |
-
inventory_data = inventory_doc.to_dict() if inventory_doc else None
|
| 274 |
-
item_type = inventory_data.get('type') if inventory_data else ('service' if 'service_name' in details else 'good')
|
| 275 |
-
|
| 276 |
last_selling_price = None
|
| 277 |
sales_ref = db.collection("users").document(user_phone).collection("sales")
|
| 278 |
all_sales_query = sales_ref.where('details.item', '==', canonical_name)
|
|
@@ -285,14 +270,16 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 285 |
|
| 286 |
@firestore.transactional
|
| 287 |
def process_one_sale(transaction, sale_details):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 288 |
user_price = sale_details.get('price') or sale_details.get('unit_price')
|
| 289 |
|
| 290 |
if user_price is not None:
|
| 291 |
selling_price = user_price
|
| 292 |
-
logger.info(f"Using user-specified price for '{canonical_name}': {selling_price}")
|
| 293 |
elif last_selling_price is not None:
|
| 294 |
selling_price = last_selling_price
|
| 295 |
-
logger.info(f"Using last known price for '{canonical_name}': {selling_price}")
|
| 296 |
else:
|
| 297 |
return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
|
| 298 |
|
|
@@ -325,12 +312,17 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 325 |
if current_stock < quantity_sold:
|
| 326 |
return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
|
| 327 |
|
| 328 |
-
transaction.update(item_doc_ref, {
|
| 329 |
-
f'details.{stock_key}': firestore.Increment(-quantity_sold),
|
| 330 |
-
'last_updated': datetime.now(timezone.utc).isoformat()
|
| 331 |
-
})
|
| 332 |
elif item_type == 'good':
|
| 333 |
return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 334 |
|
| 335 |
sale_doc_ref = sales_ref.document()
|
| 336 |
sale_record = {
|
|
@@ -355,7 +347,6 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 355 |
|
| 356 |
|
| 357 |
def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 358 |
-
"""Create new expense records in Firestore."""
|
| 359 |
batch = db.batch()
|
| 360 |
expenses_ref = db.collection("users").document(user_phone).collection("expenses")
|
| 361 |
success_count = 0
|
|
@@ -390,7 +381,6 @@ def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool,
|
|
| 390 |
|
| 391 |
|
| 392 |
def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 393 |
-
"""Create new asset records in Firestore with data stored in a 'details' sub-document."""
|
| 394 |
batch = db.batch()
|
| 395 |
assets_ref = db.collection("users").document(user_phone).collection("assets")
|
| 396 |
success_count = 0
|
|
@@ -425,7 +415,6 @@ def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, s
|
|
| 425 |
|
| 426 |
|
| 427 |
def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 428 |
-
"""Create new liability records in Firestore."""
|
| 429 |
batch = db.batch()
|
| 430 |
liabilities_ref = db.collection("users").document(user_phone).collection("liabilities")
|
| 431 |
success_count = 0
|
|
@@ -469,43 +458,33 @@ def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
|
|
| 469 |
if df.empty:
|
| 470 |
return df
|
| 471 |
|
| 472 |
-
# --- CHANGE 2: Robust Data Validation ---
|
| 473 |
-
# 1. Validate and convert timestamp columns to a consistent UTC format
|
| 474 |
for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
|
| 475 |
if col in df.columns:
|
| 476 |
-
# The key fix: utc=True handles mixed timezone-aware/naive data
|
| 477 |
df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
|
| 478 |
|
| 479 |
-
# 2. Validate and convert numeric columns
|
| 480 |
numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
|
| 481 |
for col in numeric_cols:
|
| 482 |
if col in df.columns:
|
| 483 |
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
|
| 484 |
|
| 485 |
-
# 3. Validate and fill missing categorical/object columns
|
| 486 |
for col in df.select_dtypes(include=['object']).columns:
|
| 487 |
df[col] = df[col].fillna('Unknown')
|
| 488 |
|
| 489 |
return df
|
| 490 |
-
# --- END OF CHANGE 2 ---
|
| 491 |
|
| 492 |
|
| 493 |
def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
|
| 494 |
"""
|
| 495 |
-
Fetches all user data, splits
|
| 496 |
"""
|
| 497 |
-
collections = ['sales', 'expenses', 'assets', 'liabilities']
|
| 498 |
all_dfs = []
|
| 499 |
|
| 500 |
-
# Handle inventory and services separately
|
| 501 |
inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
|
| 502 |
-
inventory_data = []
|
| 503 |
-
services_data = []
|
| 504 |
for doc in inv_serv_docs:
|
| 505 |
doc_data = doc.to_dict()
|
| 506 |
flat_data = {**doc_data, **doc_data.get('details', {})}
|
| 507 |
if 'details' in flat_data: del flat_data['details']
|
| 508 |
-
|
| 509 |
if doc_data.get('type') == 'service':
|
| 510 |
services_data.append(flat_data)
|
| 511 |
else:
|
|
@@ -521,23 +500,28 @@ def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
|
|
| 521 |
services_df.name = "services"
|
| 522 |
all_dfs.append(_validate_dataframe(services_df))
|
| 523 |
|
| 524 |
-
|
| 525 |
-
for coll_name in
|
| 526 |
docs = db.collection("users").document(user_phone).collection(coll_name).stream()
|
| 527 |
-
data = []
|
| 528 |
-
for doc in docs:
|
| 529 |
-
doc_data = doc.to_dict()
|
| 530 |
-
if 'details' in doc_data and isinstance(doc_data['details'], dict):
|
| 531 |
-
flat_data = {**doc_data, **doc_data['details']}
|
| 532 |
-
del flat_data['details']
|
| 533 |
-
data.append(flat_data)
|
| 534 |
-
else:
|
| 535 |
-
data.append(doc_data)
|
| 536 |
|
| 537 |
if data:
|
| 538 |
-
|
| 539 |
-
|
| 540 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 541 |
|
| 542 |
return all_dfs
|
| 543 |
|
|
@@ -587,7 +571,6 @@ def read_datalake(user_phone: str, query: str) -> str:
|
|
| 587 |
|
| 588 |
|
| 589 |
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
|
| 590 |
-
"""Helper to find a single document in a collection matching specific details."""
|
| 591 |
col_ref = db.collection("users").document(user_phone).collection(collection_name)
|
| 592 |
|
| 593 |
if 'transaction_id' in details and details['transaction_id']:
|
|
@@ -624,7 +607,6 @@ def _find_document_by_details(user_phone: str, collection_name: str, details: Di
|
|
| 624 |
|
| 625 |
|
| 626 |
def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 627 |
-
"""Update existing transaction(s) based on provided data."""
|
| 628 |
feedback = []
|
| 629 |
any_success = False
|
| 630 |
for trans in transaction_data:
|
|
@@ -669,7 +651,6 @@ def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 669 |
|
| 670 |
|
| 671 |
def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 672 |
-
"""Delete specified transactions."""
|
| 673 |
feedback = []
|
| 674 |
any_success = False
|
| 675 |
for trans in transaction_data:
|
|
@@ -706,7 +687,6 @@ def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 706 |
|
| 707 |
|
| 708 |
def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool:
|
| 709 |
-
"""Store transactions temporarily in Firestore before confirmation."""
|
| 710 |
if not transactions: return False
|
| 711 |
try:
|
| 712 |
doc_ref = db.collection("users").document(mobile).collection("temp_transactions").document("pending")
|
|
@@ -719,7 +699,6 @@ def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool
|
|
| 719 |
CURRENCY_SYMBOL_REGEX = re.compile(r"^\s*[\$\£\€\¥\₹R]")
|
| 720 |
|
| 721 |
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
|
| 722 |
-
"""Format transaction data for user display, looking inside the 'details' field."""
|
| 723 |
if not transactions: return "No transaction data to display."
|
| 724 |
if isinstance(transactions, dict): transactions = [transactions]
|
| 725 |
|
|
@@ -757,7 +736,6 @@ def format_transaction_response(transactions: Union[List[Dict], Dict, None]) ->
|
|
| 757 |
|
| 758 |
|
| 759 |
def fetch_transaction(user_phone: str, identifier: str, collection: str = "inventory_and_services"):
|
| 760 |
-
"""Retrieve a specific transaction from Firestore."""
|
| 761 |
try:
|
| 762 |
doc = db.collection("users").document(user_phone).collection(collection).document(identifier).get()
|
| 763 |
if doc.exists: return doc.to_dict()
|
|
@@ -768,9 +746,6 @@ def fetch_transaction(user_phone: str, identifier: str, collection: str = "inven
|
|
| 768 |
|
| 769 |
|
| 770 |
def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
|
| 771 |
-
"""
|
| 772 |
-
Groups transactions by intent and type, processes each group, and returns a consolidated feedback report.
|
| 773 |
-
"""
|
| 774 |
if not parsed_trans_data:
|
| 775 |
return "I couldn't understand the transaction details. Could you please try again?"
|
| 776 |
|
|
|
|
| 6 |
from typing import List, Dict, Union, Optional, Any
|
| 7 |
from google.cloud import firestore
|
| 8 |
import pandas as pd
|
| 9 |
+
import inflect
|
| 10 |
+
from thefuzz import process as fuzzy_process
|
| 11 |
from pandasai import SmartDatalake
|
| 12 |
from pandasai.responses.response_parser import ResponseParser
|
| 13 |
+
from pandasai.exceptions import NoCodeFoundError
|
| 14 |
from langchain_google_genai import ChatGoogleGenerativeAI
|
| 15 |
import google.generativeai as genai
|
| 16 |
import re
|
|
|
|
| 108 |
|
| 109 |
system_prompt = """
|
| 110 |
Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.
|
| 111 |
+
**1. Output Format:**
|
| 112 |
+
You MUST output your response as a valid JSON list `[]` containing one or more transaction objects `{}`.
|
| 113 |
+
**2. Transaction Object Structure:**
|
| 114 |
+
Each transaction object MUST have the following keys:
|
| 115 |
+
- `"intent"`: The user's goal (e.g., "create", "read", "update", "delete").
|
| 116 |
+
- `"transaction_type"`: The category of the transaction (e.g., "sale", "purchase", "inventory", "expense", "asset", "liability", "query", "service_offering").
|
| 117 |
+
- `"details"`: An object containing key-value pairs extracted from the request.
|
| 118 |
+
**3. Key Naming Conventions for the `details` Object:**
|
| 119 |
+
- **For Expenses:** Use `"amount"`, `"description"`, and `"category"`.
|
| 120 |
+
- **For Assets:** Use `"value"` for the monetary worth and `"name"` for the item's name.
|
| 121 |
+
- **For Liabilities:** Use `"amount"` and `"creditor"`.
|
| 122 |
+
- **For Sales/Inventory:** Use `"item"`, `"quantity"`, and `"price"`. For services sold by time, use `"hours"`.
|
| 123 |
+
- **For all financial transactions:** If a currency symbol or code is present (e.g., $, £, €, ZAR, R), include a `"currency"` key.
|
| 124 |
+
**4. Important Rules:**
|
| 125 |
+
- **Rule for Queries:** For "read" intents or general questions, set `transaction_type` to "query" and the `details` object MUST contain a single key `"query"` with the user's full, original question as the value.
|
| 126 |
+
- **Rule for Multiple Items:** If the user's request contains multiple distinct transactions, create a separate JSON object for each one.
|
| 127 |
+
- **Rule for Expense Normalization:** For "create" intents with `transaction_type` "expense", analyze the `description`. If it contains common keywords, normalize it to a single word. For example, "paid for fuel for the delivery truck" becomes "fuel".
|
|
|
|
|
|
|
|
|
|
|
|
|
| 128 |
"""
|
| 129 |
try:
|
| 130 |
full_prompt = [system_prompt, prompt]
|
|
|
|
| 168 |
Finds the canonical version of an item using fuzzy matching for existing items
|
| 169 |
and inflect for new ones.
|
| 170 |
"""
|
|
|
|
| 171 |
inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
|
| 172 |
name_lower = item_name.lower().strip()
|
| 173 |
|
|
|
|
| 174 |
all_item_docs = list(inventory_ref.stream())
|
| 175 |
all_item_names = [doc.id for doc in all_item_docs]
|
| 176 |
|
| 177 |
if all_item_names:
|
|
|
|
| 178 |
best_match = fuzzy_process.extractOne(name_lower, all_item_names)
|
|
|
|
|
|
|
| 179 |
if best_match and best_match[1] >= 90:
|
| 180 |
matched_name = best_match[0]
|
|
|
|
| 181 |
for doc in all_item_docs:
|
| 182 |
if doc.id == matched_name:
|
| 183 |
return {'doc': doc, 'name': matched_name}
|
| 184 |
|
|
|
|
| 185 |
singular = p.singular_noun(name_lower)
|
| 186 |
if not singular:
|
| 187 |
singular = name_lower
|
| 188 |
|
| 189 |
return {'doc': None, 'name': singular}
|
|
|
|
| 190 |
|
| 191 |
|
| 192 |
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 220 |
doc_ref = inventory_ref.document(canonical_name)
|
| 221 |
doc_data = {
|
| 222 |
'details': {**details, change_key: firestore.Increment(change_amount)},
|
| 223 |
+
'type': 'service' if 'service_name' in details or 'hours' in details else 'good',
|
| 224 |
'last_updated': datetime.now(timezone.utc).isoformat(),
|
| 225 |
}
|
| 226 |
|
|
|
|
| 241 |
|
| 242 |
def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 243 |
"""
|
| 244 |
+
Process sales with fuzzy name matching, user price override, and on-the-fly service creation.
|
| 245 |
"""
|
| 246 |
feedback_messages = []
|
| 247 |
any_success = False
|
|
|
|
| 257 |
try:
|
| 258 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 259 |
canonical_name = canonical_info['name']
|
| 260 |
+
|
|
|
|
|
|
|
|
|
|
| 261 |
last_selling_price = None
|
| 262 |
sales_ref = db.collection("users").document(user_phone).collection("sales")
|
| 263 |
all_sales_query = sales_ref.where('details.item', '==', canonical_name)
|
|
|
|
| 270 |
|
| 271 |
@firestore.transactional
|
| 272 |
def process_one_sale(transaction, sale_details):
|
| 273 |
+
is_new_item = canonical_info['doc'] is None
|
| 274 |
+
# Infer type for new items based on details like 'hours'
|
| 275 |
+
item_type = 'service' if is_new_item and 'hours' in sale_details else 'good'
|
| 276 |
+
|
| 277 |
user_price = sale_details.get('price') or sale_details.get('unit_price')
|
| 278 |
|
| 279 |
if user_price is not None:
|
| 280 |
selling_price = user_price
|
|
|
|
| 281 |
elif last_selling_price is not None:
|
| 282 |
selling_price = last_selling_price
|
|
|
|
| 283 |
else:
|
| 284 |
return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
|
| 285 |
|
|
|
|
| 312 |
if current_stock < quantity_sold:
|
| 313 |
return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
|
| 314 |
|
| 315 |
+
transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)})
|
|
|
|
|
|
|
|
|
|
| 316 |
elif item_type == 'good':
|
| 317 |
return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
|
| 318 |
+
elif is_new_item and item_type == 'service':
|
| 319 |
+
logger.info(f"Creating new service '{canonical_name}' during sale.")
|
| 320 |
+
service_record = {
|
| 321 |
+
'details': {'item': canonical_name, 'price': selling_price},
|
| 322 |
+
'type': 'service',
|
| 323 |
+
'last_updated': datetime.now(timezone.utc).isoformat()
|
| 324 |
+
}
|
| 325 |
+
transaction.set(item_doc_ref, service_record)
|
| 326 |
|
| 327 |
sale_doc_ref = sales_ref.document()
|
| 328 |
sale_record = {
|
|
|
|
| 347 |
|
| 348 |
|
| 349 |
def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 350 |
batch = db.batch()
|
| 351 |
expenses_ref = db.collection("users").document(user_phone).collection("expenses")
|
| 352 |
success_count = 0
|
|
|
|
| 381 |
|
| 382 |
|
| 383 |
def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 384 |
batch = db.batch()
|
| 385 |
assets_ref = db.collection("users").document(user_phone).collection("assets")
|
| 386 |
success_count = 0
|
|
|
|
| 415 |
|
| 416 |
|
| 417 |
def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 418 |
batch = db.batch()
|
| 419 |
liabilities_ref = db.collection("users").document(user_phone).collection("liabilities")
|
| 420 |
success_count = 0
|
|
|
|
| 458 |
if df.empty:
|
| 459 |
return df
|
| 460 |
|
|
|
|
|
|
|
| 461 |
for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
|
| 462 |
if col in df.columns:
|
|
|
|
| 463 |
df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
|
| 464 |
|
|
|
|
| 465 |
numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
|
| 466 |
for col in numeric_cols:
|
| 467 |
if col in df.columns:
|
| 468 |
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
|
| 469 |
|
|
|
|
| 470 |
for col in df.select_dtypes(include=['object']).columns:
|
| 471 |
df[col] = df[col].fillna('Unknown')
|
| 472 |
|
| 473 |
return df
|
|
|
|
| 474 |
|
| 475 |
|
| 476 |
def _fetch_all_collections_as_dfs(user_phone: str) -> List[pd.DataFrame]:
|
| 477 |
"""
|
| 478 |
+
Fetches all user data, splits/validates DataFrames, and engineers features.
|
| 479 |
"""
|
|
|
|
| 480 |
all_dfs = []
|
| 481 |
|
|
|
|
| 482 |
inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
|
| 483 |
+
inventory_data, services_data = [], []
|
|
|
|
| 484 |
for doc in inv_serv_docs:
|
| 485 |
doc_data = doc.to_dict()
|
| 486 |
flat_data = {**doc_data, **doc_data.get('details', {})}
|
| 487 |
if 'details' in flat_data: del flat_data['details']
|
|
|
|
| 488 |
if doc_data.get('type') == 'service':
|
| 489 |
services_data.append(flat_data)
|
| 490 |
else:
|
|
|
|
| 500 |
services_df.name = "services"
|
| 501 |
all_dfs.append(_validate_dataframe(services_df))
|
| 502 |
|
| 503 |
+
collections_to_fetch = {'sales': 'sales', 'expenses': 'expenses', 'assets': 'assets', 'liabilities': 'liabilities'}
|
| 504 |
+
for df_name, coll_name in collections_to_fetch.items():
|
| 505 |
docs = db.collection("users").document(user_phone).collection(coll_name).stream()
|
| 506 |
+
data = [doc.to_dict() for doc in docs]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 507 |
|
| 508 |
if data:
|
| 509 |
+
flat_data_list = []
|
| 510 |
+
for item in data:
|
| 511 |
+
flat_item = {**item, **item.get('details', {})}
|
| 512 |
+
if 'details' in flat_item: del flat_item['details']
|
| 513 |
+
flat_data_list.append(flat_item)
|
| 514 |
+
|
| 515 |
+
df = pd.DataFrame(flat_data_list)
|
| 516 |
+
df.name = df_name
|
| 517 |
+
|
| 518 |
+
validated_df = _validate_dataframe(df)
|
| 519 |
+
|
| 520 |
+
if df_name == 'sales':
|
| 521 |
+
if 'price' in validated_df.columns and 'quantity' in validated_df.columns:
|
| 522 |
+
validated_df['sale_total'] = validated_df['price'] * validated_df['quantity']
|
| 523 |
+
|
| 524 |
+
all_dfs.append(validated_df)
|
| 525 |
|
| 526 |
return all_dfs
|
| 527 |
|
|
|
|
| 571 |
|
| 572 |
|
| 573 |
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
|
|
|
|
| 574 |
col_ref = db.collection("users").document(user_phone).collection(collection_name)
|
| 575 |
|
| 576 |
if 'transaction_id' in details and details['transaction_id']:
|
|
|
|
| 607 |
|
| 608 |
|
| 609 |
def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 610 |
feedback = []
|
| 611 |
any_success = False
|
| 612 |
for trans in transaction_data:
|
|
|
|
| 651 |
|
| 652 |
|
| 653 |
def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 654 |
feedback = []
|
| 655 |
any_success = False
|
| 656 |
for trans in transaction_data:
|
|
|
|
| 687 |
|
| 688 |
|
| 689 |
def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool:
|
|
|
|
| 690 |
if not transactions: return False
|
| 691 |
try:
|
| 692 |
doc_ref = db.collection("users").document(mobile).collection("temp_transactions").document("pending")
|
|
|
|
| 699 |
CURRENCY_SYMBOL_REGEX = re.compile(r"^\s*[\$\£\€\¥\₹R]")
|
| 700 |
|
| 701 |
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
|
|
|
|
| 702 |
if not transactions: return "No transaction data to display."
|
| 703 |
if isinstance(transactions, dict): transactions = [transactions]
|
| 704 |
|
|
|
|
| 736 |
|
| 737 |
|
| 738 |
def fetch_transaction(user_phone: str, identifier: str, collection: str = "inventory_and_services"):
|
|
|
|
| 739 |
try:
|
| 740 |
doc = db.collection("users").document(user_phone).collection(collection).document(identifier).get()
|
| 741 |
if doc.exists: return doc.to_dict()
|
|
|
|
| 746 |
|
| 747 |
|
| 748 |
def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
|
|
|
|
|
|
|
|
|
|
| 749 |
if not parsed_trans_data:
|
| 750 |
return "I couldn't understand the transaction details. Could you please try again?"
|
| 751 |
|