import json import os import logging from datetime import datetime, timezone, timedelta from typing import List, Dict, Union, Optional, Any, Tuple from google.cloud import firestore import pandas as pd import inflect from thefuzz import process as fuzzy_process from pandasai import SmartDatalake from pandasai.responses.response_parser import ResponseParser from pandasai.exceptions import NoCodeFoundError, MaliciousQueryError from langchain_google_genai import ChatGoogleGenerativeAI import google.generativeai as genai import re import uuid import dataframe_image as dfi from PIL import Image import io logger = logging.getLogger(__name__) import firebase_admin from firebase_admin import credentials, firestore # --- Initialize supporting libraries --- p = inflect.engine() def init_firestore_from_env(env_var: str = "FIREBASE"): """ Initialise firebase-admin with the service-account JSON that is stored in the given environment variable and return a Firestore client. """ try: if firebase_admin._apps: return firestore.client() sa_json = os.environ[env_var] sa_info = json.loads(sa_json) cred = credentials.Certificate(sa_info) # FIX: Include storageBucket in initialization for audio handling bucket_name = os.environ.get("FIREBASE_STORAGE_BUCKET") firebase_admin.initialize_app(cred, { 'storageBucket': bucket_name }) return firestore.client() except KeyError: logging.error("%s environment variable is not set", env_var) raise except (json.JSONDecodeError, ValueError) as e: logging.error("Invalid service-account JSON in %s: %s", env_var, e) raise except Exception as e: logging.exception("Failed to initialise Firestore: %s", e) raise db = init_firestore_from_env() guid = uuid.uuid4() new_filename = f"{guid}" user_defined_path = os.path.join(os.getcwd(), "plots") os.makedirs(user_defined_path, exist_ok=True) def render_df_as_image(df: pd.DataFrame) -> Optional[str]: """Renders a Pandas DataFrame as an image and returns the file path.""" if df.empty: return "The data requested is empty." try: max_cols = 10 if len(df.columns) > max_cols: df = df.iloc[:, :max_cols] img_path = os.path.join(user_defined_path, f"report_{uuid.uuid4()}.png") dfi.export(df, img_path, table_conversion='matplotlib', dpi=200) return img_path except Exception as e: logger.error(f"Failed to render DataFrame as image: {e}") return "There was an error creating the data table image." class FlaskResponse(ResponseParser): def __init__(self, context) -> None: super().__init__(context) def format_dataframe(self, result): df = result['value'] return render_df_as_image(df) def format_plot(self, result): try: img_path = result['value'] except ValueError: img_path = str(result['value']) return img_path def format_other(self, result): return result['value'] # --- AI Model Configuration --- try: genai.configure(api_key=os.environ["GOOGLE_API_KEY"]) GENERATIVE_MODEL_NAME = "gemini-2.0-flash" VISION_MODEL_NAME = "gemini-2.0-flash" # Increased token limit to 8192 to prevent JSON truncation on long lists model = genai.GenerativeModel( GENERATIVE_MODEL_NAME, generation_config={ "temperature": 0.1, "top_p": 0.9, "top_k": 10, "max_output_tokens": 8192, } ) vision_model = genai.GenerativeModel(VISION_MODEL_NAME) llm = ChatGoogleGenerativeAI( model=GENERATIVE_MODEL_NAME, temperature=0.1, convert_system_message_to_human=True ) logger.info(f"Using Generative Models: {GENERATIVE_MODEL_NAME} (Text) and {VISION_MODEL_NAME} (Vision)") except KeyError: logger.error("GOOGLE_API_KEY environment variable not set!") model = vision_model = llm = None except Exception as e: logger.error(f"Error configuring Generative AI: {e}", exc_info=True) model = vision_model = llm = None # --- START: LANGUAGE PROCESSING (UPGRADE 4) --- def detect_and_translate_input(text: str) -> Dict[str, str]: """ Detects language. If not English, translates to English preserving numbers/entities. Returns: {'english_text': str, 'detected_lang': str} """ if not model: return {'english_text': text, 'detected_lang': 'English'} system_prompt = """ You are a language detection and translation engine for a bookkeeping bot. 1. Detect the language of the user's input (e.g., English, Shona, Zulu, Ndebele, Tswana, etc.). 2. CRITICAL: Do NOT be misled by Proper Nouns or Names (e.g., "Chiedza", "Musa", "Farai"). Focus on verbs and sentence structure. If the grammatical structure is English, return "English" as the language regardless of local names used. 3. If the language is NOT English, translate the text accurately to English. 4. IMPORTANT: Preserve all numbers, currency symbols ($, R, ZIG), and item names exactly. 5. If the language IS English, return the text as is. 6. Return ONLY a valid JSON object. Output Schema: { "detected_lang": "Name of Language", "english_text": "The translated text or original text" } """ try: response = model.generate_content([system_prompt, text]) cleaned_response = re.sub(r'^```json\s*|\s*```$', '', response.text, flags=re.MULTILINE).strip() result = json.loads(cleaned_response) return result except Exception as e: logger.error(f"Language detection failed: {e}") return {'english_text': text, 'detected_lang': 'English'} def translate_output(text: str, target_lang: str) -> str: """Translates the system's English response back to the target language.""" if not model or not target_lang or target_lang.lower() == 'english': return text prompt = f""" Translate the following bookkeeping response from English to {target_lang}. Rules: - Keep numbers, transaction IDs, and currency symbols (e.g., $10.00, R50) UNCHANGED. - Keep formatting like markdown (*bold*) UNCHANGED. - Maintain a professional, helpful tone. Text to translate: "{text}" """ try: response = model.generate_content(prompt) return response.text.strip() except Exception as e: logger.error(f"Output translation failed: {e}") return text # --- END: LANGUAGE PROCESSING --- # --- START: VISION PROCESSING FUNCTIONS --- def _transpile_vision_json_to_query(vision_json: List[Dict]) -> str: if not vision_json: return "Error: Could not extract any transactions from the image." query_parts = [] for trans in vision_json: details = trans.get("details", {}) trans_type = trans.get("transaction_type", "unknown") intent = trans.get("intent", "record") part = f"{intent.title()} a {trans_type}" item = details.get("item") or details.get("name") or details.get("description") or details.get("service_name") quantity = details.get("quantity") price = details.get("price") or details.get("amount") or details.get("value") or details.get("unit_price") currency = details.get("currency", "") vendor = details.get("vendor") or details.get("creditor") if quantity and item: part += f" of {quantity} '{item}'" elif item: part += f" for '{item}'" if price: part += f" for {currency}{price}" if vendor: part += f" from {vendor}" query_parts.append(part) final_query = " and ".join(query_parts) return final_query.strip() def _analyze_image_with_vision(image_bytes: bytes, caption: Optional[str]) -> List[Dict]: """Sends the image to the Gemini Vision model and returns a structured JSON list of transactions.""" if not vision_model: return [] try: image_pil = Image.open(io.BytesIO(image_bytes)) prompt = f""" You are a bookkeeping vision model. Analyze the image. Return ONLY a valid JSON array [] of transaction objects. **CRITICAL CLASSIFICATION RULES:** 1. **RECEIPTS / INVOICES / BILLS:** - These are money LEAVING the business. - Classify as **"expense"** (for utilities, rent, etc.) or **"purchase"** (for buying stock/inventory). - Do NOT classify a receipt as a "sale". 2. **PRODUCT PHOTOS / SHELVES:** - If it is a photo of an item with no text/receipt, OR a photo of a customer holding an item. - Classify as **"sale"** (money COMING IN). 3. **CATALOGS / MENUS:** - Classify as **"service_offering"** or **"inventory"** (creating items in system). **USER CONTEXT:** {caption if caption else "None"} ==================== SCHEMA (each object) ==================== {{ "intent": "create" | "read" | "update" | "delete", "transaction_type": "sale" | "purchase" | "expense" | "asset" | "liability" | "inventory" | "service_offering" | "query", "details": {{ /* keys below */ }}, "source": "ocr" | "object_detection" }} ================ DETAILS KEYS ================ - sale (goods): item, quantity, price, currency - expense: description, amount, currency, vendor - purchase: item, quantity, price, currency - asset: name, value, currency - liability: creditor, amount, currency - query: query (verbatim text) Analyze the provided image and return only the JSON list. """ response = vision_model.generate_content([prompt, image_pil]) response_text = response.text json_str = re.search(r'\[.*\]', response_text, re.DOTALL) if json_str: return json.loads(json_str.group(0)) else: logger.error(f"Vision AI did not return a valid JSON list. Raw response: {response_text}") return [] except Exception as e: logger.error(f"Error in Vision AI processing: {e}", exc_info=True) return [] def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str]) -> str: """Master function to process an image and generate a natural language query.""" logger.info("Starting image analysis with Vision AI.") vision_json_list = _analyze_image_with_vision(image_bytes, caption) if not vision_json_list: return "Error: I couldn't find any actionable transactions in the image." logger.info(f"Vision AI analysis complete. Result: {vision_json_list}") return _transpile_vision_json_to_query(vision_json_list) # --- END: VISION PROCESSING FUNCTIONS --- class ReportEngine: def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]], query: str): self.dfs = {name: df for name, df in dfs_with_names} self.query = query.lower() self.now = datetime.now(timezone.utc) self.results = {} self.currency = self._get_user_currency() def _get_user_currency(self) -> str: """Determines the user's primary currency from their data.""" for df_name in ['sales', 'expenses', 'assets', 'liabilities']: if df_name in self.dfs and 'details' in self.dfs[df_name].columns: currency_series = self.dfs[df_name]['details'].str.get('currency') mode = currency_series.dropna().mode() if not mode.empty: primary_currency = mode[0] if isinstance(primary_currency, str) and primary_currency.strip() and primary_currency != 'Unknown': return primary_currency return "R" def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]: if target_df is None or 'timestamp' not in target_df.columns or target_df.empty: return None if "yesterday" in self.query: yesterday = (self.now - timedelta(days=1)).date() start_of_yesterday = pd.Timestamp(yesterday, tz='UTC') end_of_yesterday = start_of_yesterday + timedelta(days=1) return (target_df['timestamp'] >= start_of_yesterday) & (target_df['timestamp'] < end_of_yesterday) if "today" in self.query: today = self.now.date() start_of_today = pd.Timestamp(today, tz='UTC') end_of_today = start_of_today + timedelta(days=1) return (target_df['timestamp'] >= start_of_today) & (target_df['timestamp'] < end_of_today) if "last month" in self.query: first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) last_day_last_month = first_day_current_month - timedelta(days=1) first_day_last_month = last_day_last_month.replace(day=1) return (target_df['timestamp'] >= pd.Timestamp(first_day_last_month)) & (target_df['timestamp'] < pd.Timestamp(first_day_current_month)) month_match = re.search(r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\b", self.query) if month_match: month_name = month_match.group(1) month_number = datetime.strptime(month_name.title(), "%B").month current_year = self.now.year return (target_df['timestamp'].dt.month == month_number) & (target_df['timestamp'].dt.year == current_year) if "this month" in self.query or "month" in self.query: return (target_df['timestamp'].dt.month == self.now.month) & (target_df['timestamp'].dt.year == self.now.year) if "last week" in self.query: start_of_this_week = self.now.date() - timedelta(days=self.now.weekday()) start_of_last_week = start_of_this_week - timedelta(days=7) return (target_df['timestamp'].dt.date >= start_of_last_week) & (target_df['timestamp'].dt.date < start_of_this_week) if "this week" in self.query or "week" in self.query: start_of_week = self.now.date() - timedelta(days=self.now.weekday()) return target_df['timestamp'].dt.date >= start_of_week day_match = re.search(r"on (monday|tuesday|wednesday|thursday|friday|saturday|sunday)", self.query) if day_match and 'day_of_week' in target_df.columns: day_name = day_match.group(1).title() return target_df['day_of_week'] == day_name if "year" in self.query: return target_df['timestamp'].dt.year == self.now.year return pd.Series(True, index=target_df.index) def generate_report(self) -> str: """Generates a simple Sales or Expenses report.""" subject = "sales" expense_triggers = ["expense report", "expense summary", "expenses"] if any(keyword in self.query for keyword in expense_triggers): subject = "expenses" target_df = self.dfs.get(subject, pd.DataFrame()) if target_df.empty: return json.dumps({"error": f"I couldn't find any {subject} data to generate a report."}) time_filter = self._get_time_filter(target_df) if time_filter is not None: target_df = target_df[time_filter] if target_df.empty: return json.dumps({"error": f"No {subject} data found for the specified period."}) if subject == "sales": total_revenue = target_df['sale_total'].sum() num_transactions = len(target_df) item_summary = target_df.groupby('item')['quantity'].sum() if item_summary.empty: best_selling_item = "N/A" worst_selling_item = "N/A" elif len(item_summary) == 1: best_selling_item = item_summary.idxmax() worst_selling_item = "N/A (Only one item type sold)" else: max_val = item_summary.max() min_val = item_summary.min() if max_val == min_val: best_selling_item = "Tie (All items sold equally)" worst_selling_item = "Tie" else: best_selling_item = item_summary.idxmax() worst_selling_item = item_summary.idxmin() self.results = { "report_subject": "Sales", "total_revenue": f"{self.currency}{total_revenue:.2f}", "number_of_sales": num_transactions, "best_selling_item": best_selling_item, "worst_selling_item": worst_selling_item } else: # expenses total_expenses = target_df['amount'].sum() num_transactions = len(target_df) category_summary = target_df.groupby('description')['amount'].sum() highest_expense_category = category_summary.idxmax() if not category_summary.empty else "N/A" self.results = { "report_subject": "Expenses", "total_expenses": f"{self.currency}{total_expenses:.2f}", "number_of_expenses": num_transactions, "highest_expense_category": highest_expense_category } return json.dumps(self.results, indent=2) def generate_debt_report(self) -> str: """Generates a report of outstanding debts (Upgrade 5).""" sales_df = self.dfs.get('sales', pd.DataFrame()) if sales_df.empty: return json.dumps({"error": "No sales data found."}) # Check for outstanding amounts or partial/pending status if 'amount_outstanding' not in sales_df.columns: return json.dumps({"message": "No debt records found."}) debt_df = sales_df[sales_df['amount_outstanding'] > 0] if debt_df.empty: return json.dumps({"message": "Great news! No pending debts found."}) total_outstanding = debt_df['amount_outstanding'].sum() # Group by customer debtors = [] if 'customer' in debt_df.columns: summary = debt_df.groupby('customer')['amount_outstanding'].sum().reset_index() for _, row in summary.iterrows(): debtors.append(f"{row['customer']}: {self.currency}{row['amount_outstanding']:.2f}") else: debtors.append("Various customers (names not recorded)") self.results = { "report_subject": "Debtors List", "total_outstanding": f"{self.currency}{total_outstanding:.2f}", "debtors": debtors } return json.dumps(self.results, indent=2) def generate_profit_report(self) -> str: """Generates a comprehensive profitability report.""" sales_df = self.dfs.get('sales', pd.DataFrame()) expenses_df = self.dfs.get('expenses', pd.DataFrame()) time_filter_sales = self._get_time_filter(sales_df) time_filter_expenses = self._get_time_filter(expenses_df) filtered_sales = sales_df[time_filter_sales] if time_filter_sales is not None else sales_df filtered_expenses = expenses_df[time_filter_expenses] if time_filter_expenses is not None else expenses_df total_revenue = filtered_sales['sale_total'].sum() if not filtered_sales.empty else 0 total_cogs = filtered_sales['cogs'].sum() if not filtered_sales.empty and 'cogs' in filtered_sales.columns else 0 total_expenses = filtered_expenses['amount'].sum() if not filtered_expenses.empty else 0 gross_profit = total_revenue - total_cogs net_profit = gross_profit - total_expenses num_sales = len(filtered_sales) if not filtered_sales.empty else 0 total_items_sold = filtered_sales['quantity'].sum() if not filtered_sales.empty else 0 atv = total_revenue / num_sales if num_sales > 0 else 0 ipt = total_items_sold / num_sales if num_sales > 0 else 0 expense_ratio = (total_expenses / total_revenue) * 100 if total_revenue > 0 else 0 most_profitable_item = "N/A" if not filtered_sales.empty and 'cogs' in filtered_sales.columns: filtered_sales_copy = filtered_sales.copy() filtered_sales_copy['item_profit'] = filtered_sales_copy['sale_total'] - filtered_sales_copy['cogs'] item_profitability = filtered_sales_copy.groupby('item')['item_profit'].sum() if not item_profitability.empty: most_profitable_item = item_profitability.idxmax() self.results = { "report_subject": "Profitability", "total_revenue": f"{self.currency}{total_revenue:.2f}", "total_cogs": f"{self.currency}{total_cogs:.2f}", "gross_profit": f"{self.currency}{gross_profit:.2f}", "total_expenses": f"{self.currency}{total_expenses:.2f}", "net_profit": f"{self.currency}{net_profit:.2f}", "average_transaction_value": f"{self.currency}{atv:.2f}", "items_per_transaction": f"{ipt:.2f}", "expense_to_revenue_ratio": f"{expense_ratio:.2f}%", "most_profitable_item": most_profitable_item } return json.dumps(self.results, indent=2) def generate_item_report(self, subject_item: str) -> str: """Generates a performance report for a specific item.""" sales_df = self.dfs.get('sales', pd.DataFrame()) if sales_df.empty: return json.dumps({"error": f"No sales data found for '{subject_item}'."}) item_df = sales_df[sales_df['item'].str.contains(subject_item, case=False, na=False)] if item_df.empty: return json.dumps({"error": f"I couldn't find any sales for '{subject_item}'."}) time_filter = self._get_time_filter(item_df) filtered_df = item_df[time_filter] if time_filter is not None else item_df if filtered_df.empty: return json.dumps({"error": f"No data for '{subject_item}' in this period."}) units_sold = filtered_df['quantity'].sum() total_revenue = filtered_df['sale_total'].sum() total_cogs = filtered_df['cogs'].sum() if 'cogs' in filtered_df.columns else 0 gross_profit = total_revenue - total_cogs profit_margin = (gross_profit / total_revenue) * 100 if total_revenue > 0 else 0 avg_price = total_revenue / units_sold if units_sold > 0 else 0 self.results = { "report_subject": "Item Report", "item_name": subject_item, "units_sold": int(units_sold), "total_revenue": f"{self.currency}{total_revenue:.2f}", "total_cogs": f"{self.currency}{total_cogs:.2f}", "gross_profit": f"{self.currency}{gross_profit:.2f}", "profit_margin": f"{profit_margin:.2f}%", "average_selling_price": f"{self.currency}{avg_price:.2f}" } return json.dumps(self.results, indent=2) def generate_day_of_week_report(self) -> str: sales_df = self.dfs.get('sales', pd.DataFrame()) if sales_df.empty or 'day_of_week' not in sales_df.columns: return json.dumps({"error": "No data available to analyze by day."}) time_filter = self._get_time_filter(sales_df) filtered_df = sales_df[time_filter] if time_filter is not None else sales_df if filtered_df.empty: return json.dumps({"error": "No sales data in this period."}) daily_sales = filtered_df.groupby('day_of_week')['sale_total'].sum() if daily_sales.empty: return json.dumps({"error": "No sales to analyze by day in this period."}) best_day = daily_sales.idxmax() day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] daily_sales = daily_sales.reindex(day_order).fillna(0) self.results = { "report_subject": "Day of Week Analysis", "best_day": best_day, "daily_sales_breakdown": {day: f"{self.currency}{amount:.2f}" for day, amount in daily_sales.to_dict().items()} } return json.dumps(self.results, indent=2) def generate_forecast_data(self) -> str: sales_df = self.dfs.get('sales') if sales_df is None or sales_df.empty: return json.dumps({"error": "Not enough sales data to generate a forecast."}) sales_df_copy = sales_df.copy() sales_df_copy.set_index('timestamp', inplace=True) weekly_sales = sales_df_copy['sale_total'].resample('W').sum() if len(weekly_sales) < 2: return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."}) last_week_sales = weekly_sales.iloc[-1] previous_week_sales = weekly_sales.iloc[-2] if len(weekly_sales) > 1 else 0 growth_rate = 0 if previous_week_sales > 0: growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100 historical_avg = weekly_sales.head(-1).mean() self.results = { "last_period_sales": f"{self.currency}{last_week_sales:.2f}", "previous_period_sales": f"{self.currency}{previous_week_sales:.2f}", "period_over_period_growth": f"{growth_rate:.2f}%", "historical_average": f"{self.currency}{historical_avg:.2f}" } return json.dumps(self.results, indent=2) def generate_business_snapshot(self) -> Dict[str, Any]: snapshot = {} sales_df = self.dfs.get('sales', pd.DataFrame()) expenses_df = self.dfs.get('expenses', pd.DataFrame()) total_revenue = sales_df['sale_total'].sum() if not sales_df.empty else 0 total_expenses = expenses_df['amount'].sum() if not expenses_df.empty else 0 net_profit = total_revenue - total_expenses snapshot['financial_kpis'] = { "User's Primary Currency": self.currency, "Total Revenue": f"{self.currency}{total_revenue:.2f}", "Total Expenses": f"{self.currency}{total_expenses:.2f}", "Net Profit": f"{self.currency}{net_profit:.2f}" } inventory_df = self.dfs.get('inventory', pd.DataFrame()) if not inventory_df.empty and 'item' in inventory_df.columns and 'quantity' in inventory_df.columns: snapshot['inventory_overview'] = "\n".join( [f"- {row['item']} ({int(row['quantity'])} units)" for _, row in inventory_df.iterrows()] ) else: snapshot['inventory_overview'] = "No inventory items recorded." assets_df = self.dfs.get('assets', pd.DataFrame()) if not assets_df.empty and 'name' in assets_df.columns and 'value' in assets_df.columns: snapshot['asset_register'] = "\n".join( [f"- {row['name']} ({self.currency}{row['value']:.2f})" for _, row in assets_df.iterrows()] ) else: snapshot['asset_register'] = "No assets recorded." liabilities_df = self.dfs.get('liabilities', pd.DataFrame()) if not liabilities_df.empty and 'creditor' in liabilities_df.columns and 'amount' in liabilities_df.columns: snapshot['liabilities_ledger'] = "\n".join( [f"- {row['creditor']} ({self.currency}{row['amount']:.2f})" for _, row in liabilities_df.iterrows()] ) else: snapshot['liabilities_ledger'] = "No liabilities recorded." return snapshot def generateResponse(prompt: str, currency: str = "R") -> str: """Generate structured JSON response from user input using Generative AI.""" if not model: return '{"error": "Model not available"}' # --- UPDATED: System Prompt for Upgrades 1, 2, 3, 5 --- system_prompt = f""" Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list. **USER CONTEXT:** - **Default Currency:** {currency} (Use this currency code if no symbol is provided in the input). **1. Output Format:** You MUST output your response as a valid JSON list `[]` containing one or more transaction objects `{{}}`. **2. Transaction Object Structure:** Each transaction object MUST have the following keys: - `"intent"`: The user's goal ("create", "read", "update", "delete", "reset_account"). - `"transaction_type"`: The category of the transaction (e.g., "sale", "purchase", "inventory", "expense", "asset", "liability", "query", "service_offering", "account"). - `"details"`: An object containing key-value pairs extracted from the request. **3. Key Naming Conventions for the `details` Object:** - **For Expenses:** Use `"amount"`, `"description"`, and `"category"`. - **For Assets:** Use `"value"` for the monetary worth and `"name"` for the item's name. - **For Liabilities:** Use `"amount"` and `"creditor"`. - **For Sales/Inventory (Updated):** - Use `"item"`, `"quantity"`, and `"price"`. - **Customer:** If a customer name is mentioned (e.g., "Sold to John"), extract as `"customer"`. - **Payment:** If the user mentions how much was paid (e.g., "They gave me $10" or "paid 500"), extract as `"amount_paid"`. - **For Updates/Deletes (Updated):** - If a Transaction ID is mentioned (e.g., "update transaction 8f3a..."), extract it as `"transaction_id"`. - **For all financial transactions:** - **Currency:** ONLY include a `"currency"` key if the user explicitly types a symbol (e.g., $, R, ZAR, USD). **If the user does not type a symbol, DO NOT include the currency key.** This allows the system to inherit it from inventory. **4. Special Intents:** - **Account Reset:** If the user explicitly asks to "reset my account", "wipe data", or "delete all data", use intent `"reset_account"` and transaction_type `"account"`. **5. Examples:** **Example 1: Sales with Change** - **Input:** "Sold 2 bread for $1 each, customer gave me $5" - **Output:** [ {{"intent": "create", "transaction_type": "sale", "details": {{"item": "bread", "quantity": 2, "price": 1, "amount_paid": 5, "currency": "$"}} }} ] **Example 2: Update with ID** - **Input:** "Update transaction abc1234, change price to 50" - **Output:** [ {{"intent": "update", "transaction_type": "sale", "details": {{"transaction_id": "abc1234", "price": 50}} }} ] **Example 3: Reset Account** - **Input:** "Reset my account completely" - **Output:** [ {{"intent": "reset_account", "transaction_type": "account", "details": {{}} }} ] """ try: full_prompt = [system_prompt, prompt] response = model.generate_content(full_prompt) response_text = response.text cleaned_response = re.sub(r'^```json\s*|\s*```$', '', response_text, flags=re.MULTILINE).strip() if cleaned_response.startswith('{') and cleaned_response.endswith('}'): return f"[{cleaned_response}]" return cleaned_response except Exception as e: logger.error(f"LLM Response generation failed: {e}", exc_info=True) return '{"error": "Failed to process request with LLM"}' def parse_multiple_transactions(response_text: str) -> List[Dict]: """Parse JSON response from LLM into structured transactions.""" try: parsed_data = json.loads(response_text) if isinstance(parsed_data, list): return [add_timestamp(item) for item in parsed_data if isinstance(item, dict)] elif isinstance(parsed_data, dict): return [add_timestamp(parsed_data)] return [] except json.JSONDecodeError: logger.error(f"Failed to decode JSON from LLM response: {response_text}") return [] except Exception as e: logger.error(f"Unexpected error during parsing: {e}", exc_info=True) return [] def add_timestamp(transaction: Dict) -> Dict: """Add created_at timestamp to transaction if not present.""" if 'created_at' not in transaction: transaction['created_at'] = datetime.now(timezone.utc).isoformat() return transaction def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]: """Finds the canonical version of an item using an "exact match first" hybrid approach.""" inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services") name_lower = item_name.lower().strip() if name_lower.endswith('ss'): singular = name_lower else: singular = p.singular_noun(name_lower) if not singular: singular = name_lower all_item_docs = list(inventory_ref.stream()) all_item_names = [doc.id for doc in all_item_docs] # Check exact match if all_item_names: if name_lower in all_item_names: for doc in all_item_docs: if doc.id == name_lower: return {'doc': doc, 'name': name_lower} # Check fuzzy match if all_item_names: best_match = fuzzy_process.extractOne(name_lower, all_item_names) if best_match and best_match[1] >= 90: matched_name = best_match[0] for doc in all_item_docs: if doc.id == matched_name: return {'doc': doc, 'name': matched_name} return {'doc': None, 'name': singular} def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: batch = db.batch() inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services") feedback_messages = [] success_count = 0 for transaction in transaction_data: details = transaction.get('details', {}) item_name = details.get('item') or details.get('service_name') if not item_name: feedback_messages.append("Skipped: An inventory/service update was missing an item or service name.") continue canonical_info = _get_canonical_info(user_phone, item_name) canonical_name = canonical_info['name'] if 'item' in details: details['item'] = canonical_name if 'service_name' in details: details['service_name'] = canonical_name try: change_key = 'quantity' if 'quantity' in details else 'units_available' change_amount = int(details.get(change_key, 0)) except (ValueError, TypeError): feedback_messages.append(f"Skipped '{canonical_name}': Invalid quantity or units format.") continue doc_ref = inventory_ref.document(canonical_name) doc_data = { 'details': {**details, change_key: firestore.Increment(change_amount)}, 'type': 'service' if 'service_name' in details else 'good', 'last_updated': datetime.now(timezone.utc).isoformat(), } batch.set(doc_ref, doc_data, merge=True) feedback_messages.append(f"Processed '{canonical_name}': change of {change_amount}.") success_count += 1 if success_count == 0: return False, "\n".join(feedback_messages) if feedback_messages else "No valid inventory/service items to process." try: batch.commit() return True, "\n".join(feedback_messages) except Exception as e: logger.error(f"Inventory/Service batch commit failed for user {user_phone}: {e}", exc_info=True) return False, f"An error occurred during inventory update: {e}" def create_sale(user_phone: str, transaction_data: List[Dict], force_settled: bool = False) -> tuple[bool, str]: feedback_messages = [] any_success = False # Pre-fetch User Settings for currency fallback user_settings_ref = db.collection("users").document(user_phone).get() user_default_currency = "R" if user_settings_ref.exists: user_default_currency = user_settings_ref.to_dict().get("currency", "R") # FIX ISSUE 1: Define sales_ref at the top of the function # so the process_one_sale transaction function can safely reference it from the enclosing scope. sales_ref = db.collection("users").document(user_phone).collection("sales") for t in transaction_data: details = t.get('details', {}) item_name = details.get('item') or details.get('service_name') if not item_name: feedback_messages.append("Sale failed: Missing item or service name.") continue try: canonical_info = _get_canonical_info(user_phone, item_name) canonical_name = canonical_info['name'] # --- UPGRADE 5: CURRENCY INHERITANCE --- input_currency = details.get('currency') inventory_currency = None last_selling_price = None # Fetch Inventory Doc item_doc_ref = db.collection("users").document(user_phone).collection("inventory_and_services").document(canonical_name) item_snapshot = item_doc_ref.get() if item_snapshot.exists: inv_data = item_snapshot.to_dict() inv_details = inv_data.get('details', {}) inventory_currency = inv_details.get('currency') # If price is missing in input, use inventory price if 'price' not in details: details['price'] = inv_details.get('price', 0) # Look up last sales price if needed if 'price' not in details: # Reference already safely defined at function top all_sales_query = sales_ref.where('details.item', '==', canonical_name) all_sales_docs = list(all_sales_query.stream()) if all_sales_docs: all_sales_docs.sort(key=lambda doc: doc.to_dict().get('timestamp', ''), reverse=True) last_selling_price = all_sales_docs[0].to_dict().get('details', {}).get('price') # Priority: Input > Inventory > User Default final_currency = input_currency or inventory_currency or user_default_currency @firestore.transactional def process_one_sale(transaction, sale_details): is_new_item = canonical_info['doc'] is None original_trans_type = t.get('transaction_type') item_type = 'service' if original_trans_type == 'service_offering' else 'good' user_price = sale_details.get('price') or sale_details.get('unit_price') if user_price is not None: selling_price = float(user_price) elif last_selling_price is not None: selling_price = float(last_selling_price) else: if item_type == 'good' and 'price' not in sale_details: # Use inventory price if available selling_price = float(inv_details.get('price', 0)) if item_snapshot.exists else 0 if selling_price == 0: return f"Sale failed for new item '{canonical_name}': Price not found." else: selling_price = 0 sale_details['price'] = selling_price sale_details['item'] = canonical_name sale_details['currency'] = final_currency # Save explicit currency! if 'unit_price' in sale_details: del sale_details['unit_price'] if 'service_name' in sale_details: del sale_details['service_name'] try: quantity_sold = int(sale_details.get('quantity', 1)) if quantity_sold <= 0: return f"Sale failed for '{canonical_name}': Invalid quantity ({quantity_sold})." except (ValueError, TypeError): return f"Sale failed for '{canonical_name}': Invalid quantity format." # --- UPGRADE 5: Variance (Change/Debt) Logic --- total_due = selling_price * quantity_sold amount_paid = sale_details.get('amount_paid') change_due = 0 amount_outstanding = 0 payment_status = 'completed' # If force_settled is True (from "Settled" button), ignore variance if force_settled: amount_paid = total_due sale_details['amount_paid'] = total_due if amount_paid is not None: try: amount_paid = float(amount_paid) if amount_paid > total_due: change_due = amount_paid - total_due sale_details['change_given'] = change_due payment_status = 'overpayment' # Or 'completed' with change note elif amount_paid < total_due: amount_outstanding = total_due - amount_paid sale_details['amount_outstanding'] = amount_outstanding payment_status = 'partial_payment' else: payment_status = 'completed' except ValueError: pass else: # Default assumption: Exact cash sale if no payment details amount_paid = total_due sale_details['amount_paid'] = total_due sale_details['status'] = payment_status # ------------------------------------- item_cost = 0 if item_snapshot.exists: if inv_data.get('type') == 'good': stock_key = 'quantity' current_stock = int(inv_details.get(stock_key, 0)) if current_stock < quantity_sold: return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})." transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)}) item_cost = inv_details.get('price') or inv_details.get('unit_price') or 0 elif item_type == 'good': return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first." elif is_new_item and item_type == 'service': logger.info(f"Creating new service '{canonical_name}' during sale.") service_record = { 'details': {'item': canonical_name, 'price': selling_price, 'currency': final_currency}, 'type': 'service', 'last_updated': datetime.now(timezone.utc).isoformat() } transaction.set(item_doc_ref, service_record) sale_doc_ref = sales_ref.document() sale_record = { 'details': {**sale_details, 'cost': item_cost}, 'timestamp': datetime.now(timezone.utc).isoformat(), 'status': payment_status, 'transaction_id': sale_doc_ref.id, 'sale_total': total_due, # Helper for reporting 'customer': sale_details.get('customer', 'Unknown') } transaction.set(sale_doc_ref, sale_record) msg = f"Sale successful for {quantity_sold} x '{canonical_name}' at {final_currency}{selling_price} each." if change_due > 0: msg += f" Change due: {final_currency}{change_due:.2f}." if amount_outstanding > 0: msg += f" Outstanding debt: {final_currency}{amount_outstanding:.2f}." # --- Transaction ID Append --- msg += f"\n\nTransaction ID:\n```{sale_doc_ref.id}```" return msg transaction_feedback = process_one_sale(db.transaction(), details) feedback_messages.append(transaction_feedback) if "successful" in transaction_feedback: any_success = True except Exception as e: logger.error(f"Transactional sale failed for '{item_name}': {e}", exc_info=True) feedback_messages.append(f"Sale failed for '{item_name}': An unexpected error occurred.") return any_success, "\n".join(feedback_messages) def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: batch = db.batch() expenses_ref = db.collection("users").document(user_phone).collection("expenses") success_count = 0 feedback_messages = [] for transaction in transaction_data: details = transaction.get('details', {}) expense_desc = details.get('description', details.get('category', 'Unnamed Expense')) if 'amount' not in details: feedback_messages.append(f"Skipped expense '{expense_desc}': Missing amount.") continue doc_ref = expenses_ref.document() expense_record = { 'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(), 'transaction_type': 'expense', 'status': 'recorded', 'transaction_id': doc_ref.id } batch.set(doc_ref, expense_record) feedback_messages.append(f"Recorded expense: '{expense_desc}' ({details.get('currency','')}{details.get('amount')}).\n\nTransaction ID:\n```{doc_ref.id}```") success_count += 1 if success_count == 0: return False, "\n".join(feedback_messages) if feedback_messages else "No valid expense transactions to create." try: batch.commit() return True, "\n".join(feedback_messages) except Exception as e: logger.error(f"Expense batch commit failed for user {user_phone}: {e}", exc_info=True) return False, f"Failed to record expenses. An error occurred: {e}" def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: batch = db.batch() assets_ref = db.collection("users").document(user_phone).collection("assets") success_count = 0 feedback_messages = [] for transaction in transaction_data: details = transaction.get('details', {}) asset_name = details.get('name', 'Unnamed Asset') if 'value' not in details: feedback_messages.append(f"Skipped asset '{asset_name}': Missing value.") continue doc_ref = assets_ref.document() asset_record = { 'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(), 'transaction_type': 'asset', 'status': 'recorded', 'transaction_id': doc_ref.id } batch.set(doc_ref, asset_record) feedback_messages.append(f"Recorded asset: '{asset_name}' ({details.get('currency','')}{details.get('value')}).\n\nTransaction ID:\n```{doc_ref.id}```") success_count += 1 if success_count == 0: return False, "\n".join(feedback_messages) if feedback_messages else "No valid asset transactions to create." try: batch.commit() return True, "\n".join(feedback_messages) except Exception as e: logger.error(f"Asset batch commit failed for user {user_phone}: {e}", exc_info=True) return False, f"Failed to record assets. An error occurred: {e}" def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: batch = db.batch() liabilities_ref = db.collection("users").document(user_phone).collection("liabilities") success_count = 0 feedback_messages = [] for transaction in transaction_data: details = transaction.get('details', {}) creditor = details.get('creditor', 'Unnamed Creditor') if 'amount' not in details or not details.get('creditor'): feedback_messages.append(f"Skipped liability '{creditor}': Missing amount or creditor.") continue doc_ref = liabilities_ref.document() liability_record = { 'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(), 'transaction_type': 'liability', 'status': 'recorded', 'transaction_id': doc_ref.id } batch.set(doc_ref, liability_record) feedback_messages.append(f"Recorded liability to '{creditor}' ({details.get('currency','')}{details.get('amount')}).\n\nTransaction ID:\n```{doc_ref.id}```") success_count += 1 if success_count == 0: return False, "\n".join(feedback_messages) if feedback_messages else "No valid liability transactions to create." try: batch.commit() return True, "\n".join(feedback_messages) except Exception as e: logger.error(f"Liability batch commit failed for user {user_phone}: {e}", exc_info=True) return False, f"Failed to record liabilities. An error occurred: {e}" def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame: if df.empty: return df for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce', utc=True) numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available', 'sale_total', 'amount_outstanding', 'change_given', 'amount_paid'] for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) for col in df.select_dtypes(include=['object']).columns: df[col] = df[col].fillna('Unknown') return df def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]: all_dfs_with_names = [] inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream() inventory_data, services_data = [], [] for doc in inv_serv_docs: doc_data = doc.to_dict() flat_data = {**doc_data, **doc_data.get('details', {})} if 'details' in flat_data: del flat_data['details'] if doc_data.get('type') == 'service': services_data.append(flat_data) else: inventory_data.append(flat_data) if inventory_data: all_dfs_with_names.append(("inventory", _validate_dataframe(pd.DataFrame(inventory_data)))) if services_data: all_dfs_with_names.append(("services", _validate_dataframe(pd.DataFrame(services_data)))) collections_to_fetch = {'sales': 'sales', 'expenses': 'expenses', 'assets': 'assets', 'liabilities': 'liabilities'} for df_name, coll_name in collections_to_fetch.items(): docs = db.collection("users").document(user_phone).collection(coll_name).stream() data = [doc.to_dict() for doc in docs] if data: flat_data_list = [] for item in data: flat_item = {**item, **item.get('details', {})} if 'details' in flat_item: del flat_item['details'] flat_data_list.append(flat_item) df = pd.DataFrame(flat_data_list) validated_df = _validate_dataframe(df) if df_name == 'sales': if 'price' in validated_df.columns and 'quantity' in validated_df.columns and 'sale_total' not in validated_df.columns: validated_df['sale_total'] = validated_df['price'] * validated_df['quantity'] if 'cost' in validated_df.columns and 'quantity' in validated_df.columns: validated_df['cogs'] = validated_df['cost'] * validated_df['quantity'] if 'timestamp' in validated_df.columns: validated_df['day_of_week'] = validated_df['timestamp'].dt.day_name() all_dfs_with_names.append((df_name, validated_df)) return all_dfs_with_names def _get_relative_date_context() -> str: today = datetime.now(timezone.utc) def fmt(d): return d.strftime('%Y-%m-%d') yesterday = today - timedelta(days=1) start_of_this_week = today - timedelta(days=today.weekday()) start_of_last_week = start_of_this_week - timedelta(days=7) last_monday = start_of_this_week - timedelta(days=7) context = [ f"Here are some pre-calculated dates to help you understand the user's request:", f"- Today is: {fmt(today)}", f"- Yesterday was: {fmt(yesterday)}", f"- The start of this week was: {fmt(start_of_this_week)}", f"- The start of last week was: {fmt(start_of_last_week)}", f"- Last Monday was on: {fmt(last_monday)}", ] return "\n".join(context) def read_datalake(user_phone: str, query: str) -> str: """Master function for reading and analyzing business data.""" def _to_text(resp) -> str: try: if resp is None: return "" if hasattr(resp, "content") and resp.content is not None: return str(resp.content) if hasattr(resp, "text") and resp.text is not None: return str(resp.text) if isinstance(resp, (list, tuple)): return "".join(_to_text(r) for r in resp) return str(resp) except Exception: return str(resp) try: all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone) if not all_dfs_with_names: if any(keyword in query.lower() for keyword in ['help', 'tutorial', 'guide', 'what is', 'explain']): return "I'm Qx, your business assistant! I can help you track sales, expenses, and inventory. Once you add some data, I can give you reports and insights. How can I help you get started?" return "You have no data recorded yet. Please add some transactions first." query_lower = query.lower() engine = ReportEngine(all_dfs_with_names, query) # --- Tier 0: Simple Direct Lookups --- simple_lookup_map = { "inventory": ["stock", "inventory", "in stock", "what do i have"], "assets": ["asset", "assets", "my assets"], "liabilities": ["liabilities", "i owe", "creditor", "my debts"], } for df_name, keywords in simple_lookup_map.items(): if any(keyword in query_lower for keyword in keywords): logger.info(f"Handling '{query}' with Simple Lookup Path for '{df_name}'.") target_df_tuple = next((item for item in all_dfs_with_names if item[0] == df_name), None) if target_df_tuple is not None and not target_df_tuple[1].empty: return render_df_as_image(target_df_tuple[1]) return f"You don't have any {df_name} recorded yet." # --- Tier 1: Canned & Temporal Reports --- report_json = None sales_report_triggers = ["sales report", "sales summary", "sales performance", "how were my sales", "revenue report"] expense_report_triggers = ["expense report", "expense summary", "expense performance", "how were my expenses", "spending report"] debt_report_triggers = ["who owes", "debt", "debtors", "outstanding", "credit sales"] if any(trigger in query_lower for trigger in sales_report_triggers): logger.info(f"Handling '{query}' with the General Sales Report Path.") report_json = engine.generate_report() elif any(trigger in query_lower for trigger in expense_report_triggers): logger.info(f"Handling '{query}' with the General Expense Report Path.") report_json = engine.generate_report() elif any(trigger in query_lower for trigger in debt_report_triggers): logger.info(f"Handling '{query}' with the Debt Report Path.") report_json = engine.generate_debt_report() elif "profit" in query_lower: logger.info(f"Handling '{query}' with the Profit Report Path.") report_json = engine.generate_profit_report() elif any(k in query_lower for k in ["best day", "busiest day", "sales by day"]): logger.info(f"Handling '{query}' with the Day of Week Report Path.") report_json = engine.generate_day_of_week_report() else: item_report_match = re.search(r"(?:report on|performance of)\s+([\w\s]+?)(?:\s+(?:this|last|on|in|for|today|yesterday)|$)", query_lower) if item_report_match: item_name = item_report_match.group(1).strip() if item_name not in ["sales", "expenses", "profit"]: logger.info(f"Handling '{query}' with the Item Report Path for item: '{item_name}'.") report_json = engine.generate_item_report(item_name) if report_json: report_data = json.loads(report_json) if "error" in report_data: return report_data["error"] synthesis_prompt = f""" Directly synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. Present only the data-driven report, formatted for WhatsApp (*bold*, _italic_, emojis). For sales reports, if helpful, provide a creative and actionable "Insight" section at the end based on the best/worst selling items. **IMPORTANT INSTRUCTIONS:** - If `report_subject` is "Profitability", present a clear financial summary. - If `report_subject` is "Item Report", state the item name and present its performance KPIs. - If `report_subject` is "Debtors List", list the customers and amounts clearly. Here is the data summary: {report_json} """ response = llm.invoke(synthesis_prompt) return _to_text(response) # --- Tier 2: Predictive Queries --- predictive_keywords = ["expect", "forecast", "predict"] if any(keyword in query_lower for keyword in predictive_keywords): logger.info(f"Handling '{query}' with the Forecasting Path.") forecast_json = engine.generate_forecast_data() forecast_data = json.loads(forecast_json) if "error" in forecast_data: return forecast_data["error"] synthesis_prompt = f"Synthesize a sales forecast from the following JSON data. Omit conversational introductions or summaries. Present only the forecast. Data: {forecast_json}" response = llm.invoke(synthesis_prompt) return _to_text(response) # --- Tier 3: Business Coach & Help Layer --- help_keywords = ['help', 'tutorial', 'guide', 'how do you work', 'what can you do', 'how can', 'how would'] literacy_keywords = [ 'explain', 'how to', 'how do i', 'ideas for', 'advice on', "how do", "why", "can I", "can we"] if any(keyword in query_lower for keyword in help_keywords) or any(keyword in query_lower for keyword in literacy_keywords): logger.info(f"Handling '{query}' with the Business Coach Path.") snapshot = engine.generate_business_snapshot() snapshot_str = json.dumps(snapshot, indent=2) synthesis_prompt = f""" You are Qx, a friendly and insightful business coach and financial expert. Your task is to provide a clear, helpful, and strategic answer based on the user's question, using your general business knowledge combined with the business snapshot provided below for context. **Business Snapshot for Context:** {snapshot_str} **User's Question:** "{query}" """ response = llm.invoke(synthesis_prompt) return _to_text(response) # --- Tier 4: Fortified PandasAI with Graceful Fallback (Final Path) --- else: try: logger.info(f"Handling '{query}' with the Fortified PandasAI Path.") schema_description = "You have been provided with these Pandas DataFrames:\n" for name, df in all_dfs_with_names: schema_description += f"* **{name}**: Contains columns like {', '.join(df.columns.to_list())}.\n" date_context = _get_relative_date_context() today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d') pandasai_prompt = ( f"{schema_description}\n" f"For context, today's date is {today_str}.\n{date_context}\n\n" f"**IMPORTANT RULES:**\n" f"1. For time-based queries, you MUST use timezone-aware pandas Timestamps for comparison. Example: `pd.Timestamp('{today_str}', tz='UTC')`.\n" f"2. Your code MUST end by declaring a `result` dictionary.\n" f"3. Your primary goal is to answer with data. Return a DataFrame or a string. ONLY generate a plot if the user explicitly asks for a 'chart', 'plot', 'graph', 'diagram', or 'show'.\n" f"4. When a plot is requested, you MUST save it as a file. The final line of your code must be `result = {{'type': 'plot', 'value': 'filename.png'}}`.\n\n" f"Based on this, please write Python code to answer the following specific user query: '{query}'" ) datalake_dfs = [df for _, df in all_dfs_with_names] lake = SmartDatalake(datalake_dfs, config={"llm": llm, "response_parser": FlaskResponse, "save_charts_path": user_defined_path, "enable_cache": False, "conversational": False}) response = lake.chat(pandasai_prompt) # FIX: Catch both string-based "No code" failures and None/empty responses if not response or "No code found" in str(response): raise NoCodeFoundError("PandasAI failed to generate a valid data answer.") if isinstance(response, dict) and 'value' in response: return str(response['value']) elif isinstance(response, str): return response else: return str(response) except Exception as e: # Catch NoCodeFoundError specifically and all other PandasAI failures to reroute to Coach logger.warning(f"PandasAI path failed for query '{query}': {e}. Falling back to Business Coach Synthesis.") snapshot = engine.generate_business_snapshot() snapshot_str = json.dumps(snapshot, indent=2) synthesis_prompt = f""" You are Qx, a friendly and insightful business coach. I was unable to perform a direct data calculation for this query, so please provide a strategic, high-level business answer based on the snapshot and the question. **Business Snapshot:** {snapshot_str} **User Question:** "{query}" """ response = llm.invoke(synthesis_prompt) return _to_text(response) except Exception as e: logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True) return "Sorry, I encountered an error while analyzing your data." def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]: col_ref = db.collection("users").document(user_phone).collection(collection_name) # --- UPGRADE 1 & 2: Search by Transaction ID --- if 'transaction_id' in details and details['transaction_id']: doc = col_ref.document(details['transaction_id']).get() if doc.exists: return {"id": doc.id, "data": doc.to_dict()} if collection_name in ['inventory_and_services', 'sales'] and ('item' in details or 'service_name' in details): item_name = details.get('item') or details.get('service_name') canonical_info = _get_canonical_info(user_phone, item_name) if canonical_info['doc']: doc = canonical_info['doc'] return {"id": doc.id, "data": doc.to_dict()} query = col_ref key_map = {'expenses': 'description', 'assets': 'name', 'liabilities': 'creditor'} search_key = key_map.get(collection_name) filters_applied = False if search_key and search_key in details: query = query.where(f'details.{search_key}', '==', details[search_key]); filters_applied = True if 'amount' in details: query = query.where('details.amount', '==', details['amount']); filters_applied = True if 'value' in details: query = query.where('details.value', '==', details['value']); filters_applied = True if not filters_applied: return None docs = query.limit(2).stream() found_docs = [{"id": doc.id, "data": doc.to_dict()} for doc in docs] if len(found_docs) == 1: return found_docs[0] elif len(found_docs) > 1: return "multiple_matches" else: return None def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: feedback = [] any_success = False for trans in transaction_data: details = trans.get('details', {}) trans_type = trans.get('transaction_type', '').lower() collection_map = { 'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services', 'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities' } collection_name = collection_map.get(trans_type) if not collection_name: feedback.append(f"Update skipped: Unknown type '{trans_type}'.") continue target_doc = _find_document_by_details(user_phone, collection_name, details) if target_doc == "multiple_matches": feedback.append(f"Update for {trans_type} failed: Multiple records match. Please use the ID or be more specific.") continue if not target_doc: feedback.append(f"Update for {trans_type} failed: No record found matching your description or ID.") continue doc_id = target_doc["id"] doc_ref = db.collection("users").document(user_phone).collection(collection_name).document(doc_id) try: updates = {f"details.{k}": v for k, v in details.items() if k != 'transaction_id'} if not updates: feedback.append(f"Update for {trans_type} (ID: {doc_id}) skipped: No new data provided.") continue updates['last_updated'] = datetime.now(timezone.utc).isoformat() doc_ref.update(updates) feedback.append(f"Successfully updated {trans_type} record (ID: {doc_id}).") any_success = True except Exception as e: logger.error(f"Update failed for doc '{doc_id}': {e}", exc_info=True) feedback.append(f"Update for {trans_type} (ID: {doc_id}) failed with an error.") return any_success, "\n".join(feedback) def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]: feedback = [] any_success = False for trans in transaction_data: details = trans.get('details', {}) trans_type = trans.get('transaction_type', '').lower() collection_map = { 'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services', 'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities' } collection_name = collection_map.get(trans_type) if not collection_name: feedback.append(f"Delete skipped: Unknown type '{trans_type}'.") continue target_doc = _find_document_by_details(user_phone, collection_name, details) if target_doc == "multiple_matches": feedback.append(f"Delete for {trans_type} failed: Multiple records match. Please use the ID or be more specific.") continue if not target_doc: feedback.append(f"Delete for {trans_type} failed: No record found matching your description or ID.") continue doc_id = target_doc["id"] try: db.collection("users").document(user_phone).collection(collection_name).document(doc_id).delete() feedback.append(f"Successfully deleted {trans_type} record (ID: {doc_id}).") any_success = True except Exception as e: logger.error(f"Delete failed for doc '{doc_id}': {e}", exc_info=True) feedback.append(f"Delete for {trans_type} (ID: {doc_id}) failed with an error.") return any_success, "\n".join(feedback) def reset_user_account(user_phone: str) -> str: """Permanently deletes all transaction data for a user. (Upgrade 3)""" collections = ['sales', 'expenses', 'assets', 'liabilities', 'inventory_and_services', 'temp_transactions'] user_ref = db.collection("users").document(user_phone) try: total_deleted = 0 batch = db.batch() count = 0 for coll_name in collections: docs = user_ref.collection(coll_name).list_documents() for doc in docs: batch.delete(doc) count += 1 if count >= 450: # Firestore batch limit batch.commit() batch = db.batch() count = 0 if count > 0: batch.commit() return "Your account has been reset. All transaction data has been permanently deleted." except Exception as e: logger.error(f"Account reset failed for {user_phone}: {e}", exc_info=True) return "An error occurred while resetting your account. Please try again later." def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool: if not transactions: return False try: doc_ref = db.collection("users").document(mobile).collection("temp_transactions").document("pending") doc_ref.set({"transactions": transactions, "timestamp": datetime.now(timezone.utc).isoformat()}) return True except Exception as e: logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True) return False def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str: if not transactions: return "No transaction data to display." if isinstance(transactions, dict): transactions = [transactions] output_lines = [] for idx, trans in enumerate(transactions): if not isinstance(trans, dict): continue details = trans.get('details', trans) trans_type = trans.get('transaction_type', 'Item').replace("_", " ").title() title = f"{trans_type}" if len(transactions) > 1: output_lines.append(f"--- {title} {idx + 1} ---") else: output_lines.append(f"--- {title} ---") # --- UPGRADE 1 & 2: Make Transaction ID easy to copy --- if 'transaction_id' in trans: output_lines.append(f"• Transaction ID: ```{trans['transaction_id']}```") key_order = ['item', 'service_name', 'name', 'creditor', 'category', 'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value', 'customer', 'amount_paid', 'change_given', 'amount_outstanding', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type'] displayed_keys = set() displayed_keys.add('transaction_id') for key in key_order: if key in details and key not in displayed_keys: val = details[key] # Format money fields if isinstance(val, (int, float)) and key in ['price', 'amount', 'cost', 'value', 'amount_paid', 'change_given', 'amount_outstanding']: val = f"{details.get('currency', '')}{val:.2f}" output_lines.append(f"• {key.replace('_', ' ').title()}: {val}") displayed_keys.add(key) for key, value in details.items(): if key not in displayed_keys and key != 'currency': output_lines.append(f"• {key.replace('_', ' ').title()}: {value}") output_lines.append("") return "\n".join(output_lines).strip() def fetch_transaction(user_phone: str, identifier: str, collection: str = "inventory_and_services"): try: doc = db.collection("users").document(user_phone).collection(collection).document(identifier).get() if doc.exists: return doc.to_dict() return None except Exception as e: logger.error(f"Error fetching transaction '{identifier}' from '{collection}': {e}", exc_info=True) return None def process_intent(parsed_trans_data: List[Dict], mobile: str, force_settled: bool = False) -> str: """Groups transactions, processes them, and returns feedback. Supports forced settlement.""" if not parsed_trans_data: return "I couldn't understand the transaction details. Could you please try again?" grouped_transactions = {} for trans in parsed_trans_data: intent = trans.get('intent', 'unknown').lower() trans_type = trans.get('transaction_type', 'unknown').lower() key = (intent, trans_type) if key not in grouped_transactions: grouped_transactions[key] = [] grouped_transactions[key].append(trans) final_feedback = [] for (intent, trans_type), transactions in grouped_transactions.items(): logger.info(f"Processing group: {intent} - {trans_type} for user {mobile}") success, message = False, "" try: if intent == 'create': if trans_type == 'sale': success, message = create_sale(mobile, transactions, force_settled=force_settled) final_feedback.append(f"Sales Processing:\n{message}") elif trans_type in ('purchase', 'inventory', 'service_offering'): success, message = create_or_update_inventory_or_service_offering(mobile, transactions) final_feedback.append(f"Inventory/Service Updates:\n{message}") elif trans_type == 'expense': success, message = create_expense(mobile, transactions) final_feedback.append(f"Expense Recording:\n{message}") elif trans_type == 'asset': success, message = create_asset(mobile, transactions) final_feedback.append(f"Asset Recording:\n{message}") elif trans_type == 'liability': success, message = create_liability(mobile, transactions) final_feedback.append(f"Liability Recording:\n{message}") else: final_feedback.append(f"Cannot 'create' transactions of type: '{trans_type}'") elif intent == 'read' or trans_type == 'query': query_details = transactions[0].get('details', {}) query_str = query_details.get('query') if not query_str: query_str = "Show summary of all data" read_result = read_datalake(mobile, query_str) final_feedback.append(read_result) elif intent == 'update': success, message = update_transaction(mobile, transactions) final_feedback.append(f"Update Results:\n{message}") elif intent == 'delete': success, message = delete_transaction(mobile, transactions) final_feedback.append(f"Deletion Results:\n{message}") elif intent == 'reset_account': # --- UPGRADE 3: Reset Account Handler --- message = reset_user_account(mobile) final_feedback.append(message) else: final_feedback.append(f"Unknown intent '{intent}' for type '{trans_type}'.") except Exception as e: logger.error(f"Error processing group ({intent}, {trans_type}) for user {mobile}: {e}", exc_info=True) final_feedback.append(f"An unexpected error occurred while processing your {trans_type} {intent} request.") if not final_feedback: return "No actions were processed from your request." return "\n\n".join(final_feedback).strip()