Update utility.py
Browse files- utility.py +94 -168
utility.py
CHANGED
|
@@ -2,7 +2,7 @@
|
|
| 2 |
import json
|
| 3 |
import os
|
| 4 |
import logging
|
| 5 |
-
from datetime import datetime, timezone
|
| 6 |
from typing import List, Dict, Union, Optional, Any, Tuple
|
| 7 |
from google.cloud import firestore
|
| 8 |
import pandas as pd
|
|
@@ -62,7 +62,6 @@ def render_df_as_image(df: pd.DataFrame) -> Optional[str]:
|
|
| 62 |
return "The data requested is empty."
|
| 63 |
try:
|
| 64 |
img_path = os.path.join(user_defined_path, f"report_{uuid.uuid4()}.png")
|
| 65 |
-
# Use a larger figure size for better readability on mobile
|
| 66 |
dfi.export(df, img_path, table_conversion='matplotlib', dpi=200)
|
| 67 |
return img_path
|
| 68 |
except Exception as e:
|
|
@@ -75,7 +74,7 @@ class FlaskResponse(ResponseParser):
|
|
| 75 |
super().__init__(context)
|
| 76 |
|
| 77 |
def format_dataframe(self, result):
|
| 78 |
-
#
|
| 79 |
df = result['value']
|
| 80 |
return render_df_as_image(df)
|
| 81 |
|
|
@@ -118,13 +117,56 @@ except Exception as e:
|
|
| 118 |
llm = None
|
| 119 |
|
| 120 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 121 |
def generateResponse(prompt: str) -> str:
|
| 122 |
"""Generate structured JSON response from user input using Generative AI."""
|
| 123 |
if not model:
|
| 124 |
return '{"error": "Model not available"}'
|
| 125 |
-
|
| 126 |
-
# --- CORRECTED SYSTEM PROMPT ---
|
| 127 |
-
# This is the restored, comprehensive prompt that correctly classifies multiple transaction types.
|
| 128 |
system_prompt = """
|
| 129 |
Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.
|
| 130 |
|
|
@@ -242,13 +284,11 @@ def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
|
|
| 242 |
all_item_names = [doc.id for doc in all_item_docs]
|
| 243 |
|
| 244 |
if all_item_names:
|
| 245 |
-
# 1. Exact Match First (to protect good data)
|
| 246 |
if name_lower in all_item_names:
|
| 247 |
for doc in all_item_docs:
|
| 248 |
if doc.id == name_lower:
|
| 249 |
return {'doc': doc, 'name': name_lower}
|
| 250 |
|
| 251 |
-
# 2. Fuzzy Match as a Fallback (for typos and history)
|
| 252 |
best_match = fuzzy_process.extractOne(name_lower, all_item_names)
|
| 253 |
if best_match and best_match[1] >= 90:
|
| 254 |
matched_name = best_match[0]
|
|
@@ -256,7 +296,6 @@ def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
|
|
| 256 |
if doc.id == matched_name:
|
| 257 |
return {'doc': doc, 'name': matched_name}
|
| 258 |
|
| 259 |
-
# 3. Create New as a Last Resort
|
| 260 |
singular = p.singular_noun(name_lower)
|
| 261 |
if not singular:
|
| 262 |
singular = name_lower
|
|
@@ -265,47 +304,37 @@ def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
|
|
| 265 |
|
| 266 |
|
| 267 |
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 268 |
-
"""Create/update inventory items or service offerings with name normalization."""
|
| 269 |
batch = db.batch()
|
| 270 |
inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
|
| 271 |
feedback_messages = []
|
| 272 |
success_count = 0
|
| 273 |
-
|
| 274 |
for transaction in transaction_data:
|
| 275 |
details = transaction.get('details', {})
|
| 276 |
item_name = details.get('item') or details.get('service_name')
|
| 277 |
-
|
| 278 |
if not item_name:
|
| 279 |
feedback_messages.append("Skipped: An inventory/service update was missing an item or service name.")
|
| 280 |
continue
|
| 281 |
-
|
| 282 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 283 |
canonical_name = canonical_info['name']
|
| 284 |
-
|
| 285 |
if 'item' in details: details['item'] = canonical_name
|
| 286 |
if 'service_name' in details: details['service_name'] = canonical_name
|
| 287 |
-
|
| 288 |
try:
|
| 289 |
change_key = 'quantity' if 'quantity' in details else 'units_available'
|
| 290 |
change_amount = int(details.get(change_key, 0))
|
| 291 |
except (ValueError, TypeError):
|
| 292 |
feedback_messages.append(f"Skipped '{canonical_name}': Invalid quantity or units format.")
|
| 293 |
continue
|
| 294 |
-
|
| 295 |
doc_ref = inventory_ref.document(canonical_name)
|
| 296 |
doc_data = {
|
| 297 |
'details': {**details, change_key: firestore.Increment(change_amount)},
|
| 298 |
'type': 'service' if 'service_name' in details else 'good',
|
| 299 |
'last_updated': datetime.now(timezone.utc).isoformat(),
|
| 300 |
}
|
| 301 |
-
|
| 302 |
batch.set(doc_ref, doc_data, merge=True)
|
| 303 |
feedback_messages.append(f"Processed '{canonical_name}': change of {change_amount}.")
|
| 304 |
success_count += 1
|
| 305 |
-
|
| 306 |
if success_count == 0:
|
| 307 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid inventory/service items to process."
|
| 308 |
-
|
| 309 |
try:
|
| 310 |
batch.commit()
|
| 311 |
return True, "\n".join(feedback_messages)
|
|
@@ -315,44 +344,31 @@ def create_or_update_inventory_or_service_offering(user_phone: str, transaction_
|
|
| 315 |
|
| 316 |
|
| 317 |
def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
| 318 |
-
"""
|
| 319 |
-
Process sales with fuzzy name matching, user price override, and on-the-fly service creation.
|
| 320 |
-
"""
|
| 321 |
feedback_messages = []
|
| 322 |
any_success = False
|
| 323 |
-
|
| 324 |
for t in transaction_data:
|
| 325 |
details = t.get('details', {})
|
| 326 |
item_name = details.get('item') or details.get('service_name')
|
| 327 |
-
|
| 328 |
if not item_name:
|
| 329 |
feedback_messages.append("Sale failed: Missing item or service name.")
|
| 330 |
continue
|
| 331 |
-
|
| 332 |
try:
|
| 333 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 334 |
canonical_name = canonical_info['name']
|
| 335 |
-
|
| 336 |
last_selling_price = None
|
| 337 |
sales_ref = db.collection("users").document(user_phone).collection("sales")
|
| 338 |
all_sales_query = sales_ref.where('details.item', '==', canonical_name)
|
| 339 |
all_sales_docs = list(all_sales_query.stream())
|
| 340 |
-
|
| 341 |
if all_sales_docs:
|
| 342 |
all_sales_docs.sort(key=lambda doc: doc.to_dict().get('timestamp', ''), reverse=True)
|
| 343 |
last_sale_data = all_sales_docs[0].to_dict()
|
| 344 |
last_selling_price = last_sale_data.get('details', {}).get('price')
|
| 345 |
-
|
| 346 |
@firestore.transactional
|
| 347 |
def process_one_sale(transaction, sale_details):
|
| 348 |
is_new_item = canonical_info['doc'] is None
|
| 349 |
-
|
| 350 |
-
# Infer type based on the original transaction type from the LLM
|
| 351 |
original_trans_type = t.get('transaction_type')
|
| 352 |
item_type = 'service' if original_trans_type == 'service_offering' else 'good'
|
| 353 |
-
|
| 354 |
user_price = sale_details.get('price') or sale_details.get('unit_price')
|
| 355 |
-
|
| 356 |
if user_price is not None:
|
| 357 |
selling_price = user_price
|
| 358 |
elif last_selling_price is not None:
|
|
@@ -362,35 +378,28 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 362 |
return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
|
| 363 |
else:
|
| 364 |
selling_price = 0
|
| 365 |
-
|
| 366 |
if not isinstance(selling_price, (int, float)): selling_price = 0
|
| 367 |
-
|
| 368 |
sale_details['price'] = selling_price
|
| 369 |
sale_details['item'] = canonical_name
|
| 370 |
if 'unit_price' in sale_details: del sale_details['unit_price']
|
| 371 |
if 'service_name' in sale_details: del sale_details['service_name']
|
| 372 |
-
|
| 373 |
try:
|
| 374 |
quantity_sold = int(sale_details.get('quantity', 1))
|
| 375 |
if quantity_sold <= 0: return f"Sale failed for '{canonical_name}': Invalid quantity ({quantity_sold})."
|
| 376 |
except (ValueError, TypeError):
|
| 377 |
return f"Sale failed for '{canonical_name}': Invalid quantity format."
|
| 378 |
-
|
| 379 |
item_doc_ref = db.collection("users").document(user_phone).collection("inventory_and_services").document(canonical_name)
|
| 380 |
item_snapshot = item_doc_ref.get(transaction=transaction)
|
| 381 |
-
|
| 382 |
item_cost = 0
|
| 383 |
if item_snapshot.exists:
|
| 384 |
inv_data = item_snapshot.to_dict()
|
| 385 |
inv_details = inv_data.get('details', {})
|
| 386 |
item_cost = inv_details.get('price') or inv_details.get('unit_price') or 0
|
| 387 |
-
|
| 388 |
if inv_data.get('type') == 'good':
|
| 389 |
stock_key = 'quantity'
|
| 390 |
current_stock = int(inv_details.get(stock_key, 0))
|
| 391 |
if current_stock < quantity_sold:
|
| 392 |
return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
|
| 393 |
-
|
| 394 |
transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)})
|
| 395 |
elif item_type == 'good':
|
| 396 |
return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
|
|
@@ -402,7 +411,6 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 402 |
'last_updated': datetime.now(timezone.utc).isoformat()
|
| 403 |
}
|
| 404 |
transaction.set(item_doc_ref, service_record)
|
| 405 |
-
|
| 406 |
sale_doc_ref = sales_ref.document()
|
| 407 |
sale_record = {
|
| 408 |
'details': {**sale_details, 'cost': item_cost},
|
|
@@ -410,10 +418,8 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 410 |
'status': 'completed',
|
| 411 |
'transaction_id': sale_doc_ref.id
|
| 412 |
}
|
| 413 |
-
|
| 414 |
transaction.set(sale_doc_ref, sale_record)
|
| 415 |
return f"Sale successful for {quantity_sold} x '{canonical_name}' at {sale_details.get('currency','')}{selling_price} each."
|
| 416 |
-
|
| 417 |
transaction_feedback = process_one_sale(db.transaction(), details)
|
| 418 |
feedback_messages.append(transaction_feedback)
|
| 419 |
if "successful" in transaction_feedback:
|
|
@@ -421,7 +427,6 @@ def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, st
|
|
| 421 |
except Exception as e:
|
| 422 |
logger.error(f"Transactional sale failed for '{item_name}': {e}", exc_info=True)
|
| 423 |
feedback_messages.append(f"Sale failed for '{item_name}': An unexpected error occurred.")
|
| 424 |
-
|
| 425 |
return any_success, "\n".join(feedback_messages)
|
| 426 |
|
| 427 |
|
|
@@ -430,15 +435,12 @@ def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool,
|
|
| 430 |
expenses_ref = db.collection("users").document(user_phone).collection("expenses")
|
| 431 |
success_count = 0
|
| 432 |
feedback_messages = []
|
| 433 |
-
|
| 434 |
for transaction in transaction_data:
|
| 435 |
details = transaction.get('details', {})
|
| 436 |
expense_desc = details.get('description', details.get('category', 'Unnamed Expense'))
|
| 437 |
-
|
| 438 |
if 'amount' not in details:
|
| 439 |
feedback_messages.append(f"Skipped expense '{expense_desc}': Missing amount.")
|
| 440 |
continue
|
| 441 |
-
|
| 442 |
doc_ref = expenses_ref.document()
|
| 443 |
expense_record = {
|
| 444 |
'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
@@ -447,10 +449,8 @@ def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool,
|
|
| 447 |
batch.set(doc_ref, expense_record)
|
| 448 |
feedback_messages.append(f"Recorded expense: '{expense_desc}' for {details.get('currency','')}{details.get('amount')}.")
|
| 449 |
success_count += 1
|
| 450 |
-
|
| 451 |
if success_count == 0:
|
| 452 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid expense transactions to create."
|
| 453 |
-
|
| 454 |
try:
|
| 455 |
batch.commit()
|
| 456 |
return True, "\n".join(feedback_messages)
|
|
@@ -464,15 +464,12 @@ def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, s
|
|
| 464 |
assets_ref = db.collection("users").document(user_phone).collection("assets")
|
| 465 |
success_count = 0
|
| 466 |
feedback_messages = []
|
| 467 |
-
|
| 468 |
for transaction in transaction_data:
|
| 469 |
details = transaction.get('details', {})
|
| 470 |
asset_name = details.get('name', 'Unnamed Asset')
|
| 471 |
-
|
| 472 |
if 'value' not in details:
|
| 473 |
feedback_messages.append(f"Skipped asset '{asset_name}': Missing value.")
|
| 474 |
continue
|
| 475 |
-
|
| 476 |
doc_ref = assets_ref.document()
|
| 477 |
asset_record = {
|
| 478 |
'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
@@ -481,10 +478,8 @@ def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, s
|
|
| 481 |
batch.set(doc_ref, asset_record)
|
| 482 |
feedback_messages.append(f"Recorded asset: '{asset_name}' with value {details.get('currency','')}{details.get('value')}.")
|
| 483 |
success_count += 1
|
| 484 |
-
|
| 485 |
if success_count == 0:
|
| 486 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid asset transactions to create."
|
| 487 |
-
|
| 488 |
try:
|
| 489 |
batch.commit()
|
| 490 |
return True, "\n".join(feedback_messages)
|
|
@@ -498,15 +493,12 @@ def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[boo
|
|
| 498 |
liabilities_ref = db.collection("users").document(user_phone).collection("liabilities")
|
| 499 |
success_count = 0
|
| 500 |
feedback_messages = []
|
| 501 |
-
|
| 502 |
for transaction in transaction_data:
|
| 503 |
details = transaction.get('details', {})
|
| 504 |
creditor = details.get('creditor', 'Unnamed Creditor')
|
| 505 |
-
|
| 506 |
if 'amount' not in details or not details.get('creditor'):
|
| 507 |
feedback_messages.append(f"Skipped liability '{creditor}': Missing amount or creditor.")
|
| 508 |
continue
|
| 509 |
-
|
| 510 |
doc_ref = liabilities_ref.document()
|
| 511 |
liability_record = {
|
| 512 |
'details': details,
|
|
@@ -518,10 +510,8 @@ def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[boo
|
|
| 518 |
batch.set(doc_ref, liability_record)
|
| 519 |
feedback_messages.append(f"Recorded liability to '{creditor}' for {details.get('currency','')}{details.get('amount')}.")
|
| 520 |
success_count += 1
|
| 521 |
-
|
| 522 |
if success_count == 0:
|
| 523 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid liability transactions to create."
|
| 524 |
-
|
| 525 |
try:
|
| 526 |
batch.commit()
|
| 527 |
return True, "\n".join(feedback_messages)
|
|
@@ -531,34 +521,24 @@ def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[boo
|
|
| 531 |
|
| 532 |
|
| 533 |
def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
|
| 534 |
-
"""
|
| 535 |
-
Proactively cleans and validates a DataFrame to ensure data integrity for PandasAI.
|
| 536 |
-
"""
|
| 537 |
if df.empty:
|
| 538 |
return df
|
| 539 |
-
|
| 540 |
for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
|
| 541 |
if col in df.columns:
|
| 542 |
df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
|
| 543 |
-
|
| 544 |
numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
|
| 545 |
for col in numeric_cols:
|
| 546 |
if col in df.columns:
|
| 547 |
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
|
| 548 |
-
|
| 549 |
for col in df.select_dtypes(include=['object']).columns:
|
| 550 |
df[col] = df[col].fillna('Unknown')
|
| 551 |
-
|
| 552 |
return df
|
| 553 |
|
| 554 |
|
| 555 |
def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
|
| 556 |
-
"""
|
| 557 |
-
Fetches all user data, splits/validates DataFrames, and engineers features.
|
| 558 |
-
Returns a list of (name, dataframe) tuples.
|
| 559 |
-
"""
|
| 560 |
all_dfs_with_names = []
|
| 561 |
-
|
| 562 |
inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
|
| 563 |
inventory_data, services_data = [], []
|
| 564 |
for doc in inv_serv_docs:
|
|
@@ -569,118 +549,89 @@ def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFra
|
|
| 569 |
services_data.append(flat_data)
|
| 570 |
else:
|
| 571 |
inventory_data.append(flat_data)
|
| 572 |
-
|
| 573 |
if inventory_data:
|
| 574 |
inventory_df = pd.DataFrame(inventory_data)
|
| 575 |
all_dfs_with_names.append(("inventory", _validate_dataframe(inventory_df)))
|
| 576 |
-
|
| 577 |
if services_data:
|
| 578 |
services_df = pd.DataFrame(services_data)
|
| 579 |
all_dfs_with_names.append(("services", _validate_dataframe(services_df)))
|
| 580 |
-
|
| 581 |
collections_to_fetch = {'sales': 'sales', 'expenses': 'expenses', 'assets': 'assets', 'liabilities': 'liabilities'}
|
| 582 |
for df_name, coll_name in collections_to_fetch.items():
|
| 583 |
docs = db.collection("users").document(user_phone).collection(coll_name).stream()
|
| 584 |
data = [doc.to_dict() for doc in docs]
|
| 585 |
-
|
| 586 |
if data:
|
| 587 |
flat_data_list = []
|
| 588 |
for item in data:
|
| 589 |
flat_item = {**item, **item.get('details', {})}
|
| 590 |
if 'details' in flat_item: del flat_item['details']
|
| 591 |
flat_data_list.append(flat_item)
|
| 592 |
-
|
| 593 |
df = pd.DataFrame(flat_data_list)
|
| 594 |
validated_df = _validate_dataframe(df)
|
| 595 |
-
|
| 596 |
-
|
| 597 |
-
if 'price' in validated_df.columns and 'quantity' in validated_df.columns:
|
| 598 |
-
validated_df['sale_total'] = validated_df['price'] * validated_df['quantity']
|
| 599 |
-
|
| 600 |
all_dfs_with_names.append((df_name, validated_df))
|
| 601 |
-
|
| 602 |
return all_dfs_with_names
|
| 603 |
|
| 604 |
|
| 605 |
def read_datalake(user_phone: str, query: str) -> str:
|
| 606 |
"""
|
| 607 |
-
Handles queries
|
| 608 |
-
|
|
|
|
|
|
|
| 609 |
"""
|
| 610 |
-
# SURGICALLY MODIFIED: This function is enhanced to handle reporting.
|
| 611 |
try:
|
| 612 |
all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone)
|
| 613 |
if not all_dfs_with_names:
|
| 614 |
return "You have no data recorded yet. Please add some transactions first."
|
| 615 |
|
| 616 |
-
#
|
| 617 |
-
|
| 618 |
-
|
| 619 |
-
|
| 620 |
-
today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
|
| 621 |
-
|
| 622 |
-
reporting_prompt_template = """
|
| 623 |
-
As a business data analyst, generate a report for the user's request: '{query}'.
|
| 624 |
-
Today's date is {today_str}. The data is in the provided DataFrames.
|
| 625 |
-
|
| 626 |
-
Structure your response in the following format.
|
| 627 |
-
**IMPORTANT: Use ONLY WhatsApp-compatible Markdown for formatting (*bold*, _italic_, ~strikethrough~, ```monospace```).**
|
| 628 |
|
| 629 |
-
|
| 630 |
-
|
| 631 |
-
|
| 632 |
-
A one-sentence summary of the key finding.
|
| 633 |
|
| 634 |
-
|
| 635 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 636 |
|
| 637 |
-
|
| 638 |
-
|
| 639 |
|
| 640 |
-
|
| 641 |
-
|
| 642 |
-
|
|
|
|
| 643 |
|
| 644 |
-
if is_report_request:
|
| 645 |
-
contextual_query = reporting_prompt_template.format(query=query, today_str=today_str)
|
| 646 |
else:
|
| 647 |
-
|
| 648 |
-
|
| 649 |
-
|
| 650 |
-
|
| 651 |
-
|
| 652 |
-
|
| 653 |
-
|
|
|
|
|
|
|
|
|
|
| 654 |
|
| 655 |
-
|
| 656 |
-
|
| 657 |
-
|
| 658 |
-
|
| 659 |
-
)
|
| 660 |
-
|
| 661 |
-
|
| 662 |
-
|
| 663 |
-
lake = SmartDatalake(datalake_dfs, config={
|
| 664 |
-
"llm": llm, "response_parser": FlaskResponse,
|
| 665 |
-
"save_charts_path": user_defined_path, "enable_cache": False,
|
| 666 |
-
"conversational": True
|
| 667 |
-
})
|
| 668 |
-
|
| 669 |
-
logger.info(f"Contextual query for PandasAI: {contextual_query}")
|
| 670 |
-
|
| 671 |
-
try:
|
| 672 |
-
response = lake.chat(contextual_query)
|
| 673 |
-
# The response is now either a string (narrative/simple answer) or an image path
|
| 674 |
-
return str(response)
|
| 675 |
-
except NoCodeFoundError:
|
| 676 |
-
logger.warning(f"PandasAI failed on first attempt for query: '{query}'. Retrying.")
|
| 677 |
-
# Use a simpler prompt for retry, focusing on just getting the data
|
| 678 |
-
simplified_query = f"The previous attempt failed. Try a simpler way. Just return the data for this query: '{query}'"
|
| 679 |
-
response = lake.chat(simplified_query)
|
| 680 |
return str(response)
|
| 681 |
|
| 682 |
except NoCodeFoundError:
|
| 683 |
-
logger.error(f"PandasAI failed
|
| 684 |
return "I'm sorry, I couldn't figure out how to answer that question with your data. Please try rephrasing it."
|
| 685 |
except Exception as e:
|
| 686 |
logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
|
|
@@ -689,22 +640,18 @@ def read_datalake(user_phone: str, query: str) -> str:
|
|
| 689 |
|
| 690 |
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
|
| 691 |
col_ref = db.collection("users").document(user_phone).collection(collection_name)
|
| 692 |
-
|
| 693 |
if 'transaction_id' in details and details['transaction_id']:
|
| 694 |
doc = col_ref.document(details['transaction_id']).get()
|
| 695 |
if doc.exists: return {"id": doc.id, "data": doc.to_dict()}
|
| 696 |
-
|
| 697 |
if collection_name in ['inventory_and_services', 'sales'] and ('item' in details or 'service_name' in details):
|
| 698 |
item_name = details.get('item') or details.get('service_name')
|
| 699 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 700 |
if canonical_info['doc']:
|
| 701 |
doc = canonical_info['doc']
|
| 702 |
return {"id": doc.id, "data": doc.to_dict()}
|
| 703 |
-
|
| 704 |
query = col_ref
|
| 705 |
key_map = {'expenses': 'description', 'assets': 'name', 'liabilities': 'creditor'}
|
| 706 |
search_key = key_map.get(collection_name)
|
| 707 |
-
|
| 708 |
filters_applied = False
|
| 709 |
if search_key and search_key in details:
|
| 710 |
query = query.where(f'details.{search_key}', '==', details[search_key]); filters_applied = True
|
|
@@ -712,12 +659,9 @@ def _find_document_by_details(user_phone: str, collection_name: str, details: Di
|
|
| 712 |
query = query.where('details.amount', '==', details['amount']); filters_applied = True
|
| 713 |
if 'value' in details:
|
| 714 |
query = query.where('details.value', '==', details['value']); filters_applied = True
|
| 715 |
-
|
| 716 |
if not filters_applied: return None
|
| 717 |
-
|
| 718 |
docs = query.limit(2).stream()
|
| 719 |
found_docs = [{"id": doc.id, "data": doc.to_dict()} for doc in docs]
|
| 720 |
-
|
| 721 |
if len(found_docs) == 1: return found_docs[0]
|
| 722 |
elif len(found_docs) > 1: return "multiple_matches"
|
| 723 |
else: return None
|
|
@@ -734,11 +678,9 @@ def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 734 |
'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
|
| 735 |
}
|
| 736 |
collection_name = collection_map.get(trans_type)
|
| 737 |
-
|
| 738 |
if not collection_name:
|
| 739 |
feedback.append(f"Update skipped: Unknown type '{trans_type}'.")
|
| 740 |
continue
|
| 741 |
-
|
| 742 |
target_doc = _find_document_by_details(user_phone, collection_name, details)
|
| 743 |
if target_doc == "multiple_matches":
|
| 744 |
feedback.append(f"Update for {trans_type} failed: Multiple records match. Please be more specific.")
|
|
@@ -746,16 +688,13 @@ def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 746 |
if not target_doc:
|
| 747 |
feedback.append(f"Update for {trans_type} failed: No record found matching your description.")
|
| 748 |
continue
|
| 749 |
-
|
| 750 |
doc_id = target_doc["id"]
|
| 751 |
doc_ref = db.collection("users").document(user_phone).collection(collection_name).document(doc_id)
|
| 752 |
-
|
| 753 |
try:
|
| 754 |
updates = {f"details.{k}": v for k, v in details.items() if k != 'transaction_id'}
|
| 755 |
if not updates:
|
| 756 |
feedback.append(f"Update for {trans_type} (ID: {doc_id}) skipped: No new data provided.")
|
| 757 |
continue
|
| 758 |
-
|
| 759 |
updates['last_updated'] = datetime.now(timezone.utc).isoformat()
|
| 760 |
doc_ref.update(updates)
|
| 761 |
feedback.append(f"Successfully updated {trans_type} record (ID: {doc_id}).")
|
|
@@ -763,7 +702,6 @@ def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 763 |
except Exception as e:
|
| 764 |
logger.error(f"Update failed for doc '{doc_id}': {e}", exc_info=True)
|
| 765 |
feedback.append(f"Update for {trans_type} (ID: {doc_id}) failed with an error.")
|
| 766 |
-
|
| 767 |
return any_success, "\n".join(feedback)
|
| 768 |
|
| 769 |
|
|
@@ -778,11 +716,9 @@ def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 778 |
'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
|
| 779 |
}
|
| 780 |
collection_name = collection_map.get(trans_type)
|
| 781 |
-
|
| 782 |
if not collection_name:
|
| 783 |
feedback.append(f"Delete skipped: Unknown type '{trans_type}'.")
|
| 784 |
continue
|
| 785 |
-
|
| 786 |
target_doc = _find_document_by_details(user_phone, collection_name, details)
|
| 787 |
if target_doc == "multiple_matches":
|
| 788 |
feedback.append(f"Delete for {trans_type} failed: Multiple records match.")
|
|
@@ -790,7 +726,6 @@ def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 790 |
if not target_doc:
|
| 791 |
feedback.append(f"Delete for {trans_type} failed: No record found.")
|
| 792 |
continue
|
| 793 |
-
|
| 794 |
doc_id = target_doc["id"]
|
| 795 |
try:
|
| 796 |
db.collection("users").document(user_phone).collection(collection_name).document(doc_id).delete()
|
|
@@ -799,7 +734,6 @@ def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[b
|
|
| 799 |
except Exception as e:
|
| 800 |
logger.error(f"Delete failed for doc '{doc_id}': {e}", exc_info=True)
|
| 801 |
feedback.append(f"Delete for {trans_type} (ID: {doc_id}) failed with an error.")
|
| 802 |
-
|
| 803 |
return any_success, "\n".join(feedback)
|
| 804 |
|
| 805 |
|
|
@@ -813,38 +747,33 @@ def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool
|
|
| 813 |
logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
|
| 814 |
return False
|
| 815 |
|
|
|
|
| 816 |
CURRENCY_SYMBOL_REGEX = re.compile(r"^\s*[\$\£\€\¥\₹R]")
|
| 817 |
|
| 818 |
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
|
| 819 |
if not transactions: return "No transaction data to display."
|
| 820 |
if isinstance(transactions, dict): transactions = [transactions]
|
| 821 |
-
|
| 822 |
output_lines = []
|
| 823 |
for idx, trans in enumerate(transactions):
|
| 824 |
if not isinstance(trans, dict): continue
|
| 825 |
-
|
| 826 |
details = trans.get('details', trans)
|
| 827 |
trans_type = trans.get('transaction_type', 'Item').replace("_", " ").title()
|
| 828 |
title = f"{trans_type}"
|
| 829 |
if len(transactions) > 1: output_lines.append(f"--- {title} {idx + 1} ---")
|
| 830 |
else: output_lines.append(f"--- {title} ---")
|
| 831 |
-
|
| 832 |
key_order = [
|
| 833 |
'transaction_id', 'item', 'service_name', 'name', 'creditor', 'category',
|
| 834 |
'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value',
|
| 835 |
'customer', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type'
|
| 836 |
]
|
| 837 |
-
|
| 838 |
displayed_keys = set()
|
| 839 |
if 'transaction_id' in trans:
|
| 840 |
output_lines.append(f"• Transaction ID: {trans['transaction_id']}")
|
| 841 |
displayed_keys.add('transaction_id')
|
| 842 |
-
|
| 843 |
for key in key_order:
|
| 844 |
if key in details and key not in displayed_keys:
|
| 845 |
output_lines.append(f"• {key.replace('_', ' ').title()}: {details[key]}")
|
| 846 |
displayed_keys.add(key)
|
| 847 |
-
|
| 848 |
for key, value in details.items():
|
| 849 |
if key not in displayed_keys and key != 'currency':
|
| 850 |
output_lines.append(f"• {key.replace('_', ' ').title()}: {value}")
|
|
@@ -868,7 +797,6 @@ def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
|
|
| 868 |
"""
|
| 869 |
if not parsed_trans_data:
|
| 870 |
return "I couldn't understand the transaction details. Could you please try again?"
|
| 871 |
-
|
| 872 |
grouped_transactions = {}
|
| 873 |
for trans in parsed_trans_data:
|
| 874 |
intent = trans.get('intent', 'unknown').lower()
|
|
@@ -876,7 +804,6 @@ def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
|
|
| 876 |
key = (intent, trans_type)
|
| 877 |
if key not in grouped_transactions: grouped_transactions[key] = []
|
| 878 |
grouped_transactions[key].append(trans)
|
| 879 |
-
|
| 880 |
final_feedback = []
|
| 881 |
for (intent, trans_type), transactions in grouped_transactions.items():
|
| 882 |
logger.info(f"Processing group: {intent} - {trans_type} for user {mobile}")
|
|
@@ -917,6 +844,5 @@ def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
|
|
| 917 |
except Exception as e:
|
| 918 |
logger.error(f"Error processing group ({intent}, {trans_type}) for user {mobile}: {e}", exc_info=True)
|
| 919 |
final_feedback.append(f"An unexpected error occurred while processing your {trans_type} {intent} request.")
|
| 920 |
-
|
| 921 |
if not final_feedback: return "No actions were processed from your request."
|
| 922 |
return "\n\n".join(final_feedback).strip()
|
|
|
|
| 2 |
import json
|
| 3 |
import os
|
| 4 |
import logging
|
| 5 |
+
from datetime import datetime, timezone, timedelta
|
| 6 |
from typing import List, Dict, Union, Optional, Any, Tuple
|
| 7 |
from google.cloud import firestore
|
| 8 |
import pandas as pd
|
|
|
|
| 62 |
return "The data requested is empty."
|
| 63 |
try:
|
| 64 |
img_path = os.path.join(user_defined_path, f"report_{uuid.uuid4()}.png")
|
|
|
|
| 65 |
dfi.export(df, img_path, table_conversion='matplotlib', dpi=200)
|
| 66 |
return img_path
|
| 67 |
except Exception as e:
|
|
|
|
| 74 |
super().__init__(context)
|
| 75 |
|
| 76 |
def format_dataframe(self, result):
|
| 77 |
+
# MODIFIED: Universal DataFrame to image rendering pipeline
|
| 78 |
df = result['value']
|
| 79 |
return render_df_as_image(df)
|
| 80 |
|
|
|
|
| 117 |
llm = None
|
| 118 |
|
| 119 |
|
| 120 |
+
# --- NEW FEATURE: Programmatic Report Engine ---
|
| 121 |
+
class ReportEngine:
    """Programmatic report engine: computes business KPIs from user DataFrames.

    KPIs are calculated in plain pandas rather than by an LLM so that the
    headline numbers in generated reports are deterministic and reliable.
    """

    def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]]):
        """
        Args:
            dfs_with_names: (collection_name, DataFrame) pairs,
                e.g. [('sales', sales_df), ('expenses', expenses_df)].
        """
        self.dfs: Dict[str, pd.DataFrame] = dict(dfs_with_names)
        # Capture "now" once so every KPI in one run uses the same instant.
        self.now = datetime.now(timezone.utc)
        self.kpis: Dict[str, Any] = {}

    def _get_time_boundaries(self) -> Dict[str, datetime]:
        """Return UTC start instants for today, this week (Mon-based), this month."""
        today_start = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
        week_start = today_start - timedelta(days=self.now.weekday())
        month_start = today_start.replace(day=1)
        return {"today": today_start, "this_week": week_start, "this_month": month_start}

    def calculate_all_kpis(self) -> "ReportEngine":
        """Populate self.kpis from the 'sales' DataFrame; returns self for chaining.

        Robustness fixes over the naive version:
        - operates on a copy, so the caller's shared DataFrame is not mutated;
        - tolerates a missing 'sale_total' column (revenue KPIs become 0) and a
          missing 'timestamp' column (time-based KPIs are skipped) instead of
          raising KeyError — 'sale_total' is only engineered upstream when both
          'price' and 'quantity' columns exist.
        """
        boundaries = self._get_time_boundaries()
        sales_df = self.dfs.get('sales')
        if sales_df is None or sales_df.empty:
            return self

        sales_df = sales_df.copy()  # never mutate the caller's DataFrame
        has_total = 'sale_total' in sales_df.columns

        # Overall KPIs
        self.kpis['overall_revenue'] = sales_df['sale_total'].sum() if has_total else 0
        self.kpis['overall_sales_count'] = len(sales_df)

        # Time-based KPIs require a usable timestamp column.
        if 'timestamp' not in sales_df.columns:
            return self
        sales_df['timestamp'] = pd.to_datetime(sales_df['timestamp'], errors='coerce', utc=True)
        for period, start_date in boundaries.items():
            period_sales = sales_df.loc[sales_df['timestamp'] >= start_date]
            self.kpis[f'revenue_{period}'] = period_sales['sale_total'].sum() if has_total else 0
            self.kpis[f'sales_count_{period}'] = len(period_sales)
        return self

    def format_kpis_for_llm(self) -> str:
        """Render the computed KPIs as a WhatsApp-markdown snippet for LLM context."""
        if not self.kpis:
            return "No sales data available to generate KPIs."

        return f"""
*Business KPIs (as of {self.now.strftime('%Y-%m-%d')})*
- *Revenue Today:* ${self.kpis.get('revenue_today', 0):.2f} from {self.kpis.get('sales_count_today', 0)} sales.
- *Revenue This Week:* ${self.kpis.get('revenue_this_week', 0):.2f} from {self.kpis.get('sales_count_this_week', 0)} sales.
- *Revenue This Month:* ${self.kpis.get('revenue_this_month', 0):.2f} from {self.kpis.get('sales_count_this_month', 0)} sales.
- *Total Revenue All-Time:* ${self.kpis.get('overall_revenue', 0):.2f} from {self.kpis.get('overall_sales_count', 0)} sales.
"""
|
| 163 |
+
|
| 164 |
+
# --- End of Report Engine ---
|
| 165 |
+
|
| 166 |
def generateResponse(prompt: str) -> str:
|
| 167 |
"""Generate structured JSON response from user input using Generative AI."""
|
| 168 |
if not model:
|
| 169 |
return '{"error": "Model not available"}'
|
|
|
|
|
|
|
|
|
|
| 170 |
system_prompt = """
|
| 171 |
Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.
|
| 172 |
|
|
|
|
| 284 |
all_item_names = [doc.id for doc in all_item_docs]
|
| 285 |
|
| 286 |
if all_item_names:
|
|
|
|
| 287 |
if name_lower in all_item_names:
|
| 288 |
for doc in all_item_docs:
|
| 289 |
if doc.id == name_lower:
|
| 290 |
return {'doc': doc, 'name': name_lower}
|
| 291 |
|
|
|
|
| 292 |
best_match = fuzzy_process.extractOne(name_lower, all_item_names)
|
| 293 |
if best_match and best_match[1] >= 90:
|
| 294 |
matched_name = best_match[0]
|
|
|
|
| 296 |
if doc.id == matched_name:
|
| 297 |
return {'doc': doc, 'name': matched_name}
|
| 298 |
|
|
|
|
| 299 |
singular = p.singular_noun(name_lower)
|
| 300 |
if not singular:
|
| 301 |
singular = name_lower
|
|
|
|
| 304 |
|
| 305 |
|
| 306 |
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
| 307 |
batch = db.batch()
|
| 308 |
inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
|
| 309 |
feedback_messages = []
|
| 310 |
success_count = 0
|
|
|
|
| 311 |
for transaction in transaction_data:
|
| 312 |
details = transaction.get('details', {})
|
| 313 |
item_name = details.get('item') or details.get('service_name')
|
|
|
|
| 314 |
if not item_name:
|
| 315 |
feedback_messages.append("Skipped: An inventory/service update was missing an item or service name.")
|
| 316 |
continue
|
|
|
|
| 317 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 318 |
canonical_name = canonical_info['name']
|
|
|
|
| 319 |
if 'item' in details: details['item'] = canonical_name
|
| 320 |
if 'service_name' in details: details['service_name'] = canonical_name
|
|
|
|
| 321 |
try:
|
| 322 |
change_key = 'quantity' if 'quantity' in details else 'units_available'
|
| 323 |
change_amount = int(details.get(change_key, 0))
|
| 324 |
except (ValueError, TypeError):
|
| 325 |
feedback_messages.append(f"Skipped '{canonical_name}': Invalid quantity or units format.")
|
| 326 |
continue
|
|
|
|
| 327 |
doc_ref = inventory_ref.document(canonical_name)
|
| 328 |
doc_data = {
|
| 329 |
'details': {**details, change_key: firestore.Increment(change_amount)},
|
| 330 |
'type': 'service' if 'service_name' in details else 'good',
|
| 331 |
'last_updated': datetime.now(timezone.utc).isoformat(),
|
| 332 |
}
|
|
|
|
| 333 |
batch.set(doc_ref, doc_data, merge=True)
|
| 334 |
feedback_messages.append(f"Processed '{canonical_name}': change of {change_amount}.")
|
| 335 |
success_count += 1
|
|
|
|
| 336 |
if success_count == 0:
|
| 337 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid inventory/service items to process."
|
|
|
|
| 338 |
try:
|
| 339 |
batch.commit()
|
| 340 |
return True, "\n".join(feedback_messages)
|
|
|
|
| 344 |
|
| 345 |
|
| 346 |
def create_sale(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
|
|
|
|
|
|
|
|
|
|
| 347 |
feedback_messages = []
|
| 348 |
any_success = False
|
|
|
|
| 349 |
for t in transaction_data:
|
| 350 |
details = t.get('details', {})
|
| 351 |
item_name = details.get('item') or details.get('service_name')
|
|
|
|
| 352 |
if not item_name:
|
| 353 |
feedback_messages.append("Sale failed: Missing item or service name.")
|
| 354 |
continue
|
|
|
|
| 355 |
try:
|
| 356 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 357 |
canonical_name = canonical_info['name']
|
|
|
|
| 358 |
last_selling_price = None
|
| 359 |
sales_ref = db.collection("users").document(user_phone).collection("sales")
|
| 360 |
all_sales_query = sales_ref.where('details.item', '==', canonical_name)
|
| 361 |
all_sales_docs = list(all_sales_query.stream())
|
|
|
|
| 362 |
if all_sales_docs:
|
| 363 |
all_sales_docs.sort(key=lambda doc: doc.to_dict().get('timestamp', ''), reverse=True)
|
| 364 |
last_sale_data = all_sales_docs[0].to_dict()
|
| 365 |
last_selling_price = last_sale_data.get('details', {}).get('price')
|
|
|
|
| 366 |
@firestore.transactional
|
| 367 |
def process_one_sale(transaction, sale_details):
|
| 368 |
is_new_item = canonical_info['doc'] is None
|
|
|
|
|
|
|
| 369 |
original_trans_type = t.get('transaction_type')
|
| 370 |
item_type = 'service' if original_trans_type == 'service_offering' else 'good'
|
|
|
|
| 371 |
user_price = sale_details.get('price') or sale_details.get('unit_price')
|
|
|
|
| 372 |
if user_price is not None:
|
| 373 |
selling_price = user_price
|
| 374 |
elif last_selling_price is not None:
|
|
|
|
| 378 |
return f"Sale failed for new item '{canonical_name}': You must specify a price for the first sale."
|
| 379 |
else:
|
| 380 |
selling_price = 0
|
|
|
|
| 381 |
if not isinstance(selling_price, (int, float)): selling_price = 0
|
|
|
|
| 382 |
sale_details['price'] = selling_price
|
| 383 |
sale_details['item'] = canonical_name
|
| 384 |
if 'unit_price' in sale_details: del sale_details['unit_price']
|
| 385 |
if 'service_name' in sale_details: del sale_details['service_name']
|
|
|
|
| 386 |
try:
|
| 387 |
quantity_sold = int(sale_details.get('quantity', 1))
|
| 388 |
if quantity_sold <= 0: return f"Sale failed for '{canonical_name}': Invalid quantity ({quantity_sold})."
|
| 389 |
except (ValueError, TypeError):
|
| 390 |
return f"Sale failed for '{canonical_name}': Invalid quantity format."
|
|
|
|
| 391 |
item_doc_ref = db.collection("users").document(user_phone).collection("inventory_and_services").document(canonical_name)
|
| 392 |
item_snapshot = item_doc_ref.get(transaction=transaction)
|
|
|
|
| 393 |
item_cost = 0
|
| 394 |
if item_snapshot.exists:
|
| 395 |
inv_data = item_snapshot.to_dict()
|
| 396 |
inv_details = inv_data.get('details', {})
|
| 397 |
item_cost = inv_details.get('price') or inv_details.get('unit_price') or 0
|
|
|
|
| 398 |
if inv_data.get('type') == 'good':
|
| 399 |
stock_key = 'quantity'
|
| 400 |
current_stock = int(inv_details.get(stock_key, 0))
|
| 401 |
if current_stock < quantity_sold:
|
| 402 |
return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
|
|
|
|
| 403 |
transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)})
|
| 404 |
elif item_type == 'good':
|
| 405 |
return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
|
|
|
|
| 411 |
'last_updated': datetime.now(timezone.utc).isoformat()
|
| 412 |
}
|
| 413 |
transaction.set(item_doc_ref, service_record)
|
|
|
|
| 414 |
sale_doc_ref = sales_ref.document()
|
| 415 |
sale_record = {
|
| 416 |
'details': {**sale_details, 'cost': item_cost},
|
|
|
|
| 418 |
'status': 'completed',
|
| 419 |
'transaction_id': sale_doc_ref.id
|
| 420 |
}
|
|
|
|
| 421 |
transaction.set(sale_doc_ref, sale_record)
|
| 422 |
return f"Sale successful for {quantity_sold} x '{canonical_name}' at {sale_details.get('currency','')}{selling_price} each."
|
|
|
|
| 423 |
transaction_feedback = process_one_sale(db.transaction(), details)
|
| 424 |
feedback_messages.append(transaction_feedback)
|
| 425 |
if "successful" in transaction_feedback:
|
|
|
|
| 427 |
except Exception as e:
|
| 428 |
logger.error(f"Transactional sale failed for '{item_name}': {e}", exc_info=True)
|
| 429 |
feedback_messages.append(f"Sale failed for '{item_name}': An unexpected error occurred.")
|
|
|
|
| 430 |
return any_success, "\n".join(feedback_messages)
|
| 431 |
|
| 432 |
|
|
|
|
| 435 |
expenses_ref = db.collection("users").document(user_phone).collection("expenses")
|
| 436 |
success_count = 0
|
| 437 |
feedback_messages = []
|
|
|
|
| 438 |
for transaction in transaction_data:
|
| 439 |
details = transaction.get('details', {})
|
| 440 |
expense_desc = details.get('description', details.get('category', 'Unnamed Expense'))
|
|
|
|
| 441 |
if 'amount' not in details:
|
| 442 |
feedback_messages.append(f"Skipped expense '{expense_desc}': Missing amount.")
|
| 443 |
continue
|
|
|
|
| 444 |
doc_ref = expenses_ref.document()
|
| 445 |
expense_record = {
|
| 446 |
'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
|
|
| 449 |
batch.set(doc_ref, expense_record)
|
| 450 |
feedback_messages.append(f"Recorded expense: '{expense_desc}' for {details.get('currency','')}{details.get('amount')}.")
|
| 451 |
success_count += 1
|
|
|
|
| 452 |
if success_count == 0:
|
| 453 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid expense transactions to create."
|
|
|
|
| 454 |
try:
|
| 455 |
batch.commit()
|
| 456 |
return True, "\n".join(feedback_messages)
|
|
|
|
| 464 |
assets_ref = db.collection("users").document(user_phone).collection("assets")
|
| 465 |
success_count = 0
|
| 466 |
feedback_messages = []
|
|
|
|
| 467 |
for transaction in transaction_data:
|
| 468 |
details = transaction.get('details', {})
|
| 469 |
asset_name = details.get('name', 'Unnamed Asset')
|
|
|
|
| 470 |
if 'value' not in details:
|
| 471 |
feedback_messages.append(f"Skipped asset '{asset_name}': Missing value.")
|
| 472 |
continue
|
|
|
|
| 473 |
doc_ref = assets_ref.document()
|
| 474 |
asset_record = {
|
| 475 |
'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
|
|
|
|
| 478 |
batch.set(doc_ref, asset_record)
|
| 479 |
feedback_messages.append(f"Recorded asset: '{asset_name}' with value {details.get('currency','')}{details.get('value')}.")
|
| 480 |
success_count += 1
|
|
|
|
| 481 |
if success_count == 0:
|
| 482 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid asset transactions to create."
|
|
|
|
| 483 |
try:
|
| 484 |
batch.commit()
|
| 485 |
return True, "\n".join(feedback_messages)
|
|
|
|
| 493 |
liabilities_ref = db.collection("users").document(user_phone).collection("liabilities")
|
| 494 |
success_count = 0
|
| 495 |
feedback_messages = []
|
|
|
|
| 496 |
for transaction in transaction_data:
|
| 497 |
details = transaction.get('details', {})
|
| 498 |
creditor = details.get('creditor', 'Unnamed Creditor')
|
|
|
|
| 499 |
if 'amount' not in details or not details.get('creditor'):
|
| 500 |
feedback_messages.append(f"Skipped liability '{creditor}': Missing amount or creditor.")
|
| 501 |
continue
|
|
|
|
| 502 |
doc_ref = liabilities_ref.document()
|
| 503 |
liability_record = {
|
| 504 |
'details': details,
|
|
|
|
| 510 |
batch.set(doc_ref, liability_record)
|
| 511 |
feedback_messages.append(f"Recorded liability to '{creditor}' for {details.get('currency','')}{details.get('amount')}.")
|
| 512 |
success_count += 1
|
|
|
|
| 513 |
if success_count == 0:
|
| 514 |
return False, "\n".join(feedback_messages) if feedback_messages else "No valid liability transactions to create."
|
|
|
|
| 515 |
try:
|
| 516 |
batch.commit()
|
| 517 |
return True, "\n".join(feedback_messages)
|
|
|
|
| 521 |
|
| 522 |
|
| 523 |
def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
|
| 524 |
+
"""Proactively cleans and validates a DataFrame to ensure data integrity."""
|
|
|
|
|
|
|
| 525 |
if df.empty:
|
| 526 |
return df
|
|
|
|
| 527 |
for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
|
| 528 |
if col in df.columns:
|
| 529 |
df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
|
|
|
|
| 530 |
numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
|
| 531 |
for col in numeric_cols:
|
| 532 |
if col in df.columns:
|
| 533 |
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
|
|
|
|
| 534 |
for col in df.select_dtypes(include=['object']).columns:
|
| 535 |
df[col] = df[col].fillna('Unknown')
|
|
|
|
| 536 |
return df
|
| 537 |
|
| 538 |
|
| 539 |
def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
|
| 540 |
+
"""Fetches all user data, splits/validates DataFrames, and engineers features."""
|
|
|
|
|
|
|
|
|
|
| 541 |
all_dfs_with_names = []
|
|
|
|
| 542 |
inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
|
| 543 |
inventory_data, services_data = [], []
|
| 544 |
for doc in inv_serv_docs:
|
|
|
|
| 549 |
services_data.append(flat_data)
|
| 550 |
else:
|
| 551 |
inventory_data.append(flat_data)
|
|
|
|
| 552 |
if inventory_data:
|
| 553 |
inventory_df = pd.DataFrame(inventory_data)
|
| 554 |
all_dfs_with_names.append(("inventory", _validate_dataframe(inventory_df)))
|
|
|
|
| 555 |
if services_data:
|
| 556 |
services_df = pd.DataFrame(services_data)
|
| 557 |
all_dfs_with_names.append(("services", _validate_dataframe(services_df)))
|
|
|
|
| 558 |
collections_to_fetch = {'sales': 'sales', 'expenses': 'expenses', 'assets': 'assets', 'liabilities': 'liabilities'}
|
| 559 |
for df_name, coll_name in collections_to_fetch.items():
|
| 560 |
docs = db.collection("users").document(user_phone).collection(coll_name).stream()
|
| 561 |
data = [doc.to_dict() for doc in docs]
|
|
|
|
| 562 |
if data:
|
| 563 |
flat_data_list = []
|
| 564 |
for item in data:
|
| 565 |
flat_item = {**item, **item.get('details', {})}
|
| 566 |
if 'details' in flat_item: del flat_item['details']
|
| 567 |
flat_data_list.append(flat_item)
|
|
|
|
| 568 |
df = pd.DataFrame(flat_data_list)
|
| 569 |
validated_df = _validate_dataframe(df)
|
| 570 |
+
if df_name == 'sales' and 'price' in validated_df.columns and 'quantity' in validated_df.columns:
|
| 571 |
+
validated_df['sale_total'] = validated_df['price'] * validated_df['quantity']
|
|
|
|
|
|
|
|
|
|
| 572 |
all_dfs_with_names.append((df_name, validated_df))
|
|
|
|
| 573 |
return all_dfs_with_names
|
| 574 |
|
| 575 |
|
| 576 |
def read_datalake(user_phone: str, query: str) -> str:
|
| 577 |
"""
|
| 578 |
+
Handles data queries using a three-tiered hybrid system.
|
| 579 |
+
1. Programmatically calculates KPIs for reliability.
|
| 580 |
+
2. Uses a base LLM to synthesize KPIs into reports for high-level questions.
|
| 581 |
+
3. Uses an augmented PandasAI to answer specific questions, providing KPIs as context.
|
| 582 |
"""
|
|
|
|
| 583 |
try:
|
| 584 |
all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone)
|
| 585 |
if not all_dfs_with_names:
|
| 586 |
return "You have no data recorded yet. Please add some transactions first."
|
| 587 |
|
| 588 |
+
# Tier 1: Programmatic KPI Calculation
|
| 589 |
+
engine = ReportEngine(all_dfs_with_names)
|
| 590 |
+
kpi_context = engine.calculate_all_kpis().format_kpis_for_llm()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 591 |
|
| 592 |
+
# Routing Logic
|
| 593 |
+
report_keywords = ["report", "performance", "summary", "how did i do", "how are things"]
|
| 594 |
+
is_report_request = any(keyword in query.lower() for keyword in report_keywords)
|
|
|
|
| 595 |
|
| 596 |
+
if is_report_request:
|
| 597 |
+
# Tier 2: LLM as an "Insight Generator" (bypassing PandasAI)
|
| 598 |
+
logger.info(f"Handling '{query}' with the Insight Generator.")
|
| 599 |
+
synthesis_prompt = f"""
|
| 600 |
+
You are a business assistant. Based on the following real-time business KPIs, synthesize a concise, friendly report for the user.
|
| 601 |
+
The user's request was: '{query}'.
|
| 602 |
+
Format your entire response using WhatsApp-compatible markdown (*bold*, _italic_).
|
| 603 |
|
| 604 |
+
Here are the KPIs:
|
| 605 |
+
{kpi_context}
|
| 606 |
|
| 607 |
+
Generate a summary based on these numbers.
|
| 608 |
+
"""
|
| 609 |
+
response = llm.invoke(synthesis_prompt)
|
| 610 |
+
return response.content
|
| 611 |
|
|
|
|
|
|
|
| 612 |
else:
|
| 613 |
+
# Tier 3: LLM as a "Code Assistant" (Augmented PandasAI)
|
| 614 |
+
logger.info(f"Handling '{query}' with Augmented PandasAI.")
|
| 615 |
+
augmented_prompt = f"""
|
| 616 |
+
For your context, here are the user's overall business KPIs:
|
| 617 |
+
{kpi_context}
|
| 618 |
+
|
| 619 |
+
Based on this context, please write Python code to answer the following specific user query: '{query}'.
|
| 620 |
+
Your code must end by declaring a `result` dictionary with "type" and "value" keys.
|
| 621 |
+
If the answer is a table, return a dataframe. My system will automatically convert it to an image.
|
| 622 |
+
"""
|
| 623 |
|
| 624 |
+
datalake_dfs = [df for _, df in all_dfs_with_names]
|
| 625 |
+
lake = SmartDatalake(datalake_dfs, config={
|
| 626 |
+
"llm": llm, "response_parser": FlaskResponse,
|
| 627 |
+
"save_charts_path": user_defined_path, "enable_cache": False
|
| 628 |
+
})
|
| 629 |
+
|
| 630 |
+
response = lake.chat(augmented_prompt)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 631 |
return str(response)
|
| 632 |
|
| 633 |
except NoCodeFoundError:
|
| 634 |
+
logger.error(f"PandasAI failed to generate code for query: '{query}'")
|
| 635 |
return "I'm sorry, I couldn't figure out how to answer that question with your data. Please try rephrasing it."
|
| 636 |
except Exception as e:
|
| 637 |
logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
|
|
|
|
| 640 |
|
| 641 |
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
|
| 642 |
col_ref = db.collection("users").document(user_phone).collection(collection_name)
|
|
|
|
| 643 |
if 'transaction_id' in details and details['transaction_id']:
|
| 644 |
doc = col_ref.document(details['transaction_id']).get()
|
| 645 |
if doc.exists: return {"id": doc.id, "data": doc.to_dict()}
|
|
|
|
| 646 |
if collection_name in ['inventory_and_services', 'sales'] and ('item' in details or 'service_name' in details):
|
| 647 |
item_name = details.get('item') or details.get('service_name')
|
| 648 |
canonical_info = _get_canonical_info(user_phone, item_name)
|
| 649 |
if canonical_info['doc']:
|
| 650 |
doc = canonical_info['doc']
|
| 651 |
return {"id": doc.id, "data": doc.to_dict()}
|
|
|
|
| 652 |
query = col_ref
|
| 653 |
key_map = {'expenses': 'description', 'assets': 'name', 'liabilities': 'creditor'}
|
| 654 |
search_key = key_map.get(collection_name)
|
|
|
|
| 655 |
filters_applied = False
|
| 656 |
if search_key and search_key in details:
|
| 657 |
query = query.where(f'details.{search_key}', '==', details[search_key]); filters_applied = True
|
|
|
|
| 659 |
query = query.where('details.amount', '==', details['amount']); filters_applied = True
|
| 660 |
if 'value' in details:
|
| 661 |
query = query.where('details.value', '==', details['value']); filters_applied = True
|
|
|
|
| 662 |
if not filters_applied: return None
|
|
|
|
| 663 |
docs = query.limit(2).stream()
|
| 664 |
found_docs = [{"id": doc.id, "data": doc.to_dict()} for doc in docs]
|
|
|
|
| 665 |
if len(found_docs) == 1: return found_docs[0]
|
| 666 |
elif len(found_docs) > 1: return "multiple_matches"
|
| 667 |
else: return None
|
|
|
|
| 678 |
'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
|
| 679 |
}
|
| 680 |
collection_name = collection_map.get(trans_type)
|
|
|
|
| 681 |
if not collection_name:
|
| 682 |
feedback.append(f"Update skipped: Unknown type '{trans_type}'.")
|
| 683 |
continue
|
|
|
|
| 684 |
target_doc = _find_document_by_details(user_phone, collection_name, details)
|
| 685 |
if target_doc == "multiple_matches":
|
| 686 |
feedback.append(f"Update for {trans_type} failed: Multiple records match. Please be more specific.")
|
|
|
|
| 688 |
if not target_doc:
|
| 689 |
feedback.append(f"Update for {trans_type} failed: No record found matching your description.")
|
| 690 |
continue
|
|
|
|
| 691 |
doc_id = target_doc["id"]
|
| 692 |
doc_ref = db.collection("users").document(user_phone).collection(collection_name).document(doc_id)
|
|
|
|
| 693 |
try:
|
| 694 |
updates = {f"details.{k}": v for k, v in details.items() if k != 'transaction_id'}
|
| 695 |
if not updates:
|
| 696 |
feedback.append(f"Update for {trans_type} (ID: {doc_id}) skipped: No new data provided.")
|
| 697 |
continue
|
|
|
|
| 698 |
updates['last_updated'] = datetime.now(timezone.utc).isoformat()
|
| 699 |
doc_ref.update(updates)
|
| 700 |
feedback.append(f"Successfully updated {trans_type} record (ID: {doc_id}).")
|
|
|
|
| 702 |
except Exception as e:
|
| 703 |
logger.error(f"Update failed for doc '{doc_id}': {e}", exc_info=True)
|
| 704 |
feedback.append(f"Update for {trans_type} (ID: {doc_id}) failed with an error.")
|
|
|
|
| 705 |
return any_success, "\n".join(feedback)
|
| 706 |
|
| 707 |
|
|
|
|
| 716 |
'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
|
| 717 |
}
|
| 718 |
collection_name = collection_map.get(trans_type)
|
|
|
|
| 719 |
if not collection_name:
|
| 720 |
feedback.append(f"Delete skipped: Unknown type '{trans_type}'.")
|
| 721 |
continue
|
|
|
|
| 722 |
target_doc = _find_document_by_details(user_phone, collection_name, details)
|
| 723 |
if target_doc == "multiple_matches":
|
| 724 |
feedback.append(f"Delete for {trans_type} failed: Multiple records match.")
|
|
|
|
| 726 |
if not target_doc:
|
| 727 |
feedback.append(f"Delete for {trans_type} failed: No record found.")
|
| 728 |
continue
|
|
|
|
| 729 |
doc_id = target_doc["id"]
|
| 730 |
try:
|
| 731 |
db.collection("users").document(user_phone).collection(collection_name).document(doc_id).delete()
|
|
|
|
| 734 |
except Exception as e:
|
| 735 |
logger.error(f"Delete failed for doc '{doc_id}': {e}", exc_info=True)
|
| 736 |
feedback.append(f"Delete for {trans_type} (ID: {doc_id}) failed with an error.")
|
|
|
|
| 737 |
return any_success, "\n".join(feedback)
|
| 738 |
|
| 739 |
|
|
|
|
| 747 |
logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
|
| 748 |
return False
|
| 749 |
|
| 750 |
+
|
| 751 |
CURRENCY_SYMBOL_REGEX = re.compile(r"^\s*[\$\£\€\¥\₹R]")
|
| 752 |
|
| 753 |
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
|
| 754 |
if not transactions: return "No transaction data to display."
|
| 755 |
if isinstance(transactions, dict): transactions = [transactions]
|
|
|
|
| 756 |
output_lines = []
|
| 757 |
for idx, trans in enumerate(transactions):
|
| 758 |
if not isinstance(trans, dict): continue
|
|
|
|
| 759 |
details = trans.get('details', trans)
|
| 760 |
trans_type = trans.get('transaction_type', 'Item').replace("_", " ").title()
|
| 761 |
title = f"{trans_type}"
|
| 762 |
if len(transactions) > 1: output_lines.append(f"--- {title} {idx + 1} ---")
|
| 763 |
else: output_lines.append(f"--- {title} ---")
|
|
|
|
| 764 |
key_order = [
|
| 765 |
'transaction_id', 'item', 'service_name', 'name', 'creditor', 'category',
|
| 766 |
'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value',
|
| 767 |
'customer', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type'
|
| 768 |
]
|
|
|
|
| 769 |
displayed_keys = set()
|
| 770 |
if 'transaction_id' in trans:
|
| 771 |
output_lines.append(f"• Transaction ID: {trans['transaction_id']}")
|
| 772 |
displayed_keys.add('transaction_id')
|
|
|
|
| 773 |
for key in key_order:
|
| 774 |
if key in details and key not in displayed_keys:
|
| 775 |
output_lines.append(f"• {key.replace('_', ' ').title()}: {details[key]}")
|
| 776 |
displayed_keys.add(key)
|
|
|
|
| 777 |
for key, value in details.items():
|
| 778 |
if key not in displayed_keys and key != 'currency':
|
| 779 |
output_lines.append(f"• {key.replace('_', ' ').title()}: {value}")
|
|
|
|
| 797 |
"""
|
| 798 |
if not parsed_trans_data:
|
| 799 |
return "I couldn't understand the transaction details. Could you please try again?"
|
|
|
|
| 800 |
grouped_transactions = {}
|
| 801 |
for trans in parsed_trans_data:
|
| 802 |
intent = trans.get('intent', 'unknown').lower()
|
|
|
|
| 804 |
key = (intent, trans_type)
|
| 805 |
if key not in grouped_transactions: grouped_transactions[key] = []
|
| 806 |
grouped_transactions[key].append(trans)
|
|
|
|
| 807 |
final_feedback = []
|
| 808 |
for (intent, trans_type), transactions in grouped_transactions.items():
|
| 809 |
logger.info(f"Processing group: {intent} - {trans_type} for user {mobile}")
|
|
|
|
| 844 |
except Exception as e:
|
| 845 |
logger.error(f"Error processing group ({intent}, {trans_type}) for user {mobile}: {e}", exc_info=True)
|
| 846 |
final_feedback.append(f"An unexpected error occurred while processing your {trans_type} {intent} request.")
|
|
|
|
| 847 |
if not final_feedback: return "No actions were processed from your request."
|
| 848 |
return "\n\n".join(final_feedback).strip()
|