smart-w / utility.py
rairo's picture
Update utility.py
5902637 verified
import json
import os
import logging
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Union, Optional, Any, Tuple
from google.cloud import firestore
import pandas as pd
import inflect
from thefuzz import process as fuzzy_process
from pandasai import SmartDatalake
from pandasai.responses.response_parser import ResponseParser
from pandasai.exceptions import NoCodeFoundError, MaliciousQueryError
from langchain_google_genai import ChatGoogleGenerativeAI
import google.generativeai as genai
import re
import uuid
import dataframe_image as dfi
from PIL import Image
import io
logger = logging.getLogger(__name__)
import firebase_admin
from firebase_admin import credentials, firestore
# --- Initialize supporting libraries ---
# Shared inflect engine; _get_canonical_info calls p.singular_noun() on it
# to derive the singular form of an item name.
p = inflect.engine()
def init_firestore_from_env(env_var: str = "FIREBASE"):
    """
    Return a Firestore client, initialising firebase-admin on first use.

    When no firebase-admin app exists yet, the service-account JSON is read
    from the environment variable named by ``env_var`` and the app is
    initialised with the bucket named in ``FIREBASE_STORAGE_BUCKET`` (which
    may be unset). Every failure is logged and then re-raised unchanged.
    """
    try:
        if not firebase_admin._apps:
            certificate = credentials.Certificate(json.loads(os.environ[env_var]))
            # storageBucket is passed along so storage (audio) handling can use the app
            app_options = {'storageBucket': os.environ.get("FIREBASE_STORAGE_BUCKET")}
            firebase_admin.initialize_app(certificate, app_options)
        return firestore.client()
    except KeyError:
        logging.error("%s environment variable is not set", env_var)
        raise
    except ValueError as err:
        # json.JSONDecodeError is a ValueError subclass, so malformed JSON lands here too
        logging.error("Invalid service-account JSON in %s: %s", env_var, err)
        raise
    except Exception as err:
        logging.exception("Failed to initialise Firestore: %s", err)
        raise
# Module-level Firestore client, created at import time; the data-access
# helpers in this module read and write through it.
db = init_firestore_from_env()
# NOTE(review): guid / new_filename are not referenced anywhere in the visible
# part of this file - confirm whether other code imports them before removing.
guid = uuid.uuid4()
new_filename = f"{guid}"
# Directory (under the current working directory) where render_df_as_image
# writes its PNG files; created on import if it does not exist yet.
user_defined_path = os.path.join(os.getcwd(), "plots")
os.makedirs(user_defined_path, exist_ok=True)
def render_df_as_image(df: pd.DataFrame) -> Optional[str]:
    """Render a DataFrame to a PNG under ``user_defined_path``.

    Returns the path of the written image. For an empty frame, or when the
    export fails, a human-readable message string is returned instead of a
    path. At most the first 10 columns are rendered.
    """
    if df.empty:
        return "The data requested is empty."

    column_cap = 10
    try:
        visible = df.iloc[:, :column_cap] if len(df.columns) > column_cap else df
        target_path = os.path.join(user_defined_path, f"report_{uuid.uuid4()}.png")
        dfi.export(visible, target_path, table_conversion='matplotlib', dpi=200)
    except Exception as exc:
        logger.error(f"Failed to render DataFrame as image: {exc}")
        return "There was an error creating the data table image."
    return target_path
class FlaskResponse(ResponseParser):
    """Response parser that hands results back as plain values.

    DataFrames are rendered to an image via ``render_df_as_image``; plot and
    other results return ``result['value']`` unchanged.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Render the DataFrame result and return what the renderer returns."""
        return render_df_as_image(result['value'])

    def format_plot(self, result):
        """Return the plot value (an image path as produced upstream)."""
        try:
            return result['value']
        except ValueError:
            return str(result['value'])

    def format_other(self, result):
        """Return any other result value untouched."""
        return result['value']
# --- AI Model Configuration ---
# The model names are bound before the try block so that they are always
# defined, even when configuration fails (e.g. GOOGLE_API_KEY is missing).
GENERATIVE_MODEL_NAME = "gemini-2.0-flash"
VISION_MODEL_NAME = "gemini-2.0-flash"
try:
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
    # Increased token limit to 8192 to prevent JSON truncation on long lists
    model = genai.GenerativeModel(
        GENERATIVE_MODEL_NAME,
        generation_config={
            "temperature": 0.1,
            "top_p": 0.9,
            "top_k": 10,
            "max_output_tokens": 8192,
        }
    )
    vision_model = genai.GenerativeModel(VISION_MODEL_NAME)
    llm = ChatGoogleGenerativeAI(
        model=GENERATIVE_MODEL_NAME,
        temperature=0.1,
        convert_system_message_to_human=True
    )
    logger.info(f"Using Generative Models: {GENERATIVE_MODEL_NAME} (Text) and {VISION_MODEL_NAME} (Vision)")
except KeyError:
    # Callers test `model` / `vision_model` for truthiness, so on failure all
    # three handles are set to None rather than left undefined.
    logger.error("GOOGLE_API_KEY environment variable not set!")
    model = vision_model = llm = None
except Exception as e:
    logger.error(f"Error configuring Generative AI: {e}", exc_info=True)
    model = vision_model = llm = None
# --- START: LANGUAGE PROCESSING (UPGRADE 4) ---
def detect_and_translate_input(text: str) -> Dict[str, str]:
    """
    Detect the language of ``text`` and translate it to English if needed.

    The text model is asked for a JSON object; its reply is validated before
    being returned, so callers can always rely on both keys being present
    and holding non-empty strings.

    Returns: {'english_text': str, 'detected_lang': str}. When the model is
    unavailable, the call fails, or the reply is not a JSON object, the
    original text is returned with 'detected_lang' set to 'English'.
    """
    fallback = {'english_text': text, 'detected_lang': 'English'}
    if not model:
        return fallback
    system_prompt = """
You are a language detection and translation engine for a bookkeeping bot.
1. Detect the language of the user's input (e.g., English, Shona, Zulu, Ndebele, Tswana, etc.).
2. CRITICAL: Do NOT be misled by Proper Nouns or Names (e.g., "Chiedza", "Musa", "Farai").
Focus on verbs and sentence structure. If the grammatical structure is English, return "English" as the language regardless of local names used.
3. If the language is NOT English, translate the text accurately to English.
4. IMPORTANT: Preserve all numbers, currency symbols ($, R, ZIG), and item names exactly.
5. If the language IS English, return the text as is.
6. Return ONLY a valid JSON object.
Output Schema:
{
"detected_lang": "Name of Language",
"english_text": "The translated text or original text"
}
"""
    try:
        response = model.generate_content([system_prompt, text])
        # Strip an optional ```json ... ``` fence around the reply
        cleaned_response = re.sub(r'^```json\s*|\s*```$', '', response.text, flags=re.MULTILINE).strip()
        result = json.loads(cleaned_response)
    except Exception as e:
        logger.error(f"Language detection failed: {e}")
        return fallback

    # The model may answer with valid JSON of the wrong shape (a list, a
    # missing key, a null value); never pass that through to callers.
    if not isinstance(result, dict):
        logger.error(f"Language detection returned a non-object JSON value: {cleaned_response}")
        return fallback
    english_text = result.get('english_text')
    if not isinstance(english_text, str) or not english_text.strip():
        english_text = text
    detected_lang = result.get('detected_lang')
    if not isinstance(detected_lang, str) or not detected_lang.strip():
        detected_lang = 'English'
    return {**result, 'english_text': english_text, 'detected_lang': detected_lang}
def translate_output(text: str, target_lang: str) -> str:
    """Translate an English reply into ``target_lang`` using the text model.

    The text is returned untouched when no model is configured, when no
    target language is given, when the target is English, or when the
    translation call fails.
    """
    if not (model and target_lang and target_lang.lower() != 'english'):
        return text
    prompt = f"""
Translate the following bookkeeping response from English to {target_lang}.
Rules:
- Keep numbers, transaction IDs, and currency symbols (e.g., $10.00, R50) UNCHANGED.
- Keep formatting like markdown (*bold*) UNCHANGED.
- Maintain a professional, helpful tone.
Text to translate:
"{text}"
"""
    try:
        return model.generate_content(prompt).text.strip()
    except Exception as exc:
        logger.error(f"Output translation failed: {exc}")
        return text
# --- END: LANGUAGE PROCESSING ---
# --- START: VISION PROCESSING FUNCTIONS ---
def _transpile_vision_json_to_query(vision_json: List[Dict]) -> str:
if not vision_json:
return "Error: Could not extract any transactions from the image."
query_parts = []
for trans in vision_json:
details = trans.get("details", {})
trans_type = trans.get("transaction_type", "unknown")
intent = trans.get("intent", "record")
part = f"{intent.title()} a {trans_type}"
item = details.get("item") or details.get("name") or details.get("description") or details.get("service_name")
quantity = details.get("quantity")
price = details.get("price") or details.get("amount") or details.get("value") or details.get("unit_price")
currency = details.get("currency", "")
vendor = details.get("vendor") or details.get("creditor")
if quantity and item:
part += f" of {quantity} '{item}'"
elif item:
part += f" for '{item}'"
if price:
part += f" for {currency}{price}"
if vendor:
part += f" from {vendor}"
query_parts.append(part)
final_query = " and ".join(query_parts)
return final_query.strip()
def _analyze_image_with_vision(image_bytes: bytes, caption: Optional[str]) -> List[Dict]:
    """Ask the vision model to extract transactions from an image.

    The image bytes are opened with PIL and sent together with a
    classification prompt (which embeds the optional user caption). The
    first ``[...]`` span in the reply is parsed as JSON and returned.

    Returns an empty list when no vision model is configured, when the reply
    contains no JSON array, or when any step raises.
    """
    if not vision_model:
        return []
    try:
        image_pil = Image.open(io.BytesIO(image_bytes))
        prompt = f"""
You are a bookkeeping vision model. Analyze the image. Return ONLY a valid JSON array [] of transaction objects.
**CRITICAL CLASSIFICATION RULES:**
1. **RECEIPTS / INVOICES / BILLS:**
- These are money LEAVING the business.
- Classify as **"expense"** (for utilities, rent, etc.) or **"purchase"** (for buying stock/inventory).
- Do NOT classify a receipt as a "sale".
2. **PRODUCT PHOTOS / SHELVES:**
- If it is a photo of an item with no text/receipt, OR a photo of a customer holding an item.
- Classify as **"sale"** (money COMING IN).
3. **CATALOGS / MENUS:**
- Classify as **"service_offering"** or **"inventory"** (creating items in system).
**USER CONTEXT:**
{caption if caption else "None"}
====================
SCHEMA (each object)
====================
{{
"intent": "create" | "read" | "update" | "delete",
"transaction_type": "sale" | "purchase" | "expense" | "asset" | "liability" | "inventory" | "service_offering" | "query",
"details": {{ /* keys below */ }},
"source": "ocr" | "object_detection"
}}
================
DETAILS KEYS
================
- sale (goods): item, quantity, price, currency
- expense: description, amount, currency, vendor
- purchase: item, quantity, price, currency
- asset: name, value, currency
- liability: creditor, amount, currency
- query: query (verbatim text)
Analyze the provided image and return only the JSON list.
"""
        raw_text = vision_model.generate_content([prompt, image_pil]).text
        # Greedy match across newlines: from the first '[' to the last ']'
        array_match = re.search(r'\[.*\]', raw_text, re.DOTALL)
        if array_match is None:
            logger.error(f"Vision AI did not return a valid JSON list. Raw response: {raw_text}")
            return []
        return json.loads(array_match.group(0))
    except Exception as exc:
        logger.error(f"Error in Vision AI processing: {exc}", exc_info=True)
        return []
def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str]) -> str:
    """Run vision analysis on an image and express the result as a text query.

    Returns the natural-language query built from the detected transactions,
    or an "Error: ..." message when the analysis yields nothing.
    """
    logger.info("Starting image analysis with Vision AI.")
    transactions = _analyze_image_with_vision(image_bytes, caption)
    if transactions:
        logger.info(f"Vision AI analysis complete. Result: {transactions}")
        return _transpile_vision_json_to_query(transactions)
    return "Error: I couldn't find any actionable transactions in the image."
# --- END: VISION PROCESSING FUNCTIONS ---
class ReportEngine:
    """Compute JSON-friendly business reports from a user's DataFrames.

    The engine is built from ``(name, DataFrame)`` pairs; its methods look up
    the names ``sales``, ``expenses``, ``inventory``, ``assets`` and
    ``liabilities``. The lower-cased user query selects the reporting period
    (see ``_get_time_filter``). The payload of the most recent report is kept
    in ``self.results``.
    """

    def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]], query: str):
        """Store the frames by name, normalise the query and detect the currency."""
        self.dfs = dict(dfs_with_names)
        self.query = query.lower()
        self.now = datetime.now(timezone.utc)
        self.results = {}
        self.currency = self._get_user_currency()

    def _get_user_currency(self) -> str:
        """Determines the user's primary currency from their data.

        Scans sales, expenses, assets and liabilities (in that order) and
        returns the most frequent ``details['currency']`` value of the first
        frame that has a usable one; falls back to "R".
        """
        for df_name in ['sales', 'expenses', 'assets', 'liabilities']:
            if df_name in self.dfs and 'details' in self.dfs[df_name].columns:
                currency_series = self.dfs[df_name]['details'].str.get('currency')
                mode = currency_series.dropna().mode()
                if not mode.empty:
                    primary_currency = mode[0]
                    if isinstance(primary_currency, str) and primary_currency.strip() and primary_currency != 'Unknown':
                        return primary_currency
        return "R"

    def _money(self, amount) -> str:
        """Format an amount as '<currency><value with two decimals>'."""
        return f"{self.currency}{amount:.2f}"

    def _filter_by_period(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` restricted to the queried period (unchanged if no filter applies)."""
        time_filter = self._get_time_filter(df)
        return df[time_filter] if time_filter is not None else df

    def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
        """Build a boolean row mask for the period mentioned in the query.

        Keywords are checked in this order, first hit wins: "yesterday",
        "today", "last month", a month name (current year), "month" (current
        month), "last week", "week" (since Monday of the current week),
        "on <weekday>" (needs a 'day_of_week' column), "year" (current year).
        Without any keyword every row is selected.

        Returns None when the frame is None, empty, or has no 'timestamp'
        column. All period boundaries are derived from ``self.now`` (UTC).
        """
        if target_df is None or 'timestamp' not in target_df.columns or target_df.empty:
            return None
        timestamps = target_df['timestamp']
        query = self.query

        if "yesterday" in query:
            start_of_yesterday = pd.Timestamp((self.now - timedelta(days=1)).date(), tz='UTC')
            end_of_yesterday = start_of_yesterday + timedelta(days=1)
            return (timestamps >= start_of_yesterday) & (timestamps < end_of_yesterday)
        if "today" in query:
            start_of_today = pd.Timestamp(self.now.date(), tz='UTC')
            end_of_today = start_of_today + timedelta(days=1)
            return (timestamps >= start_of_today) & (timestamps < end_of_today)
        if "last month" in query:
            first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
            # Stepping back one day from the 1st lands in the previous month
            last_day_last_month = first_day_current_month - timedelta(days=1)
            first_day_last_month = last_day_last_month.replace(day=1)
            return (timestamps >= pd.Timestamp(first_day_last_month)) & (timestamps < pd.Timestamp(first_day_current_month))
        month_match = re.search(r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\b", query)
        if month_match:
            month_number = datetime.strptime(month_match.group(1).title(), "%B").month
            return (timestamps.dt.month == month_number) & (timestamps.dt.year == self.now.year)
        if "month" in query:  # also covers "this month"
            return (timestamps.dt.month == self.now.month) & (timestamps.dt.year == self.now.year)
        if "last week" in query:
            start_of_this_week = self.now.date() - timedelta(days=self.now.weekday())
            start_of_last_week = start_of_this_week - timedelta(days=7)
            return (timestamps.dt.date >= start_of_last_week) & (timestamps.dt.date < start_of_this_week)
        if "week" in query:  # also covers "this week"
            start_of_week = self.now.date() - timedelta(days=self.now.weekday())
            return timestamps.dt.date >= start_of_week
        day_match = re.search(r"on (monday|tuesday|wednesday|thursday|friday|saturday|sunday)", query)
        if day_match and 'day_of_week' in target_df.columns:
            return target_df['day_of_week'] == day_match.group(1).title()
        if "year" in query:
            return timestamps.dt.year == self.now.year
        return pd.Series(True, index=target_df.index)

    def generate_report(self) -> str:
        """Generates a simple Sales or Expenses report.

        The subject is expenses when the query mentions an expense trigger
        phrase, otherwise sales. Returns a JSON string: the report, or an
        object with an "error" key when there is no data.
        """
        subject = "sales"
        expense_triggers = ["expense report", "expense summary", "expenses"]
        if any(keyword in self.query for keyword in expense_triggers):
            subject = "expenses"
        target_df = self.dfs.get(subject, pd.DataFrame())
        if target_df.empty:
            return json.dumps({"error": f"I couldn't find any {subject} data to generate a report."})
        target_df = self._filter_by_period(target_df)
        if target_df.empty:
            return json.dumps({"error": f"No {subject} data found for the specified period."})

        if subject == "sales":
            total_revenue = target_df['sale_total'].sum()
            item_summary = target_df.groupby('item')['quantity'].sum()
            if item_summary.empty:
                best_selling_item = "N/A"
                worst_selling_item = "N/A"
            elif len(item_summary) == 1:
                best_selling_item = item_summary.idxmax()
                worst_selling_item = "N/A (Only one item type sold)"
            elif item_summary.max() == item_summary.min():
                best_selling_item = "Tie (All items sold equally)"
                worst_selling_item = "Tie"
            else:
                best_selling_item = item_summary.idxmax()
                worst_selling_item = item_summary.idxmin()
            self.results = {
                "report_subject": "Sales",
                "total_revenue": self._money(total_revenue),
                "number_of_sales": len(target_df),
                "best_selling_item": best_selling_item,
                "worst_selling_item": worst_selling_item
            }
        else:  # expenses
            total_expenses = target_df['amount'].sum()
            category_summary = target_df.groupby('description')['amount'].sum()
            highest_expense_category = category_summary.idxmax() if not category_summary.empty else "N/A"
            self.results = {
                "report_subject": "Expenses",
                "total_expenses": self._money(total_expenses),
                "number_of_expenses": len(target_df),
                "highest_expense_category": highest_expense_category
            }
        return json.dumps(self.results, indent=2)

    def generate_debt_report(self) -> str:
        """Generates a report of outstanding debts (Upgrade 5).

        Sales rows with ``amount_outstanding > 0`` are summed overall and,
        when a 'customer' column exists, per customer. Returns a JSON string.
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty:
            return json.dumps({"error": "No sales data found."})
        if 'amount_outstanding' not in sales_df.columns:
            return json.dumps({"message": "No debt records found."})
        debt_df = sales_df[sales_df['amount_outstanding'] > 0]
        if debt_df.empty:
            return json.dumps({"message": "Great news! No pending debts found."})
        total_outstanding = debt_df['amount_outstanding'].sum()
        if 'customer' in debt_df.columns:
            owed_by_customer = debt_df.groupby('customer')['amount_outstanding'].sum()
            debtors = [f"{customer}: {self._money(owed)}" for customer, owed in owed_by_customer.items()]
        else:
            debtors = ["Various customers (names not recorded)"]
        self.results = {
            "report_subject": "Debtors List",
            "total_outstanding": self._money(total_outstanding),
            "debtors": debtors
        }
        return json.dumps(self.results, indent=2)

    def generate_profit_report(self) -> str:
        """Generates a comprehensive profitability report.

        Combines period-filtered sales and expenses into revenue, COGS (when
        a 'cogs' column exists), gross/net profit, average transaction value,
        items per transaction, expense ratio and the most profitable item.
        Returns a JSON string.
        """
        filtered_sales = self._filter_by_period(self.dfs.get('sales', pd.DataFrame()))
        filtered_expenses = self._filter_by_period(self.dfs.get('expenses', pd.DataFrame()))
        has_sales = not filtered_sales.empty
        has_cogs = has_sales and 'cogs' in filtered_sales.columns

        total_revenue = filtered_sales['sale_total'].sum() if has_sales else 0
        total_cogs = filtered_sales['cogs'].sum() if has_cogs else 0
        total_expenses = filtered_expenses['amount'].sum() if not filtered_expenses.empty else 0
        gross_profit = total_revenue - total_cogs
        net_profit = gross_profit - total_expenses
        num_sales = len(filtered_sales) if has_sales else 0
        total_items_sold = filtered_sales['quantity'].sum() if has_sales else 0
        atv = total_revenue / num_sales if num_sales > 0 else 0
        ipt = total_items_sold / num_sales if num_sales > 0 else 0
        expense_ratio = (total_expenses / total_revenue) * 100 if total_revenue > 0 else 0

        most_profitable_item = "N/A"
        if has_cogs:
            # Work on a copy so the caller's frame does not gain a column
            sales_with_profit = filtered_sales.copy()
            sales_with_profit['item_profit'] = sales_with_profit['sale_total'] - sales_with_profit['cogs']
            item_profitability = sales_with_profit.groupby('item')['item_profit'].sum()
            if not item_profitability.empty:
                most_profitable_item = item_profitability.idxmax()
        self.results = {
            "report_subject": "Profitability",
            "total_revenue": self._money(total_revenue),
            "total_cogs": self._money(total_cogs),
            "gross_profit": self._money(gross_profit),
            "total_expenses": self._money(total_expenses),
            "net_profit": self._money(net_profit),
            "average_transaction_value": self._money(atv),
            "items_per_transaction": f"{ipt:.2f}",
            "expense_to_revenue_ratio": f"{expense_ratio:.2f}%",
            "most_profitable_item": most_profitable_item
        }
        return json.dumps(self.results, indent=2)

    def generate_item_report(self, subject_item: str) -> str:
        """Generates a performance report for a specific item.

        Rows are selected by a case-insensitive *literal* substring match of
        ``subject_item`` against the 'item' column, so names containing regex
        metacharacters (e.g. "c++ book", "rice (5kg)") are matched safely.
        Returns a JSON string.
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty:
            return json.dumps({"error": f"No sales data found for '{subject_item}'."})
        # regex=False: the item name is user text, not a pattern
        item_mask = sales_df['item'].str.contains(subject_item, case=False, na=False, regex=False)
        item_df = sales_df[item_mask]
        if item_df.empty:
            return json.dumps({"error": f"I couldn't find any sales for '{subject_item}'."})
        filtered_df = self._filter_by_period(item_df)
        if filtered_df.empty:
            return json.dumps({"error": f"No data for '{subject_item}' in this period."})
        units_sold = filtered_df['quantity'].sum()
        total_revenue = filtered_df['sale_total'].sum()
        total_cogs = filtered_df['cogs'].sum() if 'cogs' in filtered_df.columns else 0
        gross_profit = total_revenue - total_cogs
        profit_margin = (gross_profit / total_revenue) * 100 if total_revenue > 0 else 0
        avg_price = total_revenue / units_sold if units_sold > 0 else 0
        self.results = {
            "report_subject": "Item Report",
            "item_name": subject_item,
            "units_sold": int(units_sold),
            "total_revenue": self._money(total_revenue),
            "total_cogs": self._money(total_cogs),
            "gross_profit": self._money(gross_profit),
            "profit_margin": f"{profit_margin:.2f}%",
            "average_selling_price": self._money(avg_price)
        }
        return json.dumps(self.results, indent=2)

    def generate_day_of_week_report(self) -> str:
        """Report total sales per weekday and the best-performing day.

        Requires a 'day_of_week' column on the sales frame. Returns a JSON
        string with a Monday-to-Sunday breakdown (missing days shown as 0).
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty or 'day_of_week' not in sales_df.columns:
            return json.dumps({"error": "No data available to analyze by day."})
        filtered_df = self._filter_by_period(sales_df)
        if filtered_df.empty:
            return json.dumps({"error": "No sales data in this period."})
        daily_sales = filtered_df.groupby('day_of_week')['sale_total'].sum()
        if daily_sales.empty:
            return json.dumps({"error": "No sales to analyze by day in this period."})
        # Pick the best day before padding the series with zero-sales days
        best_day = daily_sales.idxmax()
        day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
        daily_sales = daily_sales.reindex(day_order).fillna(0)
        self.results = {
            "report_subject": "Day of Week Analysis",
            "best_day": best_day,
            "daily_sales_breakdown": {day: self._money(amount) for day, amount in daily_sales.to_dict().items()}
        }
        return json.dumps(self.results, indent=2)

    def generate_forecast_data(self) -> str:
        """Summarise weekly sales as input for a forecast.

        Sales are resampled into weekly totals; the last two weeks, their
        growth rate and the average of all weeks except the last are
        reported. Returns a JSON string, or an error object when fewer than
        two weekly buckets exist.
        """
        sales_df = self.dfs.get('sales')
        if sales_df is None or sales_df.empty:
            return json.dumps({"error": "Not enough sales data to generate a forecast."})
        # set_index returns a new frame, so the stored DataFrame is untouched
        weekly_sales = sales_df.set_index('timestamp')['sale_total'].resample('W').sum()
        if len(weekly_sales) < 2:
            return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."})
        last_week_sales = weekly_sales.iloc[-1]
        previous_week_sales = weekly_sales.iloc[-2]
        growth_rate = 0
        if previous_week_sales > 0:
            growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100
        historical_avg = weekly_sales.head(-1).mean()
        self.results = {
            "last_period_sales": self._money(last_week_sales),
            "previous_period_sales": self._money(previous_week_sales),
            "period_over_period_growth": f"{growth_rate:.2f}%",
            "historical_average": self._money(historical_avg)
        }
        return json.dumps(self.results, indent=2)

    def generate_business_snapshot(self) -> Dict[str, Any]:
        """Build an all-time overview dict of KPIs, inventory, assets and liabilities.

        No period filter is applied. Each list section is a newline-joined
        bullet string, or a "No ... recorded." message when the frame is
        empty or lacks the needed columns.
        """
        snapshot = {}
        sales_df = self.dfs.get('sales', pd.DataFrame())
        expenses_df = self.dfs.get('expenses', pd.DataFrame())
        total_revenue = sales_df['sale_total'].sum() if not sales_df.empty else 0
        total_expenses = expenses_df['amount'].sum() if not expenses_df.empty else 0
        net_profit = total_revenue - total_expenses
        snapshot['financial_kpis'] = {
            "User's Primary Currency": self.currency,
            "Total Revenue": self._money(total_revenue),
            "Total Expenses": self._money(total_expenses),
            "Net Profit": self._money(net_profit)
        }

        inventory_df = self.dfs.get('inventory', pd.DataFrame())
        if not inventory_df.empty and 'item' in inventory_df.columns and 'quantity' in inventory_df.columns:
            snapshot['inventory_overview'] = "\n".join(
                [f"- {row['item']} ({int(row['quantity'])} units)" for _, row in inventory_df.iterrows()]
            )
        else:
            snapshot['inventory_overview'] = "No inventory items recorded."

        assets_df = self.dfs.get('assets', pd.DataFrame())
        if not assets_df.empty and 'name' in assets_df.columns and 'value' in assets_df.columns:
            snapshot['asset_register'] = "\n".join(
                [f"- {row['name']} ({self._money(row['value'])})" for _, row in assets_df.iterrows()]
            )
        else:
            snapshot['asset_register'] = "No assets recorded."

        liabilities_df = self.dfs.get('liabilities', pd.DataFrame())
        if not liabilities_df.empty and 'creditor' in liabilities_df.columns and 'amount' in liabilities_df.columns:
            snapshot['liabilities_ledger'] = "\n".join(
                [f"- {row['creditor']} ({self._money(row['amount'])})" for _, row in liabilities_df.iterrows()]
            )
        else:
            snapshot['liabilities_ledger'] = "No liabilities recorded."
        return snapshot
def generateResponse(prompt: str, currency: str = "R") -> str:
    """Ask the text model to turn a user request into a JSON transaction list.

    ``currency`` is injected into the system prompt as the user's default.
    The model reply is returned as text with any ```json fence removed; a
    reply that is a single bare JSON object is wrapped in ``[...]``. When no
    model is configured or the call fails, a JSON object string with an
    "error" key is returned instead.
    """
    if not model:
        return '{"error": "Model not available"}'
    # --- UPDATED: System Prompt for Upgrades 1, 2, 3, 5 ---
    system_prompt = f"""
Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.
**USER CONTEXT:**
- **Default Currency:** {currency} (Use this currency code if no symbol is provided in the input).
**1. Output Format:**
You MUST output your response as a valid JSON list `[]` containing one or more transaction objects `{{}}`.
**2. Transaction Object Structure:**
Each transaction object MUST have the following keys:
- `"intent"`: The user's goal ("create", "read", "update", "delete", "reset_account").
- `"transaction_type"`: The category of the transaction (e.g., "sale", "purchase", "inventory", "expense", "asset", "liability", "query", "service_offering", "account").
- `"details"`: An object containing key-value pairs extracted from the request.
**3. Key Naming Conventions for the `details` Object:**
- **For Expenses:** Use `"amount"`, `"description"`, and `"category"`.
- **For Assets:** Use `"value"` for the monetary worth and `"name"` for the item's name.
- **For Liabilities:** Use `"amount"` and `"creditor"`.
- **For Sales/Inventory (Updated):**
- Use `"item"`, `"quantity"`, and `"price"`.
- **Customer:** If a customer name is mentioned (e.g., "Sold to John"), extract as `"customer"`.
- **Payment:** If the user mentions how much was paid (e.g., "They gave me $10" or "paid 500"), extract as `"amount_paid"`.
- **For Updates/Deletes (Updated):**
- If a Transaction ID is mentioned (e.g., "update transaction 8f3a..."), extract it as `"transaction_id"`.
- **For all financial transactions:**
- **Currency:** ONLY include a `"currency"` key if the user explicitly types a symbol (e.g., $, R, ZAR, USD). **If the user does not type a symbol, DO NOT include the currency key.** This allows the system to inherit it from inventory.
**4. Special Intents:**
- **Account Reset:** If the user explicitly asks to "reset my account", "wipe data", or "delete all data", use intent `"reset_account"` and transaction_type `"account"`.
**5. Examples:**
**Example 1: Sales with Change**
- **Input:** "Sold 2 bread for $1 each, customer gave me $5"
- **Output:** [ {{"intent": "create", "transaction_type": "sale", "details": {{"item": "bread", "quantity": 2, "price": 1, "amount_paid": 5, "currency": "$"}} }} ]
**Example 2: Update with ID**
- **Input:** "Update transaction abc1234, change price to 50"
- **Output:** [ {{"intent": "update", "transaction_type": "sale", "details": {{"transaction_id": "abc1234", "price": 50}} }} ]
**Example 3: Reset Account**
- **Input:** "Reset my account completely"
- **Output:** [ {{"intent": "reset_account", "transaction_type": "account", "details": {{}} }} ]
"""
    try:
        reply = model.generate_content([system_prompt, prompt])
        stripped = re.sub(r'^```json\s*|\s*```$', '', reply.text, flags=re.MULTILINE).strip()
        # A lone object is promoted to a one-element list so callers always get a list
        is_bare_object = stripped.startswith('{') and stripped.endswith('}')
        return f"[{stripped}]" if is_bare_object else stripped
    except Exception as exc:
        logger.error(f"LLM Response generation failed: {exc}", exc_info=True)
        return '{"error": "Failed to process request with LLM"}'
def parse_multiple_transactions(response_text: str) -> List[Dict]:
    """Decode the LLM's JSON reply into a list of timestamped transactions.

    A JSON object is treated as a one-element list; non-dict list entries are
    dropped. Any other JSON value, or any decoding/processing error (which is
    logged), yields an empty list.
    """
    try:
        payload = json.loads(response_text)
        if isinstance(payload, dict):
            payload = [payload]
        elif not isinstance(payload, list):
            return []
        return [add_timestamp(entry) for entry in payload if isinstance(entry, dict)]
    except json.JSONDecodeError:
        logger.error(f"Failed to decode JSON from LLM response: {response_text}")
        return []
    except Exception as exc:
        logger.error(f"Unexpected error during parsing: {exc}", exc_info=True)
        return []
def add_timestamp(transaction: Dict) -> Dict:
    """Stamp the transaction with a UTC ISO-8601 'created_at' unless it has one.

    The dict is modified in place and also returned.
    """
    if 'created_at' in transaction:
        return transaction
    transaction['created_at'] = datetime.now(timezone.utc).isoformat()
    return transaction
def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
    """Resolve an item name to the user's existing inventory document, if any.

    The lower-cased, stripped name is first compared exactly against the
    document ids in the user's ``inventory_and_services`` collection, then
    fuzzily (score >= 90). On a hit the matching document and its id are
    returned; otherwise ``doc`` is None and ``name`` is the singular form of
    the input (names ending in 'ss' are left as they are).
    """
    inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
    name_lower = item_name.lower().strip()
    # singular_noun returns a falsy value when it has no singular form to offer
    if name_lower.endswith('ss'):
        singular = name_lower
    else:
        singular = p.singular_noun(name_lower) or name_lower

    inventory_docs = list(inventory_ref.stream())
    known_names = [doc.id for doc in inventory_docs]

    matched_name = None
    if name_lower in known_names:
        # Exact match wins; the fuzzy matcher is not consulted
        matched_name = name_lower
    elif known_names:
        best = fuzzy_process.extractOne(name_lower, known_names)
        if best and best[1] >= 90:
            matched_name = best[0]

    if matched_name is not None:
        for doc in inventory_docs:
            if doc.id == matched_name:
                return {'doc': doc, 'name': matched_name}
    return {'doc': None, 'name': singular}
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Apply inventory/service quantity changes for a user in one batch write.

    For each transaction the item (or service) name is canonicalised, the
    'quantity' (or, failing that, 'units_available') value is parsed as an
    int and queued as a Firestore increment on the canonical document, merged
    with the rest of the details. Transactions without a name or with an
    unparsable amount are skipped with a feedback line. Note that each
    transaction's ``details`` dict has its name fields rewritten in place.

    Returns ``(ok, message)``: ``ok`` is False when nothing was queued or the
    commit failed; ``message`` is the per-item feedback or the error text.
    """
    batch = db.batch()
    inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
    messages = []
    queued_writes = 0

    for transaction in transaction_data:
        details = transaction.get('details', {})
        item_name = details.get('item') or details.get('service_name')
        if not item_name:
            messages.append("Skipped: An inventory/service update was missing an item or service name.")
            continue

        canonical_name = _get_canonical_info(user_phone, item_name)['name']
        for name_key in ('item', 'service_name'):
            if name_key in details:
                details[name_key] = canonical_name

        change_key = 'quantity' if 'quantity' in details else 'units_available'
        try:
            change_amount = int(details.get(change_key, 0))
        except (ValueError, TypeError):
            messages.append(f"Skipped '{canonical_name}': Invalid quantity or units format.")
            continue

        payload = {
            # The increment overrides the raw amount copied in from details
            'details': {**details, change_key: firestore.Increment(change_amount)},
            'type': 'service' if 'service_name' in details else 'good',
            'last_updated': datetime.now(timezone.utc).isoformat(),
        }
        batch.set(inventory_ref.document(canonical_name), payload, merge=True)
        messages.append(f"Processed '{canonical_name}': change of {change_amount}.")
        queued_writes += 1

    if not queued_writes:
        summary = "\n".join(messages) if messages else "No valid inventory/service items to process."
        return False, summary
    try:
        batch.commit()
        return True, "\n".join(messages)
    except Exception as exc:
        logger.error(f"Inventory/Service batch commit failed for user {user_phone}: {exc}", exc_info=True)
        return False, f"An error occurred during inventory update: {exc}"
def create_sale(user_phone: str, transaction_data: List[Dict], force_settled: bool = False) -> tuple[bool, str]:
    """Record each sale in ``transaction_data`` and report the outcome.

    Every entry is handled independently: the item name is canonicalised, the
    price and currency are resolved, and a Firestore transaction decrements
    stock (for goods), creates the service record (for unknown services) and
    writes the sale document under ``users/{user_phone}/sales``.

    Args:
        user_phone: ID of the user document under the ``users`` collection.
        transaction_data: Parsed transactions. Each entry's ``details`` dict is
            mutated in place with the resolved price, item, currency and
            payment fields.
        force_settled: When True the sale is recorded as paid in full and any
            ``amount_paid`` in the details is overridden with the total due.

    Returns:
        ``(any_success, message)``: ``any_success`` is True when at least one
        sale was written; ``message`` holds one feedback entry per transaction,
        joined by newlines.
    """
    feedback_messages = []
    any_success = False

    # Pre-fetch user settings for the currency fallback.
    user_settings_ref = db.collection("users").document(user_phone).get()
    user_default_currency = "R"
    if user_settings_ref.exists:
        user_default_currency = user_settings_ref.to_dict().get("currency", "R")

    # Defined once up front so the nested transactional function can use it.
    sales_ref = db.collection("users").document(user_phone).collection("sales")

    for t in transaction_data:
        details = t.get('details', {})
        item_name = details.get('item') or details.get('service_name')
        if not item_name:
            feedback_messages.append("Sale failed: Missing item or service name.")
            continue

        try:
            canonical_info = _get_canonical_info(user_phone, item_name)
            canonical_name = canonical_info['name']

            # --- Currency inheritance ---
            input_currency = details.get('currency')
            inventory_currency = None
            last_selling_price = None

            # NOTE(review): this snapshot is read before the transaction starts,
            # so the stock check inside process_one_sale works on data that is
            # not protected by the transaction.
            item_doc_ref = db.collection("users").document(user_phone).collection("inventory_and_services").document(canonical_name)
            item_snapshot = item_doc_ref.get()
            if item_snapshot.exists:
                inv_data = item_snapshot.to_dict()
                inv_details = inv_data.get('details', {})
                inventory_currency = inv_details.get('currency')
                # If the price is missing in the input, use the inventory price.
                if 'price' not in details:
                    details['price'] = inv_details.get('price', 0)

            # Still no price: fall back to the most recent sale of this item.
            if 'price' not in details:
                all_sales_query = sales_ref.where('details.item', '==', canonical_name)
                all_sales_docs = list(all_sales_query.stream())
                if all_sales_docs:
                    all_sales_docs.sort(key=lambda doc: doc.to_dict().get('timestamp', ''), reverse=True)
                    last_selling_price = all_sales_docs[0].to_dict().get('details', {}).get('price')

            # Priority: input > inventory > user default.
            final_currency = input_currency or inventory_currency or user_default_currency

            @firestore.transactional
            def process_one_sale(transaction, sale_details):
                """Validate one sale and stage its writes on ``transaction``.

                Returns a feedback string; failures are reported through the
                string rather than raised.
                """
                is_new_item = canonical_info['doc'] is None
                original_trans_type = t.get('transaction_type')
                item_type = 'service' if original_trans_type == 'service_offering' else 'good'

                user_price = sale_details.get('price') or sale_details.get('unit_price')
                if user_price is not None:
                    selling_price = float(user_price)
                elif last_selling_price is not None:
                    selling_price = float(last_selling_price)
                else:
                    if item_type == 'good' and 'price' not in sale_details:
                        # Use the inventory price if available.
                        selling_price = float(inv_details.get('price', 0)) if item_snapshot.exists else 0
                        if selling_price == 0:
                            return f"Sale failed for new item '{canonical_name}': Price not found."
                    else:
                        selling_price = 0

                sale_details['price'] = selling_price
                sale_details['item'] = canonical_name
                sale_details['currency'] = final_currency  # Save the explicit currency.
                if 'unit_price' in sale_details:
                    del sale_details['unit_price']
                if 'service_name' in sale_details:
                    del sale_details['service_name']

                try:
                    quantity_sold = int(sale_details.get('quantity', 1))
                    if quantity_sold <= 0:
                        return f"Sale failed for '{canonical_name}': Invalid quantity ({quantity_sold})."
                except (ValueError, TypeError):
                    return f"Sale failed for '{canonical_name}': Invalid quantity format."

                # --- Variance (change / debt) logic ---
                total_due = selling_price * quantity_sold
                amount_paid = sale_details.get('amount_paid')
                change_due = 0
                amount_outstanding = 0
                payment_status = 'completed'

                # force_settled (the "Settled" button) ignores any variance.
                if force_settled:
                    amount_paid = total_due
                    sale_details['amount_paid'] = total_due

                if amount_paid is not None:
                    try:
                        amount_paid = float(amount_paid)
                    except (ValueError, TypeError):
                        # Unparseable payment: keep the sale, but record it as an
                        # exact payment instead of storing the bad value.
                        logger.warning(f"Ignoring invalid amount_paid {amount_paid!r} for '{canonical_name}'; assuming exact payment.")
                        amount_paid = total_due
                        sale_details['amount_paid'] = total_due
                    else:
                        if amount_paid > total_due:
                            change_due = amount_paid - total_due
                            sale_details['change_given'] = change_due
                            payment_status = 'overpayment'  # Or 'completed' with change note
                        elif amount_paid < total_due:
                            amount_outstanding = total_due - amount_paid
                            sale_details['amount_outstanding'] = amount_outstanding
                            payment_status = 'partial_payment'
                        else:
                            payment_status = 'completed'
                else:
                    # Default assumption: exact cash sale if no payment details.
                    amount_paid = total_due
                    sale_details['amount_paid'] = total_due

                sale_details['status'] = payment_status

                item_cost = 0
                if item_snapshot.exists:
                    if inv_data.get('type') == 'good':
                        stock_key = 'quantity'
                        current_stock = int(inv_details.get(stock_key, 0))
                        if current_stock < quantity_sold:
                            return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
                        transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)})
                        # NOTE(review): the original indentation was lost; the cost
                        # lookup is assumed to apply to goods only - confirm.
                        item_cost = inv_details.get('price') or inv_details.get('unit_price') or 0
                elif item_type == 'good':
                    return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
                elif is_new_item and item_type == 'service':
                    logger.info(f"Creating new service '{canonical_name}' during sale.")
                    service_record = {
                        'details': {'item': canonical_name, 'price': selling_price, 'currency': final_currency},
                        'type': 'service',
                        'last_updated': datetime.now(timezone.utc).isoformat()
                    }
                    transaction.set(item_doc_ref, service_record)

                sale_doc_ref = sales_ref.document()
                sale_record = {
                    'details': {**sale_details, 'cost': item_cost},
                    'timestamp': datetime.now(timezone.utc).isoformat(),
                    'status': payment_status,
                    'transaction_id': sale_doc_ref.id,
                    'sale_total': total_due,  # Helper for reporting
                    'customer': sale_details.get('customer', 'Unknown')
                }
                transaction.set(sale_doc_ref, sale_record)

                msg = f"Sale successful for {quantity_sold} x '{canonical_name}' at {final_currency}{selling_price} each."
                if change_due > 0:
                    msg += f" Change due: {final_currency}{change_due:.2f}."
                if amount_outstanding > 0:
                    msg += f" Outstanding debt: {final_currency}{amount_outstanding:.2f}."
                # --- Transaction ID append ---
                msg += f"\n\nTransaction ID:\n```{sale_doc_ref.id}```"
                return msg

            transaction_feedback = process_one_sale(db.transaction(), details)
            feedback_messages.append(transaction_feedback)
            # Match on the message prefix: a substring test would also match a
            # failure message for an item whose name contains "successful".
            if transaction_feedback.startswith("Sale successful"):
                any_success = True
        except Exception as e:
            logger.error(f"Transactional sale failed for '{item_name}': {e}", exc_info=True)
            feedback_messages.append(f"Sale failed for '{item_name}': An unexpected error occurred.")

    return any_success, "\n".join(feedback_messages)
def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Write every valid expense in ``transaction_data`` to Firestore in one batch.

    An entry is valid when its ``details`` dict contains an ``amount`` key;
    other entries are skipped and mentioned in the feedback.

    Returns:
        ``(True, feedback)`` once the batch has been committed, otherwise
        ``(False, feedback)`` when nothing was valid or the commit raised.
    """
    pending_writes = db.batch()
    collection = db.collection("users").document(user_phone).collection("expenses")
    notes = []
    recorded = 0

    for entry in transaction_data:
        info = entry.get('details', {})
        label = info.get('description', info.get('category', 'Unnamed Expense'))
        if 'amount' in info:
            new_doc = collection.document()
            pending_writes.set(new_doc, {
                'details': info,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'transaction_type': 'expense',
                'status': 'recorded',
                'transaction_id': new_doc.id,
            })
            notes.append(f"Recorded expense: '{label}' ({info.get('currency','')}{info.get('amount')}).\n\nTransaction ID:\n```{new_doc.id}```")
            recorded += 1
        else:
            notes.append(f"Skipped expense '{label}': Missing amount.")

    if not recorded:
        # Nothing to commit: report the skips, or a generic note for empty input.
        return False, ("\n".join(notes) if notes else "No valid expense transactions to create.")

    try:
        pending_writes.commit()
    except Exception as e:
        logger.error(f"Expense batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record expenses. An error occurred: {e}"
    return True, "\n".join(notes)
def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Write every valid asset in ``transaction_data`` to Firestore in one batch.

    An entry is valid when its ``details`` dict contains a ``value`` key;
    other entries are skipped and mentioned in the feedback.

    Returns:
        ``(True, feedback)`` once the batch has been committed, otherwise
        ``(False, feedback)`` when nothing was valid or the commit raised.
    """
    pending_writes = db.batch()
    collection = db.collection("users").document(user_phone).collection("assets")
    notes = []
    recorded = 0

    for entry in transaction_data:
        info = entry.get('details', {})
        label = info.get('name', 'Unnamed Asset')
        if 'value' in info:
            new_doc = collection.document()
            pending_writes.set(new_doc, {
                'details': info,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'transaction_type': 'asset',
                'status': 'recorded',
                'transaction_id': new_doc.id,
            })
            notes.append(f"Recorded asset: '{label}' ({info.get('currency','')}{info.get('value')}).\n\nTransaction ID:\n```{new_doc.id}```")
            recorded += 1
        else:
            notes.append(f"Skipped asset '{label}': Missing value.")

    if not recorded:
        # Nothing to commit: report the skips, or a generic note for empty input.
        return False, ("\n".join(notes) if notes else "No valid asset transactions to create.")

    try:
        pending_writes.commit()
    except Exception as e:
        logger.error(f"Asset batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record assets. An error occurred: {e}"
    return True, "\n".join(notes)
def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Write every valid liability in ``transaction_data`` to Firestore in one batch.

    An entry is valid when its ``details`` dict contains an ``amount`` key and
    a non-empty ``creditor``; other entries are skipped and mentioned in the
    feedback.

    Returns:
        ``(True, feedback)`` once the batch has been committed, otherwise
        ``(False, feedback)`` when nothing was valid or the commit raised.
    """
    pending_writes = db.batch()
    collection = db.collection("users").document(user_phone).collection("liabilities")
    notes = []
    recorded = 0

    for entry in transaction_data:
        info = entry.get('details', {})
        creditor = info.get('creditor', 'Unnamed Creditor')
        has_amount = 'amount' in info
        has_creditor = bool(info.get('creditor'))
        if has_amount and has_creditor:
            new_doc = collection.document()
            pending_writes.set(new_doc, {
                'details': info,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'transaction_type': 'liability',
                'status': 'recorded',
                'transaction_id': new_doc.id,
            })
            notes.append(f"Recorded liability to '{creditor}' ({info.get('currency','')}{info.get('amount')}).\n\nTransaction ID:\n```{new_doc.id}```")
            recorded += 1
        else:
            notes.append(f"Skipped liability '{creditor}': Missing amount or creditor.")

    if not recorded:
        # Nothing to commit: report the skips, or a generic note for empty input.
        return False, ("\n".join(notes) if notes else "No valid liability transactions to create.")

    try:
        pending_writes.commit()
    except Exception as e:
        logger.error(f"Liability batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record liabilities. An error occurred: {e}"
    return True, "\n".join(notes)
def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce well-known columns of ``df`` to usable dtypes, in place.

    Date-like columns become UTC datetimes (unparseable values turn into NaT),
    known numeric columns become numbers with unparseable values replaced by 0,
    and missing values in the remaining object columns become ``'Unknown'``.
    The same DataFrame object is returned; an empty frame is returned untouched.
    """
    if df.empty:
        return df

    date_like = ('timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date')
    for name in (c for c in date_like if c in df.columns):
        df[name] = pd.to_datetime(df[name], errors='coerce', utc=True)

    number_like = (
        'price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours',
        'units_available', 'sale_total', 'amount_outstanding', 'change_given', 'amount_paid',
    )
    for name in (c for c in number_like if c in df.columns):
        converted = pd.to_numeric(df[name], errors='coerce')
        df[name] = converted.fillna(0)

    # Whatever is still object-typed after the conversions gets a text placeholder.
    text_columns = df.select_dtypes(include=['object']).columns
    for name in text_columns:
        df[name] = df[name].fillna('Unknown')
    return df
def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
    """Load the user's Firestore collections as named, validated DataFrames.

    ``inventory_and_services`` is split into an ``inventory`` and a ``services``
    frame by each document's ``type``; ``sales``, ``expenses``, ``assets`` and
    ``liabilities`` each become one frame. Every document's ``details`` dict is
    flattened into top-level columns. Empty collections are omitted.

    Returns:
        A list of ``(name, DataFrame)`` tuples.
    """
    def _flatten(record):
        # Lift the nested 'details' keys to the top level (they win on clashes).
        merged = {**record, **record.get('details', {})}
        merged.pop('details', None)
        return merged

    frames = []

    goods, services = [], []
    catalogue = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
    for snap in catalogue:
        raw = snap.to_dict()
        flat = _flatten(raw)
        if raw.get('type') == 'service':
            services.append(flat)
        else:
            goods.append(flat)
    if goods:
        frames.append(("inventory", _validate_dataframe(pd.DataFrame(goods))))
    if services:
        frames.append(("services", _validate_dataframe(pd.DataFrame(services))))

    for name in ('sales', 'expenses', 'assets', 'liabilities'):
        snapshots = db.collection("users").document(user_phone).collection(name).stream()
        records = [snap.to_dict() for snap in snapshots]
        if not records:
            continue
        frame = _validate_dataframe(pd.DataFrame([_flatten(r) for r in records]))
        if name == 'sales':
            # Derived reporting columns, added only when their inputs exist.
            has_quantity = 'quantity' in frame.columns
            if 'price' in frame.columns and has_quantity and 'sale_total' not in frame.columns:
                frame['sale_total'] = frame['price'] * frame['quantity']
            if 'cost' in frame.columns and has_quantity:
                frame['cogs'] = frame['cost'] * frame['quantity']
            if 'timestamp' in frame.columns:
                frame['day_of_week'] = frame['timestamp'].dt.day_name()
        frames.append((name, frame))
    return frames
def _get_relative_date_context() -> str:
    """Build a text block of pre-computed reference dates, based on the current UTC time.

    The block lists today, yesterday, the Monday of this week and the Monday
    of the previous week (the latter also labelled "Last Monday"), each
    formatted as ``YYYY-MM-DD``, one per line.
    """
    now = datetime.now(timezone.utc)
    this_monday = now - timedelta(days=now.weekday())
    previous_monday = this_monday - timedelta(days=7)

    labelled_dates = (
        ("- Today is: ", now),
        ("- Yesterday was: ", now - timedelta(days=1)),
        ("- The start of this week was: ", this_monday),
        ("- The start of last week was: ", previous_monday),
        ("- Last Monday was on: ", previous_monday),
    )
    lines = ["Here are some pre-calculated dates to help you understand the user's request:"]
    lines.extend(label + moment.strftime('%Y-%m-%d') for label, moment in labelled_dates)
    return "\n".join(lines)
def read_datalake(user_phone: str, query: str) -> str:
    """Answer a natural-language question about the user's business data.

    The query is routed through a series of tiers, first match wins:

    0. Simple lookups (inventory / assets / liabilities) rendered as an image.
    1. Canned reports from ``ReportEngine`` (sales, expenses, debt, profit,
       day-of-week, single item), synthesised into text by the LLM.
    2. Forecast queries.
    3. Help / business-coach questions answered from a business snapshot.
    4. Free-form analysis with PandasAI, falling back to the coach prompt on
       any failure.

    Args:
        user_phone: ID of the user document whose collections are loaded.
        query: The user's question.

    Returns:
        The answer text (or the value returned by the image renderer / PandasAI
        converted to a string). Any unexpected error is logged and a generic
        apology string is returned instead of raising.
    """
    def _to_text(resp) -> str:
        """Best-effort conversion of an LLM response object to plain text."""
        try:
            if resp is None:
                return ""
            if hasattr(resp, "content") and resp.content is not None:
                return str(resp.content)
            if hasattr(resp, "text") and resp.text is not None:
                return str(resp.text)
            if isinstance(resp, (list, tuple)):
                return "".join(_to_text(r) for r in resp)
            return str(resp)
        except Exception:
            return str(resp)

    try:
        all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone)
        if not all_dfs_with_names:
            if any(keyword in query.lower() for keyword in ['help', 'tutorial', 'guide', 'what is', 'explain']):
                return "I'm Qx, your business assistant! I can help you track sales, expenses, and inventory. Once you add some data, I can give you reports and insights. How can I help you get started?"
            return "You have no data recorded yet. Please add some transactions first."

        query_lower = query.lower()
        engine = ReportEngine(all_dfs_with_names, query)

        # --- Tier 0: Simple Direct Lookups ---
        simple_lookup_map = {
            "inventory": ["stock", "inventory", "in stock", "what do i have"],
            "assets": ["asset", "assets", "my assets"],
            "liabilities": ["liabilities", "i owe", "creditor", "my debts"],
        }
        for df_name, keywords in simple_lookup_map.items():
            if any(keyword in query_lower for keyword in keywords):
                logger.info(f"Handling '{query}' with Simple Lookup Path for '{df_name}'.")
                target_df_tuple = next((item for item in all_dfs_with_names if item[0] == df_name), None)
                if target_df_tuple is not None and not target_df_tuple[1].empty:
                    image_path = render_df_as_image(target_df_tuple[1])
                    if image_path:
                        return image_path
                    # The renderer is declared Optional[str]; fall back to a text
                    # table so callers always receive a string.
                    return target_df_tuple[1].to_string(index=False)
                return f"You don't have any {df_name} recorded yet."

        # --- Tier 1: Canned & Temporal Reports ---
        report_json = None
        sales_report_triggers = ["sales report", "sales summary", "sales performance", "how were my sales", "revenue report"]
        expense_report_triggers = ["expense report", "expense summary", "expense performance", "how were my expenses", "spending report"]
        debt_report_triggers = ["who owes", "debt", "debtors", "outstanding", "credit sales"]

        if any(trigger in query_lower for trigger in sales_report_triggers):
            logger.info(f"Handling '{query}' with the General Sales Report Path.")
            report_json = engine.generate_report()
        elif any(trigger in query_lower for trigger in expense_report_triggers):
            logger.info(f"Handling '{query}' with the General Expense Report Path.")
            report_json = engine.generate_report()
        elif any(trigger in query_lower for trigger in debt_report_triggers):
            logger.info(f"Handling '{query}' with the Debt Report Path.")
            report_json = engine.generate_debt_report()
        elif "profit" in query_lower:
            logger.info(f"Handling '{query}' with the Profit Report Path.")
            report_json = engine.generate_profit_report()
        elif any(k in query_lower for k in ["best day", "busiest day", "sales by day"]):
            logger.info(f"Handling '{query}' with the Day of Week Report Path.")
            report_json = engine.generate_day_of_week_report()
        else:
            item_report_match = re.search(r"(?:report on|performance of)\s+([\w\s]+?)(?:\s+(?:this|last|on|in|for|today|yesterday)|$)", query_lower)
            if item_report_match:
                item_name = item_report_match.group(1).strip()
                if item_name not in ["sales", "expenses", "profit"]:
                    logger.info(f"Handling '{query}' with the Item Report Path for item: '{item_name}'.")
                    report_json = engine.generate_item_report(item_name)

        if report_json:
            report_data = json.loads(report_json)
            if "error" in report_data:
                return report_data["error"]
            synthesis_prompt = f"""
Directly synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. Present only the data-driven report, formatted for WhatsApp (*bold*, _italic_, emojis).
For sales reports, if helpful, provide a creative and actionable "Insight" section at the end based on the best/worst selling items.
**IMPORTANT INSTRUCTIONS:**
- If `report_subject` is "Profitability", present a clear financial summary.
- If `report_subject` is "Item Report", state the item name and present its performance KPIs.
- If `report_subject` is "Debtors List", list the customers and amounts clearly.
Here is the data summary:
{report_json}
"""
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)

        # --- Tier 2: Predictive Queries ---
        predictive_keywords = ["expect", "forecast", "predict"]
        if any(keyword in query_lower for keyword in predictive_keywords):
            logger.info(f"Handling '{query}' with the Forecasting Path.")
            forecast_json = engine.generate_forecast_data()
            forecast_data = json.loads(forecast_json)
            if "error" in forecast_data:
                return forecast_data["error"]
            synthesis_prompt = f"Synthesize a sales forecast from the following JSON data. Omit conversational introductions or summaries. Present only the forecast. Data: {forecast_json}"
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)

        # --- Tier 3: Business Coach & Help Layer ---
        # Keywords are matched against the lower-cased query, so they must be
        # lower-case themselves ("can I" could never match).
        help_keywords = ['help', 'tutorial', 'guide', 'how do you work', 'what can you do', 'how can', 'how would']
        literacy_keywords = ['explain', 'how to', 'how do i', 'ideas for', 'advice on', "how do", "why", "can i", "can we"]
        if any(keyword in query_lower for keyword in help_keywords) or any(keyword in query_lower for keyword in literacy_keywords):
            logger.info(f"Handling '{query}' with the Business Coach Path.")
            snapshot = engine.generate_business_snapshot()
            snapshot_str = json.dumps(snapshot, indent=2)
            synthesis_prompt = f"""
You are Qx, a friendly and insightful business coach and financial expert. Your task is to provide a clear, helpful, and strategic answer based on the user's question, using your general business knowledge combined with the business snapshot provided below for context.
**Business Snapshot for Context:**
{snapshot_str}
**User's Question:**
"{query}"
"""
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)
        # --- Tier 4: Fortified PandasAI with Graceful Fallback (Final Path) ---
        else:
            try:
                logger.info(f"Handling '{query}' with the Fortified PandasAI Path.")
                schema_description = "You have been provided with these Pandas DataFrames:\n"
                for name, df in all_dfs_with_names:
                    schema_description += f"* **{name}**: Contains columns like {', '.join(df.columns.to_list())}.\n"
                date_context = _get_relative_date_context()
                today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
                pandasai_prompt = (
                    f"{schema_description}\n"
                    f"For context, today's date is {today_str}.\n{date_context}\n\n"
                    f"**IMPORTANT RULES:**\n"
                    f"1. For time-based queries, you MUST use timezone-aware pandas Timestamps for comparison. Example: `pd.Timestamp('{today_str}', tz='UTC')`.\n"
                    f"2. Your code MUST end by declaring a `result` dictionary.\n"
                    f"3. Your primary goal is to answer with data. Return a DataFrame or a string. ONLY generate a plot if the user explicitly asks for a 'chart', 'plot', 'graph', 'diagram', or 'show'.\n"
                    f"4. When a plot is requested, you MUST save it as a file. The final line of your code must be `result = {{'type': 'plot', 'value': 'filename.png'}}`.\n\n"
                    f"Based on this, please write Python code to answer the following specific user query: '{query}'"
                )
                datalake_dfs = [df for _, df in all_dfs_with_names]
                lake = SmartDatalake(datalake_dfs, config={"llm": llm, "response_parser": FlaskResponse, "save_charts_path": user_defined_path, "enable_cache": False, "conversational": False})
                response = lake.chat(pandasai_prompt)
                # Treat both string-based "No code" failures and None/empty
                # responses as a failed PandasAI run.
                if not response or "No code found" in str(response):
                    raise NoCodeFoundError("PandasAI failed to generate a valid data answer.")
                if isinstance(response, dict) and 'value' in response:
                    return str(response['value'])
                elif isinstance(response, str):
                    return response
                else:
                    return str(response)
            except Exception as e:
                # Any PandasAI failure (including NoCodeFoundError) reroutes to the coach.
                logger.warning(f"PandasAI path failed for query '{query}': {e}. Falling back to Business Coach Synthesis.")
                snapshot = engine.generate_business_snapshot()
                snapshot_str = json.dumps(snapshot, indent=2)
                synthesis_prompt = f"""
You are Qx, a friendly and insightful business coach. I was unable to perform a direct data calculation for this query, so please provide a strategic, high-level business answer based on the snapshot and the question.
**Business Snapshot:** {snapshot_str}
**User Question:** "{query}"
"""
                response = llm.invoke(synthesis_prompt)
                return _to_text(response)
    except Exception as e:
        logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
        return "Sorry, I encountered an error while analyzing your data."
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
    """Locate a single document in one of the user's collections.

    Lookup order:

    1. ``details['transaction_id']`` as the document ID.
    2. For ``inventory_and_services``: the canonical inventory document for
       the given item / service name.
    3. Equality filters on ``details.<field>``: the canonical item name for
       ``sales``, the description / name / creditor for expenses / assets /
       liabilities, plus ``amount`` and ``value`` when present.

    Args:
        user_phone: ID of the user document under ``users``.
        collection_name: Sub-collection to search.
        details: Search criteria taken from the parsed transaction.

    Returns:
        ``{"id": ..., "data": ...}`` for a unique match, the string
        ``"multiple_matches"`` when more than one document matches the filters,
        or ``None`` when nothing matches or no usable criteria were given.
    """
    col_ref = db.collection("users").document(user_phone).collection(collection_name)

    # 1) Direct lookup by transaction ID.
    if 'transaction_id' in details and details['transaction_id']:
        doc = col_ref.document(details['transaction_id']).get()
        if doc.exists:
            return {"id": doc.id, "data": doc.to_dict()}

    query = col_ref
    filters_applied = False

    # 2) Item-based lookup. The canonical document belongs to the inventory
    #    collection, so it may only be returned for that collection; for sales
    #    its ID would point at a non-existent sales document.
    item_name = details.get('item') or details.get('service_name')
    if item_name and collection_name in ('inventory_and_services', 'sales'):
        canonical_info = _get_canonical_info(user_phone, item_name)
        if collection_name == 'inventory_and_services':
            if canonical_info['doc']:
                doc = canonical_info['doc']
                return {"id": doc.id, "data": doc.to_dict()}
        else:
            # Sale documents store the canonical item name in details.item.
            query = query.where('details.item', '==', canonical_info['name'])
            filters_applied = True

    # 3) Field filters.
    key_map = {'expenses': 'description', 'assets': 'name', 'liabilities': 'creditor'}
    search_key = key_map.get(collection_name)
    if search_key and search_key in details:
        query = query.where(f'details.{search_key}', '==', details[search_key])
        filters_applied = True
    if 'amount' in details:
        query = query.where('details.amount', '==', details['amount'])
        filters_applied = True
    if 'value' in details:
        query = query.where('details.value', '==', details['value'])
        filters_applied = True
    if not filters_applied:
        return None

    # Two results are enough to tell "unique" from "ambiguous".
    found_docs = [{"id": doc.id, "data": doc.to_dict()} for doc in query.limit(2).stream()]
    if len(found_docs) == 1:
        return found_docs[0]
    if len(found_docs) > 1:
        return "multiple_matches"
    return None
def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Apply the fields in each transaction's ``details`` to its matching record.

    The target collection is chosen from ``transaction_type`` and the record is
    located with ``_find_document_by_details``. Every key in ``details`` except
    ``transaction_id`` is written to ``details.<key>`` and ``last_updated`` is
    refreshed.

    Returns:
        ``(any_success, feedback)`` with one feedback line per transaction.
    """
    type_to_collection = {
        'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services',
        'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
    }
    report = []
    updated_any = False

    for entry in transaction_data:
        details = entry.get('details', {})
        trans_type = entry.get('transaction_type', '').lower()

        collection_name = type_to_collection.get(trans_type)
        if not collection_name:
            report.append(f"Update skipped: Unknown type '{trans_type}'.")
            continue

        match = _find_document_by_details(user_phone, collection_name, details)
        if match == "multiple_matches":
            report.append(f"Update for {trans_type} failed: Multiple records match. Please use the ID or be more specific.")
            continue
        if not match:
            report.append(f"Update for {trans_type} failed: No record found matching your description or ID.")
            continue

        doc_id = match["id"]
        target = db.collection("users").document(user_phone).collection(collection_name).document(doc_id)
        try:
            changes = {}
            for field, new_value in details.items():
                if field != 'transaction_id':
                    changes[f"details.{field}"] = new_value
            if not changes:
                report.append(f"Update for {trans_type} (ID: {doc_id}) skipped: No new data provided.")
                continue
            changes['last_updated'] = datetime.now(timezone.utc).isoformat()
            target.update(changes)
            report.append(f"Successfully updated {trans_type} record (ID: {doc_id}).")
            updated_any = True
        except Exception as e:
            logger.error(f"Update failed for doc '{doc_id}': {e}", exc_info=True)
            report.append(f"Update for {trans_type} (ID: {doc_id}) failed with an error.")

    return updated_any, "\n".join(report)
def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Delete the record matching each transaction in ``transaction_data``.

    The target collection is chosen from ``transaction_type`` and the record is
    located with ``_find_document_by_details``; ambiguous or missing matches
    are reported instead of deleted.

    Returns:
        ``(any_success, feedback)`` with one feedback line per transaction.
    """
    type_to_collection = {
        'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services',
        'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
    }
    report = []
    deleted_any = False

    for entry in transaction_data:
        details = entry.get('details', {})
        trans_type = entry.get('transaction_type', '').lower()

        collection_name = type_to_collection.get(trans_type)
        if not collection_name:
            report.append(f"Delete skipped: Unknown type '{trans_type}'.")
            continue

        match = _find_document_by_details(user_phone, collection_name, details)
        if match == "multiple_matches":
            report.append(f"Delete for {trans_type} failed: Multiple records match. Please use the ID or be more specific.")
            continue
        if not match:
            report.append(f"Delete for {trans_type} failed: No record found matching your description or ID.")
            continue

        doc_id = match["id"]
        try:
            target = db.collection("users").document(user_phone).collection(collection_name).document(doc_id)
            target.delete()
            report.append(f"Successfully deleted {trans_type} record (ID: {doc_id}).")
            deleted_any = True
        except Exception as e:
            logger.error(f"Delete failed for doc '{doc_id}': {e}", exc_info=True)
            report.append(f"Delete for {trans_type} (ID: {doc_id}) failed with an error.")

    return deleted_any, "\n".join(report)
def reset_user_account(user_phone: str) -> str:
    """Permanently delete all transaction data for a user.

    Every document in the user's sales, expenses, assets, liabilities,
    inventory_and_services and temp_transactions collections is deleted using
    write batches of at most 450 operations. Batches already committed are not
    rolled back if a later one fails.

    Returns:
        A confirmation message, or an error message if any step raised.
    """
    collections = ['sales', 'expenses', 'assets', 'liabilities', 'inventory_and_services', 'temp_transactions']
    user_ref = db.collection("users").document(user_phone)
    try:
        total_deleted = 0
        batch = db.batch()
        pending = 0
        for coll_name in collections:
            for doc in user_ref.collection(coll_name).list_documents():
                batch.delete(doc)
                pending += 1
                total_deleted += 1
                if pending >= 450:  # Stay below the Firestore batch limit.
                    batch.commit()
                    batch = db.batch()
                    pending = 0
        if pending > 0:
            batch.commit()
        logger.info(f"Account reset for {user_phone}: deleted {total_deleted} documents.")
        return "Your account has been reset. All transaction data has been permanently deleted."
    except Exception as e:
        logger.error(f"Account reset failed for {user_phone}: {e}", exc_info=True)
        return "An error occurred while resetting your account. Please try again later."
def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool:
    """Store pending transactions in the user's ``temp_transactions/pending`` document.

    The document is overwritten with the given transactions and the current
    UTC timestamp.

    Returns:
        True when the write succeeded; False for empty input or when the write
        raised (the error is logged).
    """
    if not transactions:
        return False

    try:
        pending_doc = (
            db.collection("users")
            .document(mobile)
            .collection("temp_transactions")
            .document("pending")
        )
        payload = {"transactions": transactions, "timestamp": datetime.now(timezone.utc).isoformat()}
        pending_doc.set(payload)
    except Exception as e:
        logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
        return False
    return True
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
    """Format one or more transactions as a bulleted, human-readable summary.

    Each transaction gets a ``--- <Type> ---`` header (numbered when there are
    several), its transaction ID in a copyable code span, the well-known fields
    in a fixed order, then any remaining fields. Money fields are prefixed with
    the transaction's ``currency`` and shown with two decimals.

    Args:
        transactions: A single transaction dict, a list of them, or None.
            Non-dict list entries are ignored. When a transaction has no
            ``details`` key, the transaction itself is treated as the details.

    Returns:
        The formatted text, or ``"No transaction data to display."`` for empty
        input.
    """
    if not transactions:
        return "No transaction data to display."
    if isinstance(transactions, dict):
        transactions = [transactions]

    key_order = ['item', 'service_name', 'name', 'creditor', 'category', 'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value', 'customer', 'amount_paid', 'change_given', 'amount_outstanding', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type']
    money_keys = {'price', 'amount', 'cost', 'value', 'amount_paid', 'change_given', 'amount_outstanding'}

    output_lines = []
    for idx, trans in enumerate(transactions):
        if not isinstance(trans, dict):
            continue

        details = trans.get('details', trans)
        if not isinstance(details, dict):
            # e.g. "details": null from a parser - nothing to list.
            details = {}

        title = str(trans.get('transaction_type') or 'Item').replace("_", " ").title()
        if len(transactions) > 1:
            output_lines.append(f"--- {title} {idx + 1} ---")
        else:
            output_lines.append(f"--- {title} ---")

        # Make the transaction ID easy to copy. It may sit on the transaction
        # itself (stored records) or inside details (update/delete requests).
        transaction_id = trans.get('transaction_id') or details.get('transaction_id')
        if transaction_id:
            output_lines.append(f"• Transaction ID: ```{transaction_id}```")

        displayed_keys = {'transaction_id'}
        for key in key_order:
            if key in details and key not in displayed_keys:
                val = details[key]
                # Format money fields.
                if isinstance(val, (int, float)) and key in money_keys:
                    val = f"{details.get('currency', '')}{val:.2f}"
                output_lines.append(f"• {key.replace('_', ' ').title()}: {val}")
                displayed_keys.add(key)

        # Anything not covered by key_order, except the currency (already
        # folded into the money fields).
        for key, value in details.items():
            if key not in displayed_keys and key != 'currency':
                output_lines.append(f"• {key.replace('_', ' ').title()}: {value}")
        output_lines.append("")

    return "\n".join(output_lines).strip()
def fetch_transaction(user_phone: str, identifier: str, collection: str = "inventory_and_services"):
    """Fetch one document by ID from a user's collection.

    Returns:
        The document's data as a dict, or None when the document does not
        exist or the lookup raised (the error is logged).
    """
    try:
        snapshot = (
            db.collection("users")
            .document(user_phone)
            .collection(collection)
            .document(identifier)
            .get()
        )
        return snapshot.to_dict() if snapshot.exists else None
    except Exception as e:
        logger.error(f"Error fetching transaction '{identifier}' from '{collection}': {e}", exc_info=True)
        return None
def process_intent(parsed_trans_data: List[Dict], mobile: str, force_settled: bool = False) -> str:
    """Group parsed transactions by (intent, type), run each group, and return the feedback.

    Args:
        parsed_trans_data: Parsed transactions, each carrying ``intent``,
            ``transaction_type`` and ``details``. Entries that are not dicts
            are ignored; a missing or null intent / type is treated as
            ``'unknown'``.
        mobile: ID of the user the transactions belong to.
        force_settled: Forwarded to ``create_sale`` to record sales as paid in
            full.

    Returns:
        The feedback of all groups joined by blank lines. A failure inside one
        group is logged and reported without stopping the others.
    """
    if not parsed_trans_data:
        return "I couldn't understand the transaction details. Could you please try again?"

    grouped_transactions = {}
    for trans in parsed_trans_data:
        if not isinstance(trans, dict):
            continue
        # `or` also covers keys that are present but null.
        intent = str(trans.get('intent') or 'unknown').lower()
        trans_type = str(trans.get('transaction_type') or 'unknown').lower()
        grouped_transactions.setdefault((intent, trans_type), []).append(trans)

    final_feedback = []
    for (intent, trans_type), transactions in grouped_transactions.items():
        logger.info(f"Processing group: {intent} - {trans_type} for user {mobile}")
        try:
            if intent == 'create':
                if trans_type == 'sale':
                    _, message = create_sale(mobile, transactions, force_settled=force_settled)
                    final_feedback.append(f"Sales Processing:\n{message}")
                elif trans_type in ('purchase', 'inventory', 'service_offering'):
                    _, message = create_or_update_inventory_or_service_offering(mobile, transactions)
                    final_feedback.append(f"Inventory/Service Updates:\n{message}")
                elif trans_type == 'expense':
                    _, message = create_expense(mobile, transactions)
                    final_feedback.append(f"Expense Recording:\n{message}")
                elif trans_type == 'asset':
                    _, message = create_asset(mobile, transactions)
                    final_feedback.append(f"Asset Recording:\n{message}")
                elif trans_type == 'liability':
                    _, message = create_liability(mobile, transactions)
                    final_feedback.append(f"Liability Recording:\n{message}")
                else:
                    final_feedback.append(f"Cannot 'create' transactions of type: '{trans_type}'")
            elif intent == 'read' or trans_type == 'query':
                # Only the first query of the group is answered.
                query_details = transactions[0].get('details', {})
                query_str = query_details.get('query')
                if not query_str:
                    query_str = "Show summary of all data"
                read_result = read_datalake(mobile, query_str)
                if read_result is None:
                    # Keep the feedback list string-only so the final join cannot fail.
                    read_result = "Sorry, I encountered an error while analyzing your data."
                final_feedback.append(str(read_result))
            elif intent == 'update':
                _, message = update_transaction(mobile, transactions)
                final_feedback.append(f"Update Results:\n{message}")
            elif intent == 'delete':
                _, message = delete_transaction(mobile, transactions)
                final_feedback.append(f"Deletion Results:\n{message}")
            elif intent == 'reset_account':
                final_feedback.append(reset_user_account(mobile))
            else:
                final_feedback.append(f"Unknown intent '{intent}' for type '{trans_type}'.")
        except Exception as e:
            logger.error(f"Error processing group ({intent}, {trans_type}) for user {mobile}: {e}", exc_info=True)
            final_feedback.append(f"An unexpected error occurred while processing your {trans_type} {intent} request.")

    if not final_feedback:
        return "No actions were processed from your request."
    return "\n\n".join(final_feedback).strip()