|
|
import json |
|
|
import os |
|
|
import logging |
|
|
from datetime import datetime, timezone, timedelta |
|
|
from typing import List, Dict, Union, Optional, Any, Tuple |
|
|
from google.cloud import firestore |
|
|
import pandas as pd |
|
|
import inflect |
|
|
from thefuzz import process as fuzzy_process |
|
|
from pandasai import SmartDatalake |
|
|
from pandasai.responses.response_parser import ResponseParser |
|
|
from pandasai.exceptions import NoCodeFoundError, MaliciousQueryError |
|
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
import google.generativeai as genai |
|
|
import re |
|
|
import uuid |
|
|
import dataframe_image as dfi |
|
|
from PIL import Image |
|
|
import io |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
import firebase_admin |
|
|
from firebase_admin import credentials, firestore |
|
|
|
|
|
|
|
|
p = inflect.engine() |
|
|
|
|
|
def init_firestore_from_env(env_var: str = "FIREBASE"):
    """
    Initialise firebase-admin with the service-account JSON that is stored
    in the given environment variable and return a Firestore client.

    Args:
        env_var: Name of the environment variable holding the service-account
            JSON payload.

    Returns:
        A Firestore client bound to the (possibly already initialised)
        default firebase-admin app.

    Raises:
        KeyError: if the environment variable is not set.
        json.JSONDecodeError / ValueError: if the JSON payload is invalid.
        Exception: any other initialisation failure is logged and re-raised.
    """
    try:
        # Re-use the default app if some other code path initialised it already.
        if firebase_admin._apps:
            return firestore.client()
        sa_json = os.environ[env_var]
        sa_info = json.loads(sa_json)
        cred = credentials.Certificate(sa_info)

        # Optional storage bucket; initialize_app accepts None here.
        bucket_name = os.environ.get("FIREBASE_STORAGE_BUCKET")
        firebase_admin.initialize_app(cred, {
            'storageBucket': bucket_name
        })
        return firestore.client()
    except KeyError:
        # Use the module-level logger for consistency with the rest of the file
        # (the original called the root `logging` module directly).
        logger.error("%s environment variable is not set", env_var)
        raise
    except (json.JSONDecodeError, ValueError) as e:
        logger.error("Invalid service-account JSON in %s: %s", env_var, e)
        raise
    except Exception as e:
        logger.exception("Failed to initialise Firestore: %s", e)
        raise
|
|
|
|
|
# Module-level Firestore client, created once at import time.
db = init_firestore_from_env()

# Per-process unique id; currently only used to build `new_filename`.
guid = uuid.uuid4()
new_filename = f"{guid}"
# Directory where generated plot/table images are written (created if absent).
user_defined_path = os.path.join(os.getcwd(), "plots")
os.makedirs(user_defined_path, exist_ok=True)
|
|
|
|
|
|
|
|
def render_df_as_image(df: pd.DataFrame) -> str:
    """Render a Pandas DataFrame to a PNG file and return the file path.

    Note: the return type was previously annotated ``Optional[str]`` but the
    function never returns ``None`` — on failure or empty input it returns a
    human-readable message string instead of a path, which callers forward
    to the user as text.

    Args:
        df: The DataFrame to render.

    Returns:
        The path of the written PNG, or an explanatory message string.
    """
    if df.empty:
        return "The data requested is empty."
    try:
        # Cap the column count so the rendered table stays legible.
        max_cols = 10
        if len(df.columns) > max_cols:
            df = df.iloc[:, :max_cols]

        # Unique file name per render to avoid clobbering concurrent reports.
        img_path = os.path.join(user_defined_path, f"report_{uuid.uuid4()}.png")
        dfi.export(df, img_path, table_conversion='matplotlib', dpi=200)
        return img_path
    except Exception as e:
        logger.error(f"Failed to render DataFrame as image: {e}")
        return "There was an error creating the data table image."
|
|
|
|
|
|
|
|
class FlaskResponse(ResponseParser):
    """PandasAI response parser that converts results into strings suitable
    for a messaging/Flask front-end (image paths or plain values)."""

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Render a DataFrame result as a PNG; returns a path or message string."""
        return render_df_as_image(result['value'])

    def format_plot(self, result):
        """Return the plot's image path, coercing to str on a ValueError."""
        try:
            return result['value']
        except ValueError:
            return str(result['value'])

    def format_other(self, result):
        """Pass any non-dataframe, non-plot result value through unchanged."""
        return result['value']
|
|
|
|
|
|
|
|
# Configure the Gemini SDK and build the shared model handles at import time.
# If the API key is missing or configuration fails, `model`, `vision_model`
# and `llm` are all set to None and downstream functions degrade gracefully.
try:
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
    GENERATIVE_MODEL_NAME = "gemini-2.0-flash"
    VISION_MODEL_NAME = "gemini-2.0-flash"

    # Low temperature / top_k keep extraction and classification output
    # close to deterministic for the structured-JSON prompts below.
    model = genai.GenerativeModel(
        GENERATIVE_MODEL_NAME,
        generation_config={
            "temperature": 0.1,
            "top_p": 0.9,
            "top_k": 10,
            "max_output_tokens": 8192,
        }
    )
    vision_model = genai.GenerativeModel(VISION_MODEL_NAME)
    # LangChain wrapper of the same model, used by the PandasAI integration.
    llm = ChatGoogleGenerativeAI(
        model=GENERATIVE_MODEL_NAME,
        temperature=0.1,
        convert_system_message_to_human=True
    )
    logger.info(f"Using Generative Models: {GENERATIVE_MODEL_NAME} (Text) and {VISION_MODEL_NAME} (Vision)")
except KeyError:
    logger.error("GOOGLE_API_KEY environment variable not set!")
    model = vision_model = llm = None
except Exception as e:
    logger.error(f"Error configuring Generative AI: {e}", exc_info=True)
    model = vision_model = llm = None
|
|
|
|
|
|
|
|
|
|
|
def detect_and_translate_input(text: str) -> Dict[str, str]:
    """
    Detects language. If not English, translates to English preserving numbers/entities.
    Returns: {'english_text': str, 'detected_lang': str}

    Falls back to treating the input as English when the model is unavailable
    or the call/parse fails.
    """
    fallback = {'english_text': text, 'detected_lang': 'English'}
    if not model:
        return fallback

    system_prompt = """
    You are a language detection and translation engine for a bookkeeping bot.
    1. Detect the language of the user's input (e.g., English, Shona, Zulu, Ndebele, Tswana, etc.).
    2. CRITICAL: Do NOT be misled by Proper Nouns or Names (e.g., "Chiedza", "Musa", "Farai").
       Focus on verbs and sentence structure. If the grammatical structure is English, return "English" as the language regardless of local names used.
    3. If the language is NOT English, translate the text accurately to English.
    4. IMPORTANT: Preserve all numbers, currency symbols ($, R, ZIG), and item names exactly.
    5. If the language IS English, return the text as is.
    6. Return ONLY a valid JSON object.

    Output Schema:
    {
        "detected_lang": "Name of Language",
        "english_text": "The translated text or original text"
    }
    """
    try:
        raw = model.generate_content([system_prompt, text]).text
        # Strip optional ```json fences before parsing.
        stripped = re.sub(r'^```json\s*|\s*```$', '', raw, flags=re.MULTILINE).strip()
        return json.loads(stripped)
    except Exception as e:
        logger.error(f"Language detection failed: {e}")
        return fallback
|
|
|
|
|
def translate_output(text: str, target_lang: str) -> str:
    """Translates the system's English response back to the target language.

    English (or a missing target language / missing model) is passed through
    untouched; any translation failure also falls back to the original text.
    """
    if not model or not target_lang or target_lang.lower() == 'english':
        return text

    prompt = f"""
    Translate the following bookkeeping response from English to {target_lang}.
    Rules:
    - Keep numbers, transaction IDs, and currency symbols (e.g., $10.00, R50) UNCHANGED.
    - Keep formatting like markdown (*bold*) UNCHANGED.
    - Maintain a professional, helpful tone.

    Text to translate:
    "{text}"
    """
    try:
        return model.generate_content(prompt).text.strip()
    except Exception as e:
        logger.error(f"Output translation failed: {e}")
        return text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _transpile_vision_json_to_query(vision_json: List[Dict]) -> str: |
|
|
if not vision_json: |
|
|
return "Error: Could not extract any transactions from the image." |
|
|
|
|
|
query_parts = [] |
|
|
for trans in vision_json: |
|
|
details = trans.get("details", {}) |
|
|
trans_type = trans.get("transaction_type", "unknown") |
|
|
|
|
|
intent = trans.get("intent", "record") |
|
|
part = f"{intent.title()} a {trans_type}" |
|
|
|
|
|
item = details.get("item") or details.get("name") or details.get("description") or details.get("service_name") |
|
|
quantity = details.get("quantity") |
|
|
price = details.get("price") or details.get("amount") or details.get("value") or details.get("unit_price") |
|
|
currency = details.get("currency", "") |
|
|
vendor = details.get("vendor") or details.get("creditor") |
|
|
|
|
|
if quantity and item: |
|
|
part += f" of {quantity} '{item}'" |
|
|
elif item: |
|
|
part += f" for '{item}'" |
|
|
|
|
|
if price: |
|
|
part += f" for {currency}{price}" |
|
|
|
|
|
if vendor: |
|
|
part += f" from {vendor}" |
|
|
|
|
|
query_parts.append(part) |
|
|
|
|
|
final_query = " and ".join(query_parts) |
|
|
|
|
|
return final_query.strip() |
|
|
|
|
|
def _analyze_image_with_vision(image_bytes: bytes, caption: Optional[str]) -> List[Dict]:
    """Sends the image to the Gemini Vision model and returns a structured JSON list of transactions.

    Returns an empty list when the model is unavailable, the call fails, or
    no JSON array can be located in the model's response.
    """
    if not vision_model:
        return []

    try:
        pil_image = Image.open(io.BytesIO(image_bytes))

        prompt = f"""
        You are a bookkeeping vision model. Analyze the image. Return ONLY a valid JSON array [] of transaction objects.

        **CRITICAL CLASSIFICATION RULES:**
        1. **RECEIPTS / INVOICES / BILLS:**
           - These are money LEAVING the business.
           - Classify as **"expense"** (for utilities, rent, etc.) or **"purchase"** (for buying stock/inventory).
           - Do NOT classify a receipt as a "sale".

        2. **PRODUCT PHOTOS / SHELVES:**
           - If it is a photo of an item with no text/receipt, OR a photo of a customer holding an item.
           - Classify as **"sale"** (money COMING IN).

        3. **CATALOGS / MENUS:**
           - Classify as **"service_offering"** or **"inventory"** (creating items in system).

        **USER CONTEXT:**
        {caption if caption else "None"}

        ====================
        SCHEMA (each object)
        ====================
        {{
          "intent": "create" | "read" | "update" | "delete",
          "transaction_type": "sale" | "purchase" | "expense" | "asset" | "liability" | "inventory" | "service_offering" | "query",
          "details": {{ /* keys below */ }},
          "source": "ocr" | "object_detection"
        }}

        ================
        DETAILS KEYS
        ================
        - sale (goods): item, quantity, price, currency
        - expense: description, amount, currency, vendor
        - purchase: item, quantity, price, currency
        - asset: name, value, currency
        - liability: creditor, amount, currency
        - query: query (verbatim text)

        Analyze the provided image and return only the JSON list.
        """

        raw_text = vision_model.generate_content([prompt, pil_image]).text

        # The model sometimes wraps the array in prose; grab the outermost [...].
        match = re.search(r'\[.*\]', raw_text, re.DOTALL)
        if match is None:
            logger.error(f"Vision AI did not return a valid JSON list. Raw response: {raw_text}")
            return []
        return json.loads(match.group(0))

    except Exception as e:
        logger.error(f"Error in Vision AI processing: {e}", exc_info=True)
        return []
|
|
|
|
|
def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str]) -> str:
    """Master function to process an image and generate a natural language query.

    Runs the vision analysis, then transpiles the structured result into a
    text query; returns an error message string when nothing was extracted.
    """
    logger.info("Starting image analysis with Vision AI.")
    transactions = _analyze_image_with_vision(image_bytes, caption)
    if not transactions:
        return "Error: I couldn't find any actionable transactions in the image."

    logger.info(f"Vision AI analysis complete. Result: {transactions}")
    return _transpile_vision_json_to_query(transactions)
|
|
|
|
|
|
|
|
|
|
|
class ReportEngine:
    """Builds canned JSON reports (sales, expenses, profit, debt, item,
    day-of-week, forecast, snapshot) from the user's dataframes.

    Routing and time filtering are keyword-driven: the lower-cased user query
    is scanned for phrases like "expenses", "last month", "this week", etc.
    Most generate_* methods return a JSON string; generate_business_snapshot
    returns a plain dict.
    """

    def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]], query: str):
        # Map of collection name ('sales', 'expenses', ...) -> DataFrame.
        self.dfs = {name: df for name, df in dfs_with_names}
        # Lower-cased once so keyword checks below are case-insensitive.
        self.query = query.lower()
        self.now = datetime.now(timezone.utc)
        self.results = {}
        self.currency = self._get_user_currency()

    def _get_user_currency(self) -> str:
        """Determines the user's primary currency from their data."""
        # Scan collections in priority order; take the most frequent currency.
        for df_name in ['sales', 'expenses', 'assets', 'liabilities']:
            if df_name in self.dfs and 'details' in self.dfs[df_name].columns:
                # 'details' holds dict-like values; .str.get pulls the currency key.
                currency_series = self.dfs[df_name]['details'].str.get('currency')
                mode = currency_series.dropna().mode()
                if not mode.empty:
                    primary_currency = mode[0]
                    # Ignore blank/placeholder values before accepting the mode.
                    if isinstance(primary_currency, str) and primary_currency.strip() and primary_currency != 'Unknown':
                        return primary_currency
        # Fallback symbol when nothing usable is recorded.
        return "R"

    def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
        """Build a boolean mask over target_df['timestamp'] from time words in the query.

        Returns None when the frame has no usable timestamp column; returns an
        all-True mask when no recognised time phrase is present. Checks are
        ordered so specific phrases ("last month") win over generic substrings
        ("month").
        """
        if target_df is None or 'timestamp' not in target_df.columns or target_df.empty:
            return None

        if "yesterday" in self.query:
            yesterday = (self.now - timedelta(days=1)).date()
            start_of_yesterday = pd.Timestamp(yesterday, tz='UTC')
            end_of_yesterday = start_of_yesterday + timedelta(days=1)
            return (target_df['timestamp'] >= start_of_yesterday) & (target_df['timestamp'] < end_of_yesterday)

        if "today" in self.query:
            today = self.now.date()
            start_of_today = pd.Timestamp(today, tz='UTC')
            end_of_today = start_of_today + timedelta(days=1)
            return (target_df['timestamp'] >= start_of_today) & (target_df['timestamp'] < end_of_today)

        if "last month" in self.query:
            # Previous calendar month: [first day of last month, first day of this month).
            first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
            last_day_last_month = first_day_current_month - timedelta(days=1)
            first_day_last_month = last_day_last_month.replace(day=1)
            return (target_df['timestamp'] >= pd.Timestamp(first_day_last_month)) & (target_df['timestamp'] < pd.Timestamp(first_day_current_month))

        # Explicit month name, interpreted within the current year.
        month_match = re.search(r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\b", self.query)
        if month_match:
            month_name = month_match.group(1)
            month_number = datetime.strptime(month_name.title(), "%B").month
            current_year = self.now.year
            return (target_df['timestamp'].dt.month == month_number) & (target_df['timestamp'].dt.year == current_year)

        # NOTE: the bare "month" substring also matches "this month" queries;
        # "last month" was already handled above, so ordering matters here.
        if "this month" in self.query or "month" in self.query:
            return (target_df['timestamp'].dt.month == self.now.month) & (target_df['timestamp'].dt.year == self.now.year)

        if "last week" in self.query:
            # Weeks start on Monday (datetime.weekday()).
            start_of_this_week = self.now.date() - timedelta(days=self.now.weekday())
            start_of_last_week = start_of_this_week - timedelta(days=7)
            return (target_df['timestamp'].dt.date >= start_of_last_week) & (target_df['timestamp'].dt.date < start_of_this_week)

        if "this week" in self.query or "week" in self.query:
            start_of_week = self.now.date() - timedelta(days=self.now.weekday())
            return target_df['timestamp'].dt.date >= start_of_week

        # "on monday"-style filter; relies on a precomputed day_of_week column.
        day_match = re.search(r"on (monday|tuesday|wednesday|thursday|friday|saturday|sunday)", self.query)
        if day_match and 'day_of_week' in target_df.columns:
            day_name = day_match.group(1).title()
            return target_df['day_of_week'] == day_name

        if "year" in self.query:
            return target_df['timestamp'].dt.year == self.now.year

        # No time phrase recognised: keep every row.
        return pd.Series(True, index=target_df.index)

    def generate_report(self) -> str:
        """Generates a simple Sales or Expenses report.

        Subject defaults to sales; expense keywords in the query switch it.
        Returns a JSON string (either the report or an {"error": ...} object).
        """
        subject = "sales"
        expense_triggers = ["expense report", "expense summary", "expenses"]
        if any(keyword in self.query for keyword in expense_triggers):
            subject = "expenses"

        target_df = self.dfs.get(subject, pd.DataFrame())
        if target_df.empty:
            return json.dumps({"error": f"I couldn't find any {subject} data to generate a report."})

        time_filter = self._get_time_filter(target_df)
        if time_filter is not None:
            target_df = target_df[time_filter]

        if target_df.empty:
            return json.dumps({"error": f"No {subject} data found for the specified period."})

        if subject == "sales":
            total_revenue = target_df['sale_total'].sum()
            num_transactions = len(target_df)
            # Units sold per item, used to rank best/worst sellers.
            item_summary = target_df.groupby('item')['quantity'].sum()

            # Degenerate cases: no items, one item type, or a full tie.
            if item_summary.empty:
                best_selling_item = "N/A"
                worst_selling_item = "N/A"
            elif len(item_summary) == 1:
                best_selling_item = item_summary.idxmax()
                worst_selling_item = "N/A (Only one item type sold)"
            else:
                max_val = item_summary.max()
                min_val = item_summary.min()
                if max_val == min_val:
                    best_selling_item = "Tie (All items sold equally)"
                    worst_selling_item = "Tie"
                else:
                    best_selling_item = item_summary.idxmax()
                    worst_selling_item = item_summary.idxmin()

            self.results = {
                "report_subject": "Sales",
                "total_revenue": f"{self.currency}{total_revenue:.2f}",
                "number_of_sales": num_transactions,
                "best_selling_item": best_selling_item,
                "worst_selling_item": worst_selling_item
            }
        else:
            total_expenses = target_df['amount'].sum()
            num_transactions = len(target_df)
            # Expenses are grouped by free-text description, not a category field.
            category_summary = target_df.groupby('description')['amount'].sum()
            highest_expense_category = category_summary.idxmax() if not category_summary.empty else "N/A"
            self.results = {
                "report_subject": "Expenses",
                "total_expenses": f"{self.currency}{total_expenses:.2f}",
                "number_of_expenses": num_transactions,
                "highest_expense_category": highest_expense_category
            }

        return json.dumps(self.results, indent=2)

    def generate_debt_report(self) -> str:
        """Generates a report of outstanding debts (Upgrade 5).

        A debt is any sale row with amount_outstanding > 0; totals are grouped
        per customer when that column exists.
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty: return json.dumps({"error": "No sales data found."})

        if 'amount_outstanding' not in sales_df.columns:
            return json.dumps({"message": "No debt records found."})

        debt_df = sales_df[sales_df['amount_outstanding'] > 0]

        if debt_df.empty:
            return json.dumps({"message": "Great news! No pending debts found."})

        total_outstanding = debt_df['amount_outstanding'].sum()

        debtors = []
        if 'customer' in debt_df.columns:
            summary = debt_df.groupby('customer')['amount_outstanding'].sum().reset_index()
            for _, row in summary.iterrows():
                debtors.append(f"{row['customer']}: {self.currency}{row['amount_outstanding']:.2f}")
        else:
            debtors.append("Various customers (names not recorded)")

        self.results = {
            "report_subject": "Debtors List",
            "total_outstanding": f"{self.currency}{total_outstanding:.2f}",
            "debtors": debtors
        }
        return json.dumps(self.results, indent=2)

    def generate_profit_report(self) -> str:
        """Generates a comprehensive profitability report.

        Computes revenue, COGS, gross/net profit, ATV, items/transaction,
        expense-to-revenue ratio and the most profitable item over the
        query's time window.
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        expenses_df = self.dfs.get('expenses', pd.DataFrame())

        time_filter_sales = self._get_time_filter(sales_df)
        time_filter_expenses = self._get_time_filter(expenses_df)

        filtered_sales = sales_df[time_filter_sales] if time_filter_sales is not None else sales_df
        filtered_expenses = expenses_df[time_filter_expenses] if time_filter_expenses is not None else expenses_df

        # 'cogs' is optional; treat it as zero when absent.
        total_revenue = filtered_sales['sale_total'].sum() if not filtered_sales.empty else 0
        total_cogs = filtered_sales['cogs'].sum() if not filtered_sales.empty and 'cogs' in filtered_sales.columns else 0
        total_expenses = filtered_expenses['amount'].sum() if not filtered_expenses.empty else 0

        gross_profit = total_revenue - total_cogs
        net_profit = gross_profit - total_expenses

        # Derived KPIs, guarded against division by zero.
        num_sales = len(filtered_sales) if not filtered_sales.empty else 0
        total_items_sold = filtered_sales['quantity'].sum() if not filtered_sales.empty else 0
        atv = total_revenue / num_sales if num_sales > 0 else 0
        ipt = total_items_sold / num_sales if num_sales > 0 else 0
        expense_ratio = (total_expenses / total_revenue) * 100 if total_revenue > 0 else 0

        most_profitable_item = "N/A"
        if not filtered_sales.empty and 'cogs' in filtered_sales.columns:
            # Copy before adding the derived column to avoid mutating shared data.
            filtered_sales_copy = filtered_sales.copy()
            filtered_sales_copy['item_profit'] = filtered_sales_copy['sale_total'] - filtered_sales_copy['cogs']
            item_profitability = filtered_sales_copy.groupby('item')['item_profit'].sum()
            if not item_profitability.empty:
                most_profitable_item = item_profitability.idxmax()

        self.results = {
            "report_subject": "Profitability",
            "total_revenue": f"{self.currency}{total_revenue:.2f}", "total_cogs": f"{self.currency}{total_cogs:.2f}",
            "gross_profit": f"{self.currency}{gross_profit:.2f}", "total_expenses": f"{self.currency}{total_expenses:.2f}",
            "net_profit": f"{self.currency}{net_profit:.2f}", "average_transaction_value": f"{self.currency}{atv:.2f}",
            "items_per_transaction": f"{ipt:.2f}", "expense_to_revenue_ratio": f"{expense_ratio:.2f}%",
            "most_profitable_item": most_profitable_item
        }
        return json.dumps(self.results, indent=2)

    def generate_item_report(self, subject_item: str) -> str:
        """Generates a performance report for a specific item.

        Matches by case-insensitive substring against the sales 'item' column,
        then applies the query's time filter.
        """
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty: return json.dumps({"error": f"No sales data found for '{subject_item}'."})

        item_df = sales_df[sales_df['item'].str.contains(subject_item, case=False, na=False)]
        if item_df.empty: return json.dumps({"error": f"I couldn't find any sales for '{subject_item}'."})

        time_filter = self._get_time_filter(item_df)
        filtered_df = item_df[time_filter] if time_filter is not None else item_df
        if filtered_df.empty: return json.dumps({"error": f"No data for '{subject_item}' in this period."})

        units_sold = filtered_df['quantity'].sum()
        total_revenue = filtered_df['sale_total'].sum()
        total_cogs = filtered_df['cogs'].sum() if 'cogs' in filtered_df.columns else 0
        gross_profit = total_revenue - total_cogs
        profit_margin = (gross_profit / total_revenue) * 100 if total_revenue > 0 else 0
        avg_price = total_revenue / units_sold if units_sold > 0 else 0

        self.results = {
            "report_subject": "Item Report", "item_name": subject_item,
            "units_sold": int(units_sold), "total_revenue": f"{self.currency}{total_revenue:.2f}",
            "total_cogs": f"{self.currency}{total_cogs:.2f}", "gross_profit": f"{self.currency}{gross_profit:.2f}",
            "profit_margin": f"{profit_margin:.2f}%", "average_selling_price": f"{self.currency}{avg_price:.2f}"
        }
        return json.dumps(self.results, indent=2)

    def generate_day_of_week_report(self) -> str:
        """Breaks sales down by weekday and names the best-performing day."""
        sales_df = self.dfs.get('sales', pd.DataFrame())
        if sales_df.empty or 'day_of_week' not in sales_df.columns: return json.dumps({"error": "No data available to analyze by day."})
        time_filter = self._get_time_filter(sales_df)
        filtered_df = sales_df[time_filter] if time_filter is not None else sales_df
        if filtered_df.empty: return json.dumps({"error": "No sales data in this period."})

        daily_sales = filtered_df.groupby('day_of_week')['sale_total'].sum()
        if daily_sales.empty: return json.dumps({"error": "No sales to analyze by day in this period."})
        best_day = daily_sales.idxmax()
        # Reindex so the breakdown always lists Mon-Sun, zero-filled.
        day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
        daily_sales = daily_sales.reindex(day_order).fillna(0)

        self.results = {
            "report_subject": "Day of Week Analysis",
            "best_day": best_day,
            "daily_sales_breakdown": {day: f"{self.currency}{amount:.2f}" for day, amount in daily_sales.to_dict().items()}
        }
        return json.dumps(self.results, indent=2)

    def generate_forecast_data(self) -> str:
        """Summarises week-over-week sales so an LLM can phrase a forecast.

        Requires at least two resampled weekly buckets of sales data.
        """
        sales_df = self.dfs.get('sales')
        if sales_df is None or sales_df.empty:
            return json.dumps({"error": "Not enough sales data to generate a forecast."})

        # Resample on a copy; set_index mutates in place.
        sales_df_copy = sales_df.copy()
        sales_df_copy.set_index('timestamp', inplace=True)
        weekly_sales = sales_df_copy['sale_total'].resample('W').sum()

        if len(weekly_sales) < 2:
            return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."})

        last_week_sales = weekly_sales.iloc[-1]
        previous_week_sales = weekly_sales.iloc[-2] if len(weekly_sales) > 1 else 0

        growth_rate = 0
        if previous_week_sales > 0:
            growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100

        # Average of all weeks except the most recent one.
        historical_avg = weekly_sales.head(-1).mean()

        self.results = {
            "last_period_sales": f"{self.currency}{last_week_sales:.2f}",
            "previous_period_sales": f"{self.currency}{previous_week_sales:.2f}",
            "period_over_period_growth": f"{growth_rate:.2f}%",
            "historical_average": f"{self.currency}{historical_avg:.2f}"
        }
        return json.dumps(self.results, indent=2)

    def generate_business_snapshot(self) -> Dict[str, Any]:
        """Builds a whole-business overview dict: financial KPIs plus
        inventory, asset and liability listings (as newline-joined text).

        Unlike the other report methods, this returns a dict, not JSON.
        """
        snapshot = {}
        sales_df = self.dfs.get('sales', pd.DataFrame())
        expenses_df = self.dfs.get('expenses', pd.DataFrame())
        total_revenue = sales_df['sale_total'].sum() if not sales_df.empty else 0
        total_expenses = expenses_df['amount'].sum() if not expenses_df.empty else 0
        net_profit = total_revenue - total_expenses
        snapshot['financial_kpis'] = {
            "User's Primary Currency": self.currency,
            "Total Revenue": f"{self.currency}{total_revenue:.2f}",
            "Total Expenses": f"{self.currency}{total_expenses:.2f}",
            "Net Profit": f"{self.currency}{net_profit:.2f}"
        }

        inventory_df = self.dfs.get('inventory', pd.DataFrame())
        if not inventory_df.empty and 'item' in inventory_df.columns and 'quantity' in inventory_df.columns:
            snapshot['inventory_overview'] = "\n".join(
                [f"- {row['item']} ({int(row['quantity'])} units)" for _, row in inventory_df.iterrows()]
            )
        else:
            snapshot['inventory_overview'] = "No inventory items recorded."

        assets_df = self.dfs.get('assets', pd.DataFrame())
        if not assets_df.empty and 'name' in assets_df.columns and 'value' in assets_df.columns:
            snapshot['asset_register'] = "\n".join(
                [f"- {row['name']} ({self.currency}{row['value']:.2f})" for _, row in assets_df.iterrows()]
            )
        else:
            snapshot['asset_register'] = "No assets recorded."

        liabilities_df = self.dfs.get('liabilities', pd.DataFrame())
        if not liabilities_df.empty and 'creditor' in liabilities_df.columns and 'amount' in liabilities_df.columns:
            snapshot['liabilities_ledger'] = "\n".join(
                [f"- {row['creditor']} ({self.currency}{row['amount']:.2f})" for _, row in liabilities_df.iterrows()]
            )
        else:
            snapshot['liabilities_ledger'] = "No liabilities recorded."

        return snapshot
|
|
|
|
|
def generateResponse(prompt: str, currency: str = "R") -> str:
    """Generate structured JSON response from user input using Generative AI.

    The model is instructed to emit a JSON list of transaction objects;
    a single bare object is wrapped into a one-element list before return.
    Returns an {"error": ...} JSON string when the model is unavailable or
    the call fails.
    """
    if not model:
        return '{"error": "Model not available"}'

    system_prompt = f"""
    Analyze the user's request for business transaction management. Your goal is to extract structured information about one or more transactions and output it as a valid JSON list.

    **USER CONTEXT:**
    - **Default Currency:** {currency} (Use this currency code if no symbol is provided in the input).

    **1. Output Format:**
    You MUST output your response as a valid JSON list `[]` containing one or more transaction objects `{{}}`.

    **2. Transaction Object Structure:**
    Each transaction object MUST have the following keys:
    - `"intent"`: The user's goal ("create", "read", "update", "delete", "reset_account").
    - `"transaction_type"`: The category of the transaction (e.g., "sale", "purchase", "inventory", "expense", "asset", "liability", "query", "service_offering", "account").
    - `"details"`: An object containing key-value pairs extracted from the request.

    **3. Key Naming Conventions for the `details` Object:**
    - **For Expenses:** Use `"amount"`, `"description"`, and `"category"`.
    - **For Assets:** Use `"value"` for the monetary worth and `"name"` for the item's name.
    - **For Liabilities:** Use `"amount"` and `"creditor"`.
    - **For Sales/Inventory (Updated):**
      - Use `"item"`, `"quantity"`, and `"price"`.
      - **Customer:** If a customer name is mentioned (e.g., "Sold to John"), extract as `"customer"`.
      - **Payment:** If the user mentions how much was paid (e.g., "They gave me $10" or "paid 500"), extract as `"amount_paid"`.
    - **For Updates/Deletes (Updated):**
      - If a Transaction ID is mentioned (e.g., "update transaction 8f3a..."), extract it as `"transaction_id"`.
    - **For all financial transactions:**
      - **Currency:** ONLY include a `"currency"` key if the user explicitly types a symbol (e.g., $, R, ZAR, USD). **If the user does not type a symbol, DO NOT include the currency key.** This allows the system to inherit it from inventory.

    **4. Special Intents:**
    - **Account Reset:** If the user explicitly asks to "reset my account", "wipe data", or "delete all data", use intent `"reset_account"` and transaction_type `"account"`.

    **5. Examples:**

    **Example 1: Sales with Change**
    - **Input:** "Sold 2 bread for $1 each, customer gave me $5"
    - **Output:** [ {{"intent": "create", "transaction_type": "sale", "details": {{"item": "bread", "quantity": 2, "price": 1, "amount_paid": 5, "currency": "$"}} }} ]

    **Example 2: Update with ID**
    - **Input:** "Update transaction abc1234, change price to 50"
    - **Output:** [ {{"intent": "update", "transaction_type": "sale", "details": {{"transaction_id": "abc1234", "price": 50}} }} ]

    **Example 3: Reset Account**
    - **Input:** "Reset my account completely"
    - **Output:** [ {{"intent": "reset_account", "transaction_type": "account", "details": {{}} }} ]
    """
    try:
        raw_text = model.generate_content([system_prompt, prompt]).text
        # Strip optional ```json fences from the model output.
        cleaned = re.sub(r'^```json\s*|\s*```$', '', raw_text, flags=re.MULTILINE).strip()
        # Normalise a single bare object into a one-element list.
        if cleaned.startswith('{') and cleaned.endswith('}'):
            return f"[{cleaned}]"
        return cleaned
    except Exception as e:
        logger.error(f"LLM Response generation failed: {e}", exc_info=True)
        return '{"error": "Failed to process request with LLM"}'
|
|
|
|
|
def parse_multiple_transactions(response_text: str) -> List[Dict]:
    """Parse JSON response from LLM into structured transactions.

    Accepts either a JSON list (non-dict entries are dropped) or a single
    JSON object (wrapped into a list); every kept dict gets a timestamp.
    Any parse or processing failure yields an empty list.
    """
    try:
        payload = json.loads(response_text)
        if isinstance(payload, dict):
            return [add_timestamp(payload)]
        if isinstance(payload, list):
            return [add_timestamp(entry) for entry in payload if isinstance(entry, dict)]
        return []
    except json.JSONDecodeError:
        logger.error(f"Failed to decode JSON from LLM response: {response_text}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error during parsing: {e}", exc_info=True)
        return []
|
|
|
|
|
def add_timestamp(transaction: Dict) -> Dict:
    """Stamp the transaction with a UTC created_at (ISO 8601) if absent.

    Mutates the dict in place and returns it for chaining.
    """
    transaction.setdefault('created_at', datetime.now(timezone.utc).isoformat())
    return transaction
|
|
|
|
|
def _get_canonical_info(user_phone: str, item_name: str) -> Dict[str, Any]:
    """Finds the canonical version of an item using an "exact match first" hybrid approach.

    Returns {'doc': matched inventory doc or None, 'name': canonical name}.
    When no match is found, the name is the singularised form of the input.
    """
    inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
    name_lower = item_name.lower().strip()

    # Words ending in 'ss' (e.g. "glass") are treated as already singular;
    # inflect's singular_noun returns False when a word is already singular.
    if name_lower.endswith('ss'):
        singular = name_lower
    else:
        singular = p.singular_noun(name_lower) or name_lower

    docs = list(inventory_ref.stream())
    known_names = [d.id for d in docs]

    if known_names:
        # Pass 1: exact match on the lowered name.
        if name_lower in known_names:
            for d in docs:
                if d.id == name_lower:
                    return {'doc': d, 'name': name_lower}

        # Pass 2: fuzzy match, accepted only at high confidence.
        best_match = fuzzy_process.extractOne(name_lower, known_names)
        if best_match and best_match[1] >= 90:
            for d in docs:
                if d.id == best_match[0]:
                    return {'doc': d, 'name': best_match[0]}

    return {'doc': None, 'name': singular}
|
|
|
|
|
def create_or_update_inventory_or_service_offering(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Upsert inventory/service documents, applying quantity deltas in one batch.

    Each transaction's 'details' must name an 'item' or 'service_name'; the
    name is resolved to its canonical form before writing. Quantities are
    applied as Firestore Increment deltas with merge=True, so existing fields
    are preserved and stock adjusts relative to its current value.

    Args:
        user_phone: Firestore user document ID.
        transaction_data: List of transaction dicts with a 'details' map.

    Returns:
        (success, feedback) — success is False when nothing was written;
        feedback is one newline-joined message per processed/skipped item.
    """
    batch = db.batch()
    inventory_ref = db.collection("users").document(user_phone).collection("inventory_and_services")
    feedback_messages = []
    success_count = 0
    for transaction in transaction_data:
        details = transaction.get('details', {})
        item_name = details.get('item') or details.get('service_name')
        if not item_name:
            feedback_messages.append("Skipped: An inventory/service update was missing an item or service name.")
            continue
        # Normalise the name so "Apples"/"apple" map to one document.
        canonical_info = _get_canonical_info(user_phone, item_name)
        canonical_name = canonical_info['name']
        if 'item' in details: details['item'] = canonical_name
        if 'service_name' in details: details['service_name'] = canonical_name
        try:
            # Goods track 'quantity'; services track 'units_available'.
            change_key = 'quantity' if 'quantity' in details else 'units_available'
            change_amount = int(details.get(change_key, 0))
        except (ValueError, TypeError):
            feedback_messages.append(f"Skipped '{canonical_name}': Invalid quantity or units format.")
            continue
        doc_ref = inventory_ref.document(canonical_name)
        doc_data = {
            # Increment applies the delta server-side, avoiding read-modify-write races.
            'details': {**details, change_key: firestore.Increment(change_amount)},
            'type': 'service' if 'service_name' in details else 'good',
            'last_updated': datetime.now(timezone.utc).isoformat(),
        }
        batch.set(doc_ref, doc_data, merge=True)
        feedback_messages.append(f"Processed '{canonical_name}': change of {change_amount}.")
        success_count += 1
    if success_count == 0:
        return False, "\n".join(feedback_messages) if feedback_messages else "No valid inventory/service items to process."
    try:
        batch.commit()
        return True, "\n".join(feedback_messages)
    except Exception as e:
        logger.error(f"Inventory/Service batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"An error occurred during inventory update: {e}"
|
|
|
|
|
def create_sale(user_phone: str, transaction_data: List[Dict], force_settled: bool = False) -> tuple[bool, str]:
    """Record one sale per entry, adjusting stock inside a Firestore transaction.

    Price resolution order: explicit price/unit_price in the request, then the
    inventory item's stored price, then the most recent prior sale of the same
    item. Goods must exist in inventory with sufficient stock; unknown
    services are auto-created during the sale.

    Args:
        user_phone: Firestore user document ID.
        transaction_data: List of transaction dicts with a 'details' map.
        force_settled: When True, the sale is marked fully paid regardless of
            any 'amount_paid' supplied.

    Returns:
        (any_success, feedback) — feedback is a newline-joined message per
        attempted sale.
    """
    feedback_messages = []
    any_success = False

    # Resolve the user's default currency once for all sales in this call.
    user_settings_ref = db.collection("users").document(user_phone).get()
    user_default_currency = "R"
    if user_settings_ref.exists:
        user_default_currency = user_settings_ref.to_dict().get("currency", "R")

    sales_ref = db.collection("users").document(user_phone).collection("sales")

    for t in transaction_data:
        details = t.get('details', {})
        item_name = details.get('item') or details.get('service_name')
        if not item_name:
            feedback_messages.append("Sale failed: Missing item or service name.")
            continue
        try:
            canonical_info = _get_canonical_info(user_phone, item_name)
            canonical_name = canonical_info['name']

            input_currency = details.get('currency')
            inventory_currency = None
            last_selling_price = None

            # Snapshot the (possibly missing) inventory record outside the
            # transaction; the transactional closure below reuses it.
            item_doc_ref = db.collection("users").document(user_phone).collection("inventory_and_services").document(canonical_name)
            item_snapshot = item_doc_ref.get()

            if item_snapshot.exists:
                inv_data = item_snapshot.to_dict()
                inv_details = inv_data.get('details', {})
                inventory_currency = inv_details.get('currency')

                # Fall back to the inventory's stored price when none given.
                if 'price' not in details:
                    details['price'] = inv_details.get('price', 0)

            if 'price' not in details:
                # Still no price: reuse the most recent prior sale's price
                # for this item (sorted client-side by ISO timestamp).
                all_sales_query = sales_ref.where('details.item', '==', canonical_name)
                all_sales_docs = list(all_sales_query.stream())
                if all_sales_docs:
                    all_sales_docs.sort(key=lambda doc: doc.to_dict().get('timestamp', ''), reverse=True)
                    last_selling_price = all_sales_docs[0].to_dict().get('details', {}).get('price')

            # Currency precedence: request > inventory record > user default.
            final_currency = input_currency or inventory_currency or user_default_currency

            # NOTE(review): the Firestore client may retry a transactional
            # function on contention; this closure mutates sale_details, so a
            # retry would re-apply those mutations — confirm this is benign.
            @firestore.transactional
            def process_one_sale(transaction, sale_details):
                is_new_item = canonical_info['doc'] is None
                original_trans_type = t.get('transaction_type')
                item_type = 'service' if original_trans_type == 'service_offering' else 'good'

                # Price resolution inside the txn: explicit > last sale > inventory.
                user_price = sale_details.get('price') or sale_details.get('unit_price')
                if user_price is not None:
                    selling_price = float(user_price)
                elif last_selling_price is not None:
                    selling_price = float(last_selling_price)
                else:
                    if item_type == 'good' and 'price' not in sale_details:
                        selling_price = float(inv_details.get('price', 0)) if item_snapshot.exists else 0
                        if selling_price == 0:
                            return f"Sale failed for new item '{canonical_name}': Price not found."
                    else:
                        # Unknown service price defaults to 0.
                        selling_price = 0

                sale_details['price'] = selling_price
                sale_details['item'] = canonical_name
                sale_details['currency'] = final_currency

                # Normalise aliases out of the stored record.
                if 'unit_price' in sale_details: del sale_details['unit_price']
                if 'service_name' in sale_details: del sale_details['service_name']

                try:
                    quantity_sold = int(sale_details.get('quantity', 1))
                    if quantity_sold <= 0: return f"Sale failed for '{canonical_name}': Invalid quantity ({quantity_sold})."
                except (ValueError, TypeError):
                    return f"Sale failed for '{canonical_name}': Invalid quantity format."

                total_due = selling_price * quantity_sold
                amount_paid = sale_details.get('amount_paid')

                change_due = 0
                amount_outstanding = 0
                payment_status = 'completed'

                # force_settled overrides any partial payment supplied.
                if force_settled:
                    amount_paid = total_due
                    sale_details['amount_paid'] = total_due

                if amount_paid is not None:
                    try:
                        amount_paid = float(amount_paid)
                        if amount_paid > total_due:
                            change_due = amount_paid - total_due
                            sale_details['change_given'] = change_due
                            payment_status = 'overpayment'
                        elif amount_paid < total_due:
                            amount_outstanding = total_due - amount_paid
                            sale_details['amount_outstanding'] = amount_outstanding
                            payment_status = 'partial_payment'
                        else:
                            payment_status = 'completed'
                    except ValueError:
                        # NOTE(review): a non-numeric amount_paid is silently
                        # ignored and the sale stays 'completed' — confirm.
                        pass
                else:
                    # No amount given: assume paid in full.
                    amount_paid = total_due
                    sale_details['amount_paid'] = total_due

                sale_details['status'] = payment_status

                item_cost = 0
                if item_snapshot.exists:
                    if inv_data.get('type') == 'good':
                        stock_key = 'quantity'
                        current_stock = int(inv_details.get(stock_key, 0))
                        if current_stock < quantity_sold:
                            return f"Sale failed for '{canonical_name}': Insufficient stock (Have: {current_stock}, Need: {quantity_sold})."
                        # Decrement stock atomically within the transaction.
                        transaction.update(item_doc_ref, {f'details.{stock_key}': firestore.Increment(-quantity_sold)})
                        # NOTE(review): falls back to the inventory *selling*
                        # price as the recorded unit cost — confirm for COGS.
                        item_cost = inv_details.get('price') or inv_details.get('unit_price') or 0
                elif item_type == 'good':
                    return f"Sale failed for '{canonical_name}': Item not found in inventory. Please add it first."
                elif is_new_item and item_type == 'service':
                    # Unknown services are created on the fly inside the txn.
                    logger.info(f"Creating new service '{canonical_name}' during sale.")
                    service_record = {
                        'details': {'item': canonical_name, 'price': selling_price, 'currency': final_currency},
                        'type': 'service',
                        'last_updated': datetime.now(timezone.utc).isoformat()
                    }
                    transaction.set(item_doc_ref, service_record)

                sale_doc_ref = sales_ref.document()
                sale_record = {
                    'details': {**sale_details, 'cost': item_cost},
                    'timestamp': datetime.now(timezone.utc).isoformat(),
                    'status': payment_status,
                    'transaction_id': sale_doc_ref.id,
                    'sale_total': total_due,
                    'customer': sale_details.get('customer', 'Unknown')
                }
                transaction.set(sale_doc_ref, sale_record)

                msg = f"Sale successful for {quantity_sold} x '{canonical_name}' at {final_currency}{selling_price} each."
                if change_due > 0:
                    msg += f" Change due: {final_currency}{change_due:.2f}."
                if amount_outstanding > 0:
                    msg += f" Outstanding debt: {final_currency}{amount_outstanding:.2f}."

                msg += f"\n\nTransaction ID:\n```{sale_doc_ref.id}```"

                return msg

            transaction_feedback = process_one_sale(db.transaction(), details)
            feedback_messages.append(transaction_feedback)
            # Success is detected by the "successful" marker in the message.
            if "successful" in transaction_feedback:
                any_success = True
        except Exception as e:
            logger.error(f"Transactional sale failed for '{item_name}': {e}", exc_info=True)
            feedback_messages.append(f"Sale failed for '{item_name}': An unexpected error occurred.")
    return any_success, "\n".join(feedback_messages)
|
|
|
|
|
def create_expense(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Record one expense document per transaction in a single Firestore batch.

    Returns:
        (success, feedback) — success is False if no valid expense was found
        or the batch commit failed; feedback is newline-joined, one line per
        processed or skipped transaction.
    """
    expenses_ref = db.collection("users").document(user_phone).collection("expenses")
    batch = db.batch()
    feedback_messages = []
    success_count = 0

    for entry in transaction_data:
        details = entry.get('details', {})
        expense_desc = details.get('description', details.get('category', 'Unnamed Expense'))

        # An expense without an amount is unusable — skip with feedback.
        if 'amount' not in details:
            feedback_messages.append(f"Skipped expense '{expense_desc}': Missing amount.")
            continue

        doc_ref = expenses_ref.document()
        batch.set(doc_ref, {
            'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
            'transaction_type': 'expense', 'status': 'recorded', 'transaction_id': doc_ref.id
        })
        feedback_messages.append(f"Recorded expense: '{expense_desc}' ({details.get('currency','')}{details.get('amount')}).\n\nTransaction ID:\n```{doc_ref.id}```")
        success_count += 1

    if not success_count:
        return False, "\n".join(feedback_messages) if feedback_messages else "No valid expense transactions to create."

    try:
        batch.commit()
    except Exception as e:
        logger.error(f"Expense batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record expenses. An error occurred: {e}"
    return True, "\n".join(feedback_messages)
|
|
|
|
|
def create_asset(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Record one asset document per transaction in a single Firestore batch.

    Returns:
        (success, feedback) — success is False if no valid asset was found or
        the batch commit failed; feedback is newline-joined, one line per
        processed or skipped transaction.
    """
    assets_ref = db.collection("users").document(user_phone).collection("assets")
    batch = db.batch()
    feedback_messages = []
    success_count = 0

    for entry in transaction_data:
        details = entry.get('details', {})
        asset_name = details.get('name', 'Unnamed Asset')

        # An asset without a value cannot be recorded — skip with feedback.
        if 'value' not in details:
            feedback_messages.append(f"Skipped asset '{asset_name}': Missing value.")
            continue

        doc_ref = assets_ref.document()
        batch.set(doc_ref, {
            'details': details, 'timestamp': datetime.now(timezone.utc).isoformat(),
            'transaction_type': 'asset', 'status': 'recorded', 'transaction_id': doc_ref.id
        })
        feedback_messages.append(f"Recorded asset: '{asset_name}' ({details.get('currency','')}{details.get('value')}).\n\nTransaction ID:\n```{doc_ref.id}```")
        success_count += 1

    if not success_count:
        return False, "\n".join(feedback_messages) if feedback_messages else "No valid asset transactions to create."

    try:
        batch.commit()
    except Exception as e:
        logger.error(f"Asset batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record assets. An error occurred: {e}"
    return True, "\n".join(feedback_messages)
|
|
|
|
|
def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Record one liability document per transaction in a single Firestore batch.

    A liability requires both an 'amount' and a non-empty 'creditor'.

    Returns:
        (success, feedback) — success is False if no valid liability was found
        or the batch commit failed; feedback is newline-joined, one line per
        processed or skipped transaction.
    """
    liabilities_ref = db.collection("users").document(user_phone).collection("liabilities")
    batch = db.batch()
    feedback_messages = []
    success_count = 0

    for entry in transaction_data:
        details = entry.get('details', {})
        creditor = details.get('creditor', 'Unnamed Creditor')

        # Both fields are mandatory; an empty creditor string also fails.
        if 'amount' not in details or not details.get('creditor'):
            feedback_messages.append(f"Skipped liability '{creditor}': Missing amount or creditor.")
            continue

        doc_ref = liabilities_ref.document()
        batch.set(doc_ref, {
            'details': details,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'transaction_type': 'liability',
            'status': 'recorded',
            'transaction_id': doc_ref.id
        })
        feedback_messages.append(f"Recorded liability to '{creditor}' ({details.get('currency','')}{details.get('amount')}).\n\nTransaction ID:\n```{doc_ref.id}```")
        success_count += 1

    if not success_count:
        return False, "\n".join(feedback_messages) if feedback_messages else "No valid liability transactions to create."

    try:
        batch.commit()
    except Exception as e:
        logger.error(f"Liability batch commit failed for user {user_phone}: {e}", exc_info=True)
        return False, f"Failed to record liabilities. An error occurred: {e}"
    return True, "\n".join(feedback_messages)
|
|
|
|
|
def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame: |
|
|
if df.empty: return df |
|
|
for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']: |
|
|
if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce', utc=True) |
|
|
numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available', 'sale_total', 'amount_outstanding', 'change_given', 'amount_paid'] |
|
|
for col in numeric_cols: |
|
|
if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) |
|
|
for col in df.select_dtypes(include=['object']).columns: |
|
|
df[col] = df[col].fillna('Unknown') |
|
|
return df |
|
|
|
|
|
def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
    """Load every business collection for a user as (name, DataFrame) pairs.

    Firestore documents store their payload under a nested 'details' map;
    each document is flattened so detail keys become top-level columns.
    The mixed inventory_and_services collection is split into "inventory"
    and "services" frames; only non-empty collections appear in the result.
    """
    def _flatten(record: Dict) -> Dict:
        # Merge the nested 'details' map into the top level (detail keys win)
        # and drop the now-redundant 'details' entry.
        merged = {**record, **record.get('details', {})}
        merged.pop('details', None)
        return merged

    result: List[Tuple[str, pd.DataFrame]] = []

    # inventory_and_services holds both goods and services; split on 'type'.
    inventory_rows: List[Dict] = []
    service_rows: List[Dict] = []
    for snapshot in db.collection("users").document(user_phone).collection('inventory_and_services').stream():
        record = snapshot.to_dict()
        bucket = service_rows if record.get('type') == 'service' else inventory_rows
        bucket.append(_flatten(record))
    if inventory_rows:
        result.append(("inventory", _validate_dataframe(pd.DataFrame(inventory_rows))))
    if service_rows:
        result.append(("services", _validate_dataframe(pd.DataFrame(service_rows))))

    for df_name in ('sales', 'expenses', 'assets', 'liabilities'):
        snapshots = db.collection("users").document(user_phone).collection(df_name).stream()
        rows = [_flatten(doc.to_dict()) for doc in snapshots]
        if not rows:
            continue
        validated_df = _validate_dataframe(pd.DataFrame(rows))
        if df_name == 'sales':
            # Derive convenience columns the reporting layer relies on.
            if 'price' in validated_df.columns and 'quantity' in validated_df.columns and 'sale_total' not in validated_df.columns:
                validated_df['sale_total'] = validated_df['price'] * validated_df['quantity']
            if 'cost' in validated_df.columns and 'quantity' in validated_df.columns:
                validated_df['cogs'] = validated_df['cost'] * validated_df['quantity']
            if 'timestamp' in validated_df.columns:
                validated_df['day_of_week'] = validated_df['timestamp'].dt.day_name()
        result.append((df_name, validated_df))
    return result
|
|
|
|
|
def _get_relative_date_context() -> str: |
|
|
today = datetime.now(timezone.utc) |
|
|
def fmt(d): return d.strftime('%Y-%m-%d') |
|
|
yesterday = today - timedelta(days=1) |
|
|
start_of_this_week = today - timedelta(days=today.weekday()) |
|
|
start_of_last_week = start_of_this_week - timedelta(days=7) |
|
|
last_monday = start_of_this_week - timedelta(days=7) |
|
|
context = [ |
|
|
f"Here are some pre-calculated dates to help you understand the user's request:", |
|
|
f"- Today is: {fmt(today)}", |
|
|
f"- Yesterday was: {fmt(yesterday)}", |
|
|
f"- The start of this week was: {fmt(start_of_this_week)}", |
|
|
f"- The start of last week was: {fmt(start_of_last_week)}", |
|
|
f"- Last Monday was on: {fmt(last_monday)}", |
|
|
] |
|
|
return "\n".join(context) |
|
|
|
|
|
def read_datalake(user_phone: str, query: str) -> str:
    """Master function for reading and analyzing business data.

    Routes the free-text *query* through handlers in priority order:
      1. Empty-data shortcut (help text or "no data" notice).
      2. Simple collection lookups rendered as an image.
      3. Keyword-triggered canned reports (sales/expense/debt/profit/
         day-of-week/per-item) synthesized by the LLM.
      4. Forecasting keywords.
      5. Help/literacy "business coach" synthesis.
      6. PandasAI code generation, with a coach-synthesis fallback on failure.

    Returns:
        Plain text for most paths, or whatever render_df_as_image produces
        for simple lookups.
    """
    def _to_text(resp) -> str:
        # Flatten any LLM response shape (message object with .content/.text,
        # plain string, or list of chunks) to a string; never raise.
        try:
            if resp is None: return ""
            if hasattr(resp, "content") and resp.content is not None: return str(resp.content)
            if hasattr(resp, "text") and resp.text is not None: return str(resp.text)
            if isinstance(resp, (list, tuple)): return "".join(_to_text(r) for r in resp)
            return str(resp)
        except Exception: return str(resp)

    try:
        all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone)
        if not all_dfs_with_names:
            # Brand-new account: answer help-style questions, otherwise ask
            # the user to record data first.
            if any(keyword in query.lower() for keyword in ['help', 'tutorial', 'guide', 'what is', 'explain']):
                return "I'm Qx, your business assistant! I can help you track sales, expenses, and inventory. Once you add some data, I can give you reports and insights. How can I help you get started?"
            return "You have no data recorded yet. Please add some transactions first."

        query_lower = query.lower()
        engine = ReportEngine(all_dfs_with_names, query)

        # --- Path 1: direct collection dumps rendered as an image ---
        simple_lookup_map = {
            "inventory": ["stock", "inventory", "in stock", "what do i have"],
            "assets": ["asset", "assets", "my assets"],
            "liabilities": ["liabilities", "i owe", "creditor", "my debts"],
        }
        for df_name, keywords in simple_lookup_map.items():
            if any(keyword in query_lower for keyword in keywords):
                logger.info(f"Handling '{query}' with Simple Lookup Path for '{df_name}'.")
                target_df_tuple = next((item for item in all_dfs_with_names if item[0] == df_name), None)
                if target_df_tuple is not None and not target_df_tuple[1].empty:
                    return render_df_as_image(target_df_tuple[1])
                return f"You don't have any {df_name} recorded yet."

        # --- Path 2: canned reports keyed on trigger phrases ---
        report_json = None
        sales_report_triggers = ["sales report", "sales summary", "sales performance", "how were my sales", "revenue report"]
        expense_report_triggers = ["expense report", "expense summary", "expense performance", "how were my expenses", "spending report"]
        debt_report_triggers = ["who owes", "debt", "debtors", "outstanding", "credit sales"]

        if any(trigger in query_lower for trigger in sales_report_triggers):
            logger.info(f"Handling '{query}' with the General Sales Report Path.")
            report_json = engine.generate_report()
        elif any(trigger in query_lower for trigger in expense_report_triggers):
            logger.info(f"Handling '{query}' with the General Expense Report Path.")
            # NOTE(review): same generate_report() call as the sales branch —
            # confirm ReportEngine differentiates via the query it was given.
            report_json = engine.generate_report()
        elif any(trigger in query_lower for trigger in debt_report_triggers):
            logger.info(f"Handling '{query}' with the Debt Report Path.")
            report_json = engine.generate_debt_report()
        elif "profit" in query_lower:
            logger.info(f"Handling '{query}' with the Profit Report Path.")
            report_json = engine.generate_profit_report()
        elif any(k in query_lower for k in ["best day", "busiest day", "sales by day"]):
            logger.info(f"Handling '{query}' with the Day of Week Report Path.")
            report_json = engine.generate_day_of_week_report()
        else:
            # Per-item report: "report on X ..." / "performance of X ..."
            item_report_match = re.search(r"(?:report on|performance of)\s+([\w\s]+?)(?:\s+(?:this|last|on|in|for|today|yesterday)|$)", query_lower)
            if item_report_match:
                item_name = item_report_match.group(1).strip()
                if item_name not in ["sales", "expenses", "profit"]:
                    logger.info(f"Handling '{query}' with the Item Report Path for item: '{item_name}'.")
                    report_json = engine.generate_item_report(item_name)

        if report_json:
            report_data = json.loads(report_json)
            if "error" in report_data: return report_data["error"]
            # Hand the structured report to the LLM for WhatsApp formatting.
            synthesis_prompt = f"""
            Directly synthesize a professional business report from the following JSON data. Omit conversational introductions or summaries. Present only the data-driven report, formatted for WhatsApp (*bold*, _italic_, emojis).
            For sales reports, if helpful, provide a creative and actionable "Insight" section at the end based on the best/worst selling items.

            **IMPORTANT INSTRUCTIONS:**
            - If `report_subject` is "Profitability", present a clear financial summary.
            - If `report_subject` is "Item Report", state the item name and present its performance KPIs.
            - If `report_subject` is "Debtors List", list the customers and amounts clearly.

            Here is the data summary:
            {report_json}
            """
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)

        # --- Path 3: forecasting ---
        predictive_keywords = ["expect", "forecast", "predict"]
        if any(keyword in query_lower for keyword in predictive_keywords):
            logger.info(f"Handling '{query}' with the Forecasting Path.")
            forecast_json = engine.generate_forecast_data()
            forecast_data = json.loads(forecast_json)
            if "error" in forecast_data: return forecast_data["error"]
            synthesis_prompt = f"Synthesize a sales forecast from the following JSON data. Omit conversational introductions or summaries. Present only the forecast. Data: {forecast_json}"
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)

        # --- Path 4: business-coach Q&A grounded in a data snapshot ---
        help_keywords = ['help', 'tutorial', 'guide', 'how do you work', 'what can you do', 'how can', 'how would']
        literacy_keywords = [ 'explain', 'how to', 'how do i', 'ideas for', 'advice on', "how do", "why", "can I", "can we"]
        if any(keyword in query_lower for keyword in help_keywords) or any(keyword in query_lower for keyword in literacy_keywords):
            logger.info(f"Handling '{query}' with the Business Coach Path.")
            snapshot = engine.generate_business_snapshot()
            snapshot_str = json.dumps(snapshot, indent=2)

            synthesis_prompt = f"""
            You are Qx, a friendly and insightful business coach and financial expert. Your task is to provide a clear, helpful, and strategic answer based on the user's question, using your general business knowledge combined with the business snapshot provided below for context.

            **Business Snapshot for Context:**
            {snapshot_str}

            **User's Question:**
            "{query}"
            """
            response = llm.invoke(synthesis_prompt)
            return _to_text(response)

        # --- Path 5: last resort — let PandasAI write analysis code ---
        else:
            try:
                logger.info(f"Handling '{query}' with the Fortified PandasAI Path.")

                # Describe the available frames so the model writes valid code.
                schema_description = "You have been provided with these Pandas DataFrames:\n"
                for name, df in all_dfs_with_names:
                    schema_description += f"* **{name}**: Contains columns like {', '.join(df.columns.to_list())}.\n"

                date_context = _get_relative_date_context()
                today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')

                pandasai_prompt = (
                    f"{schema_description}\n"
                    f"For context, today's date is {today_str}.\n{date_context}\n\n"
                    f"**IMPORTANT RULES:**\n"
                    f"1. For time-based queries, you MUST use timezone-aware pandas Timestamps for comparison. Example: `pd.Timestamp('{today_str}', tz='UTC')`.\n"
                    f"2. Your code MUST end by declaring a `result` dictionary.\n"
                    f"3. Your primary goal is to answer with data. Return a DataFrame or a string. ONLY generate a plot if the user explicitly asks for a 'chart', 'plot', 'graph', 'diagram', or 'show'.\n"
                    f"4. When a plot is requested, you MUST save it as a file. The final line of your code must be `result = {{'type': 'plot', 'value': 'filename.png'}}`.\n\n"
                    f"Based on this, please write Python code to answer the following specific user query: '{query}'"
                )

                datalake_dfs = [df for _, df in all_dfs_with_names]
                lake = SmartDatalake(datalake_dfs, config={"llm": llm, "response_parser": FlaskResponse, "save_charts_path": user_defined_path, "enable_cache": False, "conversational": False})

                response = lake.chat(pandasai_prompt)

                # Treat an empty or "No code found" reply as a failure so we
                # fall through to the coach fallback below.
                if not response or "No code found" in str(response):
                    raise NoCodeFoundError("PandasAI failed to generate a valid data answer.")

                if isinstance(response, dict) and 'value' in response:
                    return str(response['value'])
                elif isinstance(response, str):
                    return response
                else:
                    return str(response)

            except Exception as e:
                # PandasAI failed — degrade gracefully to a strategic answer.
                logger.warning(f"PandasAI path failed for query '{query}': {e}. Falling back to Business Coach Synthesis.")

                snapshot = engine.generate_business_snapshot()
                snapshot_str = json.dumps(snapshot, indent=2)

                synthesis_prompt = f"""
                You are Qx, a friendly and insightful business coach. I was unable to perform a direct data calculation for this query, so please provide a strategic, high-level business answer based on the snapshot and the question.

                **Business Snapshot:** {snapshot_str}
                **User Question:** "{query}"
                """
                response = llm.invoke(synthesis_prompt)
                return _to_text(response)

    except Exception as e:
        logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
        return "Sorry, I encountered an error while analyzing your data."
|
|
|
|
|
|
|
|
def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
    """Locate a single document in *collection_name* matching *details*.

    Resolution order:
      1. Direct lookup by 'transaction_id' when supplied.
      2. Canonical item-name lookup for inventory/sales collections.
      3. Equality query on the collection's descriptive key
         ('description'/'name'/'creditor') plus 'amount'/'value'.

    Returns:
        {'id': ..., 'data': ...} for a unique match, the sentinel string
        "multiple_matches" when more than one document matches, or None
        when nothing matches or no usable filters were supplied.
    """
    col_ref = db.collection("users").document(user_phone).collection(collection_name)

    # 1. A transaction ID is authoritative when present and non-empty.
    if 'transaction_id' in details and details['transaction_id']:
        doc = col_ref.document(details['transaction_id']).get()
        if doc.exists: return {"id": doc.id, "data": doc.to_dict()}

    # 2. Item-based collections: resolve the canonical item name first.
    if collection_name in ['inventory_and_services', 'sales'] and ('item' in details or 'service_name' in details):
        item_name = details.get('item') or details.get('service_name')
        canonical_info = _get_canonical_info(user_phone, item_name)
        if canonical_info['doc']:
            doc = canonical_info['doc']
            return {"id": doc.id, "data": doc.to_dict()}

    # 3. Build an equality query from whatever descriptive fields exist.
    query = col_ref
    key_map = {'expenses': 'description', 'assets': 'name', 'liabilities': 'creditor'}
    search_key = key_map.get(collection_name)
    filters_applied = False
    if search_key and search_key in details:
        query = query.where(f'details.{search_key}', '==', details[search_key]); filters_applied = True
    if 'amount' in details:
        query = query.where('details.amount', '==', details['amount']); filters_applied = True
    if 'value' in details:
        query = query.where('details.value', '==', details['value']); filters_applied = True
    if not filters_applied: return None
    # limit(2) is enough to detect ambiguity without fetching everything.
    docs = query.limit(2).stream()
    found_docs = [{"id": doc.id, "data": doc.to_dict()} for doc in docs]
    if len(found_docs) == 1: return found_docs[0]
    elif len(found_docs) > 1: return "multiple_matches"
    else: return None
|
|
|
|
|
def update_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Apply field-level updates to existing records located by ID or description.

    Returns:
        (any_success, feedback) — feedback has one newline-joined line per
        attempted update.
    """
    feedback = []
    any_success = False
    # Maps a transaction_type to its backing Firestore collection.
    collection_map = {
        'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services',
        'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
    }

    for trans in transaction_data:
        details = trans.get('details', {})
        trans_type = trans.get('transaction_type', '').lower()
        collection_name = collection_map.get(trans_type)
        if collection_name is None:
            feedback.append(f"Update skipped: Unknown type '{trans_type}'.")
            continue

        target_doc = _find_document_by_details(user_phone, collection_name, details)
        if target_doc == "multiple_matches":
            feedback.append(f"Update for {trans_type} failed: Multiple records match. Please use the ID or be more specific.")
            continue
        if not target_doc:
            feedback.append(f"Update for {trans_type} failed: No record found matching your description or ID.")
            continue

        doc_id = target_doc["id"]
        doc_ref = db.collection("users").document(user_phone).collection(collection_name).document(doc_id)
        try:
            # Never write the locator field back into the record.
            updates = {f"details.{key}": value for key, value in details.items() if key != 'transaction_id'}
            if not updates:
                feedback.append(f"Update for {trans_type} (ID: {doc_id}) skipped: No new data provided.")
                continue
            updates['last_updated'] = datetime.now(timezone.utc).isoformat()
            doc_ref.update(updates)
            feedback.append(f"Successfully updated {trans_type} record (ID: {doc_id}).")
            any_success = True
        except Exception as e:
            logger.error(f"Update failed for doc '{doc_id}': {e}", exc_info=True)
            feedback.append(f"Update for {trans_type} (ID: {doc_id}) failed with an error.")
    return any_success, "\n".join(feedback)
|
|
|
|
|
def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> tuple[bool, str]:
    """Delete records located by transaction ID or descriptive details.

    Returns:
        (any_success, feedback) — feedback has one newline-joined line per
        attempted delete.
    """
    feedback = []
    any_success = False
    # Maps a transaction_type to its backing Firestore collection.
    collection_map = {
        'purchase': 'inventory_and_services', 'sale': 'sales', 'inventory': 'inventory_and_services',
        'service_offering': 'inventory_and_services', 'expense': 'expenses', 'asset': 'assets', 'liability': 'liabilities'
    }

    for trans in transaction_data:
        details = trans.get('details', {})
        trans_type = trans.get('transaction_type', '').lower()
        collection_name = collection_map.get(trans_type)
        if collection_name is None:
            feedback.append(f"Delete skipped: Unknown type '{trans_type}'.")
            continue

        target_doc = _find_document_by_details(user_phone, collection_name, details)
        if target_doc == "multiple_matches":
            feedback.append(f"Delete for {trans_type} failed: Multiple records match. Please use the ID or be more specific.")
            continue
        if not target_doc:
            feedback.append(f"Delete for {trans_type} failed: No record found matching your description or ID.")
            continue

        doc_id = target_doc["id"]
        try:
            db.collection("users").document(user_phone).collection(collection_name).document(doc_id).delete()
            feedback.append(f"Successfully deleted {trans_type} record (ID: {doc_id}).")
            any_success = True
        except Exception as e:
            logger.error(f"Delete failed for doc '{doc_id}': {e}", exc_info=True)
            feedback.append(f"Delete for {trans_type} (ID: {doc_id}) failed with an error.")
    return any_success, "\n".join(feedback)
|
|
|
|
|
def reset_user_account(user_phone: str) -> str:
    """Permanently deletes all transaction data for a user. (Upgrade 3)

    Iterates every known sub-collection and deletes its documents in batches
    of at most 450 writes (Firestore caps a batch at 500 operations).

    Args:
        user_phone: Firestore user document ID.

    Returns:
        A user-facing confirmation or error message.
    """
    collections = ['sales', 'expenses', 'assets', 'liabilities', 'inventory_and_services', 'temp_transactions']
    user_ref = db.collection("users").document(user_phone)

    try:
        batch = db.batch()
        count = 0  # writes queued in the current batch (removed unused total counter)

        for coll_name in collections:
            # list_documents() yields document references without reading
            # their contents, which is all a delete needs.
            for doc in user_ref.collection(coll_name).list_documents():
                batch.delete(doc)
                count += 1
                if count >= 450:  # stay safely under Firestore's 500-write batch limit
                    batch.commit()
                    batch = db.batch()
                    count = 0

        # Flush any remaining queued deletes.
        if count > 0:
            batch.commit()

        return "Your account has been reset. All transaction data has been permanently deleted."
    except Exception as e:
        logger.error(f"Account reset failed for {user_phone}: {e}", exc_info=True)
        return "An error occurred while resetting your account. Please try again later."
|
|
|
|
|
def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool:
    """Stash parsed-but-unconfirmed transactions under the user's
    'temp_transactions/pending' document, stamped with the current UTC time.

    Returns True on a successful write; False for empty input or any
    Firestore failure (failures are logged, never raised).
    """
    if not transactions:
        return False
    payload = {
        "transactions": transactions,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        (db.collection("users")
           .document(mobile)
           .collection("temp_transactions")
           .document("pending")
           .set(payload))
        return True
    except Exception as e:
        logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
        return False
|
|
|
|
|
def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
    """Render one or more transaction dicts as a human-readable, bulleted
    summary string.

    Accepts a single dict, a list of dicts, or None/empty (which yields a
    placeholder message). Known fields are shown in a preferred order first;
    monetary fields are formatted to two decimals with the record's currency
    prefix; any remaining keys are appended afterwards.
    """
    if not transactions:
        return "No transaction data to display."
    if isinstance(transactions, dict):
        transactions = [transactions]

    # Preferred display order; keys not listed here are appended at the end.
    preferred_order = ['item', 'service_name', 'name', 'creditor', 'category', 'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value', 'customer', 'amount_paid', 'change_given', 'amount_outstanding', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type']
    money_keys = {'price', 'amount', 'cost', 'value', 'amount_paid', 'change_given', 'amount_outstanding'}

    lines = []
    multiple = len(transactions) > 1
    for position, record in enumerate(transactions, start=1):
        if not isinstance(record, dict):
            continue
        # Records may nest their fields under 'details'; flat records are
        # treated as their own detail mapping.
        fields = record.get('details', record)
        heading = record.get('transaction_type', 'Item').replace("_", " ").title()
        lines.append(f"--- {heading} {position} ---" if multiple else f"--- {heading} ---")

        if 'transaction_id' in record:
            lines.append(f"• Transaction ID: ```{record['transaction_id']}```")

        shown = {'transaction_id'}
        for field in preferred_order:
            if field not in fields or field in shown:
                continue
            value = fields[field]
            if field in money_keys and isinstance(value, (int, float)):
                value = f"{fields.get('currency', '')}{value:.2f}"
            lines.append(f"• {field.replace('_', ' ').title()}: {value}")
            shown.add(field)

        # Leftover keys (except the currency marker) are displayed last.
        for field, value in fields.items():
            if field not in shown and field != 'currency':
                lines.append(f"• {field.replace('_', ' ').title()}: {value}")
        lines.append("")
    return "\n".join(lines).strip()
|
|
|
|
|
def fetch_transaction(user_phone: str, identifier: str, collection: str = "inventory_and_services"):
    """Fetch a single document by ID from one of the user's transaction
    sub-collections.

    Returns the document's contents as a dict, or None when the document
    does not exist or the read fails (failures are logged, never raised).
    """
    try:
        snapshot = (
            db.collection("users")
            .document(user_phone)
            .collection(collection)
            .document(identifier)
            .get()
        )
        return snapshot.to_dict() if snapshot.exists else None
    except Exception as e:
        logger.error(f"Error fetching transaction '{identifier}' from '{collection}': {e}", exc_info=True)
        return None
|
|
|
|
|
def process_intent(parsed_trans_data: List[Dict], mobile: str, force_settled: bool = False) -> str:
    """Groups transactions, processes them, and returns feedback. Supports forced settlement."""
    if not parsed_trans_data:
        return "I couldn't understand the transaction details. Could you please try again?"

    # Bucket the parsed items by (intent, type) so each handler runs once
    # over all transactions in its group. Dict insertion order preserves
    # the order groups were first seen.
    groups: Dict[Tuple[str, str], List[Dict]] = {}
    for item in parsed_trans_data:
        bucket = (item.get('intent', 'unknown').lower(),
                  item.get('transaction_type', 'unknown').lower())
        groups.setdefault(bucket, []).append(item)

    final_feedback = []
    for (intent, trans_type), batch in groups.items():
        logger.info(f"Processing group: {intent} - {trans_type} for user {mobile}")
        try:
            # NOTE: 'create' is checked before the read/query branch, so a
            # 'create' intent with type 'query' falls into the create-else arm.
            if intent == 'create':
                if trans_type == 'sale':
                    _, msg = create_sale(mobile, batch, force_settled=force_settled)
                    final_feedback.append(f"Sales Processing:\n{msg}")
                elif trans_type in ('purchase', 'inventory', 'service_offering'):
                    _, msg = create_or_update_inventory_or_service_offering(mobile, batch)
                    final_feedback.append(f"Inventory/Service Updates:\n{msg}")
                elif trans_type == 'expense':
                    _, msg = create_expense(mobile, batch)
                    final_feedback.append(f"Expense Recording:\n{msg}")
                elif trans_type == 'asset':
                    _, msg = create_asset(mobile, batch)
                    final_feedback.append(f"Asset Recording:\n{msg}")
                elif trans_type == 'liability':
                    _, msg = create_liability(mobile, batch)
                    final_feedback.append(f"Liability Recording:\n{msg}")
                else:
                    final_feedback.append(f"Cannot 'create' transactions of type: '{trans_type}'")
            elif intent == 'read' or trans_type == 'query':
                # Only the first transaction's query text is used for a read.
                question = batch[0].get('details', {}).get('query') or "Show summary of all data"
                final_feedback.append(read_datalake(mobile, question))
            elif intent == 'update':
                _, msg = update_transaction(mobile, batch)
                final_feedback.append(f"Update Results:\n{msg}")
            elif intent == 'delete':
                _, msg = delete_transaction(mobile, batch)
                final_feedback.append(f"Deletion Results:\n{msg}")
            elif intent == 'reset_account':
                final_feedback.append(reset_user_account(mobile))
            else:
                final_feedback.append(f"Unknown intent '{intent}' for type '{trans_type}'.")
        except Exception as e:
            logger.error(f"Error processing group ({intent}, {trans_type}) for user {mobile}: {e}", exc_info=True)
            final_feedback.append(f"An unexpected error occurred while processing your {trans_type} {intent} request.")

    if not final_feedback:
        return "No actions were processed from your request."
    return "\n\n".join(final_feedback).strip()