rairo committed on
Commit
f4e47c1
·
verified ·
1 Parent(s): 1b72adb

Update utility.py

Browse files
Files changed (1) hide show
  1. utility.py +200 -411
utility.py CHANGED
@@ -16,8 +16,8 @@ import google.generativeai as genai
16
  import re
17
  import uuid
18
  import dataframe_image as dfi
19
- from PIL import Image # --- ADDED ---
20
- import io # --- ADDED ---
21
 
22
 
23
  logger = logging.getLogger(__name__)
@@ -37,7 +37,7 @@ def init_firestore_from_env(env_var: str = "FIREBASE"):
37
  if firebase_admin._apps:
38
  return firestore.client()
39
  sa_json = os.environ[env_var]
40
- sa_info = json.loads(sa_json)
41
  cred = credentials.Certificate(sa_info)
42
  firebase_admin.initialize_app(cred)
43
  return firestore.client()
@@ -94,12 +94,11 @@ class FlaskResponse(ResponseParser):
94
  def format_other(self, result):
95
  return str(result['value'])
96
 
97
- # --- AI Model Configuration (WITH VISION ADDED) ---
98
  try:
99
  genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
100
  GENERATIVE_MODEL_NAME = "gemini-2.0-flash"
101
  VISION_MODEL_NAME = "gemini-2.0-flash-thinking-exp"
102
-
103
  model = genai.GenerativeModel(
104
  GENERATIVE_MODEL_NAME,
105
  generation_config={
@@ -110,7 +109,6 @@ try:
110
  }
111
  )
112
  vision_model = genai.GenerativeModel(VISION_MODEL_NAME)
113
-
114
  llm = ChatGoogleGenerativeAI(
115
  model=GENERATIVE_MODEL_NAME,
116
  temperature=0.1,
@@ -124,166 +122,128 @@ except Exception as e:
124
  logger.error(f"Error configuring Generative AI: {e}", exc_info=True)
125
  model = vision_model = llm = None
126
 
127
- # --- START: NEW VISION PROCESSING FUNCTIONS ---
128
 
129
- def _transpile_vision_json_to_query(vision_json: dict, caption: Optional[str]) -> str:
130
- """Converts the structured JSON from the Vision AI into a natural language query."""
131
- image_type = vision_json.get("image_type")
132
- data = vision_json.get("data", {})
133
-
134
- if image_type == "products":
135
- items = data.get("items", [])
136
- if not items:
137
- return "Error: No products were identified in the image."
138
 
139
- sale_parts = [f"{item['quantity']} {item['name']}" for item in items]
140
- query = f"Sell {', '.join(sale_parts)}"
141
 
142
- if caption:
143
- # Append caption for potential price adjustments, etc.
144
- query += f" {caption}"
145
- return query.strip()
146
-
147
- elif image_type == "document":
148
- if not data:
149
- return "Error: Could not extract any data from the document."
150
-
151
- # --- Caption Override Logic ---
152
- final_trans_type = data.get("transaction_type", "purchase") # Default to purchase for items
153
- caption_lower = caption.lower() if caption else ""
154
-
155
- if "expense" in caption_lower:
156
- final_trans_type = "expense"
157
- elif "purchase" in caption_lower or "inventory" in caption_lower:
158
- final_trans_type = "purchase"
159
- elif "asset" in caption_lower:
160
- final_trans_type = "asset"
161
- elif "liability" in caption_lower or "i owe" in caption_lower:
162
- final_trans_type = "liability"
163
 
164
- # --- Comprehensive Query Generation ---
165
- query_parts = []
166
- total = data.get("total")
167
- vendor = data.get("vendor")
168
- items = data.get("items", [])
169
- currency = data.get("currency", "")
170
-
171
- if final_trans_type == "expense":
172
- action = "Record an expense"
173
- if total:
174
- item_list_str = ", ".join([f"{item.get('quantity', 1)} {item.get('name')}" for item in items])
175
- details = f"of {currency}{total}"
176
- if vendor:
177
- details += f" from {vendor}"
178
- if item_list_str:
179
- details += f" for {item_list_str}"
180
- query_parts.append(f"{action} {details}")
181
- else: # Handle list of expenses without a total
182
- for item in items:
183
- query_parts.append(f"Record an expense for {item.get('name')} of {currency}{item.get('price')}")
184
-
185
- elif final_trans_type == "purchase":
186
- action = "Record a purchase"
187
- if items:
188
- for item in items:
189
- item_details = f"of {item.get('quantity', 1)} {item.get('name')}"
190
- if item.get('price'):
191
- item_details += f" for {currency}{item.get('price')} each"
192
- if vendor:
193
- item_details += f" from {vendor}"
194
- query_parts.append(f"{action} {item_details}")
195
- elif total: # If only a total is found
196
- query_parts.append(f"{action} of {currency}{total} from {vendor if vendor else 'an unknown vendor'}")
197
-
198
-
199
- elif final_trans_type == "asset":
200
- action = "Record an asset"
201
- # Prefer item names for assets
202
- if items:
203
- for item in items:
204
- asset_name = item.get('name', 'unnamed asset')
205
- value = item.get('price', total)
206
- if value:
207
- query_parts.append(f"{action} named {asset_name} with a value of {currency}{value}")
208
- elif total:
209
- query_parts.append(f"{action} with a value of {currency}{total}")
210
-
211
- elif final_trans_type == "liability":
212
- action = "Record a liability"
213
- creditor = data.get("creditor") or (vendor if vendor else "an unknown creditor")
214
- amount = data.get("amount") or total
215
- if amount:
216
- query_parts.append(f"{action} of {currency}{amount} to {creditor}")
217
-
218
- if not query_parts:
219
- return "Error: The document was recognized but no actionable data could be extracted to form a command."
220
-
221
- # Join multiple commands (e.g., for lists of expenses/purchases) with "and"
222
- return " and ".join(query_parts).strip()
223
-
224
- else:
225
- return "Error: The image was not recognized as products for sale or a financial document."
226
-
227
- def _analyze_image_with_vision(image_bytes: bytes) -> dict:
228
- """Sends the image to the Gemini Vision model and returns structured JSON."""
229
- if not vision_model:
230
- return {"error": "Vision model is not available."}
231
-
232
- try:
233
- image_pil = Image.open(io.BytesIO(image_bytes))
234
 
235
- prompt = """
236
- You are an expert bookkeeping AI. Your task is to analyze an image and return structured JSON.
 
 
237
 
238
- 1. First, classify the image by setting the `image_type` key to one of two values:
239
- * `"products"`: If the image primarily shows physical items for sale (e.g., fruit on a counter, products on a shelf).
240
- * `"document"`: If the image shows a receipt, invoice, bank statement, or a handwritten note/list related to a financial transaction. You MUST be able to read handwriting.
241
 
242
- 2. Second, based on the `image_type`, populate the `data` object:
 
 
 
 
243
 
244
- **If `image_type` is "products":**
245
- The `data` object must contain one key: `"items"`.
246
- - `"items"`: An array of objects, where each object has `"name"` (string) and `"quantity"` (integer). Count each distinct item.
247
 
248
- **If `image_type` is "document":**
249
- The `data` object should contain as many of the following keys as you can find.
250
- - `"transaction_type"`: Infer the type. Use "purchase" for invoices/supplier bills, "liability" for IOUs or loans, "asset" for items of value being logged, and "expense" for general receipts.
251
- - `"total"`: The final total amount (float).
252
- - `"currency"`: The currency symbol or code (e.g., "$", "R").
253
- - `"vendor"`: The name of the store or supplier.
254
- - `"creditor"`: The name of the person or entity owed if it is a liability.
255
- - `"items"`: An array of objects, each with `"name"` (string), `"quantity"` (integer), and `"price"` (float) if available.
256
- - `"date"`: The transaction date (YYYY-MM-DD format).
257
 
258
- **Your final output must be ONLY the raw JSON object, starting with `{` and ending with `}`.**
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
  """
260
 
261
  response = vision_model.generate_content([prompt, image_pil])
262
  response_text = response.text
263
 
264
- json_str = re.search(r'\{.*\}', response_text, re.DOTALL)
265
  if json_str:
266
  return json.loads(json_str.group(0))
267
  else:
268
- logger.error(f"Vision AI did not return valid JSON. Raw response: {response_text}")
269
- return {"error": "Failed to parse vision response."}
270
 
271
  except Exception as e:
272
  logger.error(f"Error in Vision AI processing: {e}", exc_info=True)
273
- return {"error": "An unexpected error occurred during image analysis."}
274
 
275
  def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str]) -> str:
276
  """Master function to process an image and generate a natural language query."""
277
  logger.info("Starting image analysis with Vision AI.")
278
- vision_json = _analyze_image_with_vision(image_bytes)
279
 
280
- if "error" in vision_json:
281
- return f"Error: {vision_json['error']}"
282
 
283
- logger.info(f"Vision AI analysis complete. Result: {vision_json}")
284
- return _transpile_vision_json_to_query(vision_json, caption)
285
 
286
- # --- END: NEW VISION PROCESSING FUNCTIONS ---
287
 
288
  class ReportEngine:
289
  def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]], query: str):
@@ -295,7 +255,6 @@ class ReportEngine:
295
  def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
296
  if 'timestamp' not in target_df.columns:
297
  return None
298
-
299
  today = self.now.date()
300
  if "last month" in self.query:
301
  first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
@@ -311,82 +270,126 @@ class ReportEngine:
311
  return target_df['timestamp'].dt.date >= start_of_week
312
  if "today" in self.query:
313
  return target_df['timestamp'].dt.date == today
314
- return None
315
 
316
- def generate_report(self) -> str:
317
  subject = "sales"
318
- if "expense" in self.query:
319
- subject = "expenses"
320
-
321
- target_df_name = 'sales' if subject == 'sales' else 'expenses'
322
- target_df = self.dfs.get(target_df_name)
 
 
 
 
 
 
323
 
324
- if target_df is None or target_df.empty:
325
- return json.dumps({"error": f"I couldn't find any data for {subject} to generate a report."})
326
 
327
- time_filter = self._get_time_filter(target_df)
328
- if time_filter is not None:
329
- target_df = target_df[time_filter]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
330
 
331
- if target_df.empty:
332
- return json.dumps({"error": f"No {subject} data found for the specified period."})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
333
 
334
- if subject == "sales":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
335
  total_revenue = target_df['sale_total'].sum()
336
  num_transactions = len(target_df)
337
  item_summary = target_df.groupby('item')['quantity'].sum()
338
  best_selling_item = item_summary.idxmax() if not item_summary.empty else "N/A"
339
  worst_selling_item = item_summary.idxmin() if not item_summary.empty else "N/A"
340
- self.results = {
341
- "report_subject": "Sales",
342
- "total_revenue": f"${total_revenue:.2f}",
343
- "number_of_sales": num_transactions,
344
- "best_selling_item": best_selling_item,
345
- "worst_selling_item": worst_selling_item
346
- }
347
  else: # expenses
 
 
 
 
 
 
348
  total_expenses = target_df['amount'].sum()
349
  num_transactions = len(target_df)
350
  category_summary = target_df.groupby('description')['amount'].sum()
351
  highest_expense_category = category_summary.idxmax() if not category_summary.empty else "N/A"
352
- self.results = {
353
- "report_subject": "Expenses",
354
- "total_expenses": f"${total_expenses:.2f}",
355
- "number_of_expenses": num_transactions,
356
- "highest_expense_category": highest_expense_category
357
- }
358
 
359
  return json.dumps(self.results, indent=2)
360
 
361
- def generate_forecast_data(self) -> str:
362
- sales_df = self.dfs.get('sales')
363
- if sales_df is None or sales_df.empty:
364
- return json.dumps({"error": "Not enough sales data to generate a forecast."})
365
-
366
- sales_df_copy = sales_df.copy()
367
- sales_df_copy.set_index('timestamp', inplace=True)
368
- weekly_sales = sales_df_copy['sale_total'].resample('W').sum()
369
-
370
- if len(weekly_sales) < 2:
371
- return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."})
372
-
373
- last_week_sales = weekly_sales.iloc[-1]
374
- previous_week_sales = weekly_sales.iloc[-2] if len(weekly_sales) > 1 else 0
375
-
376
- growth_rate = 0
377
- if previous_week_sales > 0:
378
- growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100
379
-
380
- historical_avg = weekly_sales.head(-1).mean()
381
-
382
- self.results = {
383
- "last_period_sales": f"${last_week_sales:.2f}",
384
- "previous_period_sales": f"${previous_week_sales:.2f}",
385
- "period_over_period_growth": f"{growth_rate:.2f}%",
386
- "historical_average": f"${historical_avg:.2f}"
387
- }
388
- return json.dumps(self.results, indent=2)
389
-
390
  def generateResponse(prompt: str) -> str:
391
  """Generate structured JSON response from user input using Generative AI."""
392
  if not model:
@@ -746,214 +749,6 @@ def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
746
  df[col] = df[col].fillna('Unknown')
747
  return df
748
 
749
- def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
750
- """Fetches all user data, splits/validates DataFrames, and engineers features."""
751
- all_dfs_with_names = []
752
- inv_serv_docs = db.collection("users").document(user_phone).collection('inventory_and_services').stream()
753
- inventory_data, services_data = [], []
754
- for doc in inv_serv_docs:
755
- doc_data = doc.to_dict()
756
- flat_data = {**doc_data, **doc_data.get('details', {})}
757
- if 'details' in flat_data: del flat_data['details']
758
- if doc_data.get('type') == 'service': services_data.append(flat_data)
759
- else: inventory_data.append(flat_data)
760
- if inventory_data: all_dfs_with_names.append(("inventory", _validate_dataframe(pd.DataFrame(inventory_data))))
761
- if services_data: all_dfs_with_names.append(("services", _validate_dataframe(pd.DataFrame(services_data))))
762
- collections_to_fetch = {'sales': 'sales', 'expenses': 'expenses', 'assets': 'assets', 'liabilities': 'liabilities'}
763
- for df_name, coll_name in collections_to_fetch.items():
764
- docs = db.collection("users").document(user_phone).collection(coll_name).stream()
765
- data = [doc.to_dict() for doc in docs]
766
- if data:
767
- flat_data_list = []
768
- for item in data:
769
- flat_item = {**item, **item.get('details', {})}
770
- if 'details' in flat_item: del flat_item['details']
771
- flat_data_list.append(flat_item)
772
- df = pd.DataFrame(flat_data_list)
773
- validated_df = _validate_dataframe(df)
774
- if df_name == 'sales' and 'price' in validated_df.columns and 'quantity' in validated_df.columns:
775
- validated_df['sale_total'] = validated_df['price'] * validated_df['quantity']
776
- all_dfs_with_names.append((df_name, validated_df))
777
- return all_dfs_with_names
778
-
779
- def _get_relative_date_context() -> str:
780
- """
781
- Generates a string of pre-calculated dates to inject into the PandasAI prompt
782
- for improved temporal awareness.
783
- """
784
- today = datetime.now(timezone.utc)
785
-
786
- # Helper to format dates
787
- def fmt(d):
788
- return d.strftime('%Y-%m-%d')
789
-
790
- # Basic dates
791
- yesterday = today - timedelta(days=1)
792
-
793
- # Week calculations
794
- start_of_this_week = today - timedelta(days=today.weekday())
795
- end_of_this_week = start_of_this_week + timedelta(days=6)
796
- start_of_last_week = start_of_this_week - timedelta(days=7)
797
- end_of_last_week = start_of_last_week + timedelta(days=6)
798
-
799
- # Last weekday calculations
800
- last_monday = start_of_this_week if today.weekday() != 0 else start_of_last_week
801
- if last_monday >= today:
802
- last_monday -= timedelta(days=7)
803
- last_tuesday = last_monday + timedelta(days=1)
804
- last_wednesday = last_monday + timedelta(days=2)
805
- last_thursday = last_monday + timedelta(days=3)
806
- last_friday = last_monday + timedelta(days=4)
807
- last_saturday = last_monday + timedelta(days=5)
808
- last_sunday = last_monday + timedelta(days=6)
809
-
810
- context = [
811
- f"Here are some pre-calculated dates to help you understand the user's request:",
812
- f"- Today is: {fmt(today)}",
813
- f"- Yesterday was: {fmt(yesterday)}",
814
- f"- The start of this week was: {fmt(start_of_this_week)}",
815
- f"- The end of this week is: {fmt(end_of_this_week)}",
816
- f"- The start of last week was: {fmt(start_of_last_week)}",
817
- f"- The end of last week was: {fmt(end_of_last_week)}",
818
- f"- Last Monday was on: {fmt(last_monday)}",
819
- f"- Last Tuesday was on: {fmt(last_tuesday)}",
820
- f"- Last Wednesday was on: {fmt(last_wednesday)}",
821
- f"- Last Thursday was on: {fmt(last_thursday)}",
822
- f"- Last Friday was on: {fmt(last_friday)}",
823
- f"- Last Saturday was on: {fmt(last_saturday)}",
824
- f"- Last Sunday was on: {fmt(last_sunday)}"
825
- ]
826
-
827
- return "\n".join(context)
828
-
829
-
830
- def read_datalake(user_phone: str, query: str) -> str:
831
- """
832
- Implements the final Unified Strategy for robust, intelligent data analysis.
833
- """
834
- try:
835
- all_dfs_with_names = _fetch_all_collections_as_dfs(user_phone)
836
- if not all_dfs_with_names:
837
- return "You have no data recorded yet. Please add some transactions first."
838
-
839
- query_lower = query.lower()
840
- engine = ReportEngine(all_dfs_with_names, query)
841
-
842
- # --- Tier 0: Simple Direct Lookups (NEW) ---
843
- simple_lookup_map = {
844
- "inventory": ["stock", "inventory", "in stock", "what do i have"],
845
- "assets": ["asset", "assets", "my assets"],
846
- "liabilities": ["liabilities", "i owe", "creditor", "my debts"],
847
- "sales": ["show my sales", "list sales"],
848
- "expenses": ["show my expenses", "list expenses"]
849
- }
850
-
851
- for df_name, keywords in simple_lookup_map.items():
852
- if any(keyword in query_lower for keyword in keywords):
853
- logger.info(f"Handling '{query}' with Simple Lookup Path for '{df_name}'.")
854
- # Find the corresponding dataframe
855
- target_df_tuple = next((item for item in all_dfs_with_names if item[0] == df_name), None)
856
- if target_df_tuple is not None:
857
- target_df = target_df_tuple[1]
858
- if not target_df.empty:
859
- # Return the dataframe rendered as an image
860
- return render_df_as_image(target_df)
861
- else:
862
- return f"You don't have any {df_name} recorded yet."
863
- else:
864
- return f"I couldn't find any data for {df_name}."
865
-
866
- # --- Tier 1: Intelligent Router (Existing) ---
867
- predictive_keywords = ["expect", "forecast", "predict", "next month", "next week"]
868
- historical_report_keywords = ["report", "summary", "performance", "how did i do", "overview", "month", "year", "week", "today"]
869
-
870
- if any(keyword in query_lower for keyword in predictive_keywords):
871
- # --- Path A: Forecasting ---
872
- logger.info(f"Handling '{query}' with the Forecasting Path.")
873
- forecast_json = engine.generate_forecast_data()
874
- forecast_data = json.loads(forecast_json)
875
- if "error" in forecast_data: return forecast_data["error"]
876
-
877
- synthesis_prompt = f"""
878
- You are a business analyst making a simple projection. Based on the following data, provide a friendly sales forecast.
879
- Acknowledge this is an estimate based on past performance. Format your response for WhatsApp (*bold*, _italic_).
880
-
881
- Data:
882
- {forecast_json}
883
- """
884
- response = llm.invoke(synthesis_prompt)
885
- return response.content
886
-
887
- elif any(keyword in query_lower for keyword in historical_report_keywords):
888
- # --- Path B: Historical Reporting with Creative Insights ---
889
- logger.info(f"Handling '{query}' with the Reporting Path.")
890
- report_json = engine.generate_report()
891
- report_data = json.loads(report_json)
892
- if "error" in report_data: return report_data["error"]
893
-
894
- synthesis_prompt = f"""
895
- You are a helpful business assistant. Based on the following JSON data summary, synthesize a concise, friendly report for the user.
896
- The user's original request was: '{query}'. Format your response using WhatsApp-compatible markdown (*bold*, _italic_).
897
-
898
- Your most important task is to provide a creative and actionable "Insight" at the end. Use the product data provided to give specific advice.
899
- - For the best-selling item, suggest a complementary product or a "double-down" strategy. (e.g., "Bananas are a hit! Consider adding banana bread.")
900
- - For the worst-selling item, suggest a promotion or bundle deal. (e.g., "To boost Apple sales, try a 'Fruit Duo' bundle with your best-selling Bananas.")
901
-
902
- Here is the data summary:
903
- {report_json}
904
- """
905
- response = llm.invoke(synthesis_prompt)
906
- return response.content
907
-
908
- else:
909
- # --- Path C: Fortified PandasAI for Q&A and Plotting ---
910
- logger.info(f"Handling '{query}' with the Fortified PandasAI Path.")
911
-
912
- schema_description = "You have been provided with these Pandas DataFrames:\n"
913
- for name, df in all_dfs_with_names:
914
- schema_description += f"* **{name}**: Contains columns like {', '.join(df.columns.to_list())}.\n"
915
-
916
- # NEW: Injecting temporal context
917
- date_context = _get_relative_date_context()
918
- today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
919
-
920
- pandasai_prompt = (
921
- f"{schema_description}\n"
922
- f"For context, today's date is {today_str}.\n"
923
- f"{date_context}\n\n" # Injecting the new context here
924
- f"IMPORTANT:\n"
925
- f"1. For any time-based queries, you MUST use timezone-aware pandas Timestamps for comparison. Example: `pd.Timestamp('{today_str}', tz='UTC')`.\n"
926
- f"2. When a plot or chart is requested, you MUST save it as a file. The final line of your code must be `result = {{'type': 'plot', 'value': 'filename.png'}}`.\n"
927
- f"3. Your code MUST end by declaring a `result` dictionary.\n\n"
928
- f"Based on this, please write Python code to answer the following specific user query: '{query}'"
929
- )
930
-
931
- datalake_dfs = [df for _, df in all_dfs_with_names]
932
- lake = SmartDatalake(
933
- datalake_dfs,
934
- config={
935
- "llm": llm,
936
- "response_parser": FlaskResponse,
937
- "save_charts_path": user_defined_path,
938
- "enable_cache": False,
939
- "conversational": False, # Set to False for single-turn queries
940
- "custom_whitelisted_dependencies": [
941
- "os", "io", "sys", "glob", "collections", "matplotlib", "seaborn",
942
- "numpy", "scipy", "statsmodels", "sklearn"
943
- ],
944
- "security": "none"
945
- }
946
- )
947
- response = lake.chat(pandasai_prompt)
948
- return str(response)
949
-
950
- except (NoCodeFoundError, MaliciousQueryError) as e:
951
- logger.error(f"PandasAI failed for query '{query}': {e}")
952
- return f"Unfortunately, I was not able to answer your question, because of the following error:\n\n{e}"
953
- except Exception as e:
954
- logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
955
- return "Sorry, I encountered an error while analyzing your data."
956
-
957
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
958
  col_ref = db.collection("users").document(user_phone).collection(collection_name)
959
  if 'transaction_id' in details and details['transaction_id']:
@@ -1060,8 +855,6 @@ def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool
1060
  logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
1061
  return False
1062
 
1063
- CURRENCY_SYMBOL_REGEX = re.compile(r"^\s*[\$\£\€\¥\₹R]")
1064
-
1065
  def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
1066
  if not transactions: return "No transaction data to display."
1067
  if isinstance(transactions, dict): transactions = [transactions]
@@ -1073,11 +866,7 @@ def format_transaction_response(transactions: Union[List[Dict], Dict, None]) ->
1073
  title = f"{trans_type}"
1074
  if len(transactions) > 1: output_lines.append(f"--- {title} {idx + 1} ---")
1075
  else: output_lines.append(f"--- {title} ---")
1076
- key_order = [
1077
- 'transaction_id', 'item', 'service_name', 'name', 'creditor', 'category',
1078
- 'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value',
1079
- 'customer', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type'
1080
- ]
1081
  displayed_keys = set()
1082
  if 'transaction_id' in trans:
1083
  output_lines.append(f"• Transaction ID: {trans['transaction_id']}")
 
16
  import re
17
  import uuid
18
  import dataframe_image as dfi
19
+ from PIL import Image
20
+ import io
21
 
22
 
23
  logger = logging.getLogger(__name__)
 
37
  if firebase_admin._apps:
38
  return firestore.client()
39
  sa_json = os.environ[env_var]
40
+ sa_info = json.loads(sa_json)
41
  cred = credentials.Certificate(sa_info)
42
  firebase_admin.initialize_app(cred)
43
  return firestore.client()
 
94
  def format_other(self, result):
95
  return str(result['value'])
96
 
97
+ # --- AI Model Configuration ---
98
  try:
99
  genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
100
  GENERATIVE_MODEL_NAME = "gemini-2.0-flash"
101
  VISION_MODEL_NAME = "gemini-2.0-flash-thinking-exp"
 
102
  model = genai.GenerativeModel(
103
  GENERATIVE_MODEL_NAME,
104
  generation_config={
 
109
  }
110
  )
111
  vision_model = genai.GenerativeModel(VISION_MODEL_NAME)
 
112
  llm = ChatGoogleGenerativeAI(
113
  model=GENERATIVE_MODEL_NAME,
114
  temperature=0.1,
 
122
  logger.error(f"Error configuring Generative AI: {e}", exc_info=True)
123
  model = vision_model = llm = None
124
 
125
+ # --- START: VISION PROCESSING FUNCTIONS (REVISED) ---
126
 
127
+ def _transpile_vision_json_to_query(vision_json: List[Dict], caption: Optional[str]) -> str:
128
+ """Converts the structured JSON list from the Vision AI into a natural language query."""
129
+ if not vision_json:
130
+ return "Error: Could not extract any transactions from the image."
131
+
132
+ query_parts = []
133
+ for trans in vision_json:
134
+ details = trans.get("details", {})
135
+ trans_type = trans.get("transaction_type", "unknown")
136
 
137
+ # Build a descriptive string for each transaction
138
+ part = f"Record a {trans_type}"
139
 
140
+ item = details.get("item") or details.get("name") or details.get("description")
141
+ quantity = details.get("quantity")
142
+ price = details.get("price") or details.get("amount") or details.get("value")
143
+ currency = details.get("currency", "")
144
+ vendor = details.get("vendor") or details.get("creditor")
145
+
146
+ if quantity and item:
147
+ part += f" of {quantity} {item}"
148
+ elif item:
149
+ part += f" for {item}"
 
 
 
 
 
 
 
 
 
 
 
150
 
151
+ if price:
152
+ part += f" for {currency}{price}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
 
154
+ if vendor:
155
+ part += f" from {vendor}"
156
+
157
+ query_parts.append(part)
158
 
159
+ # Join multiple commands together
160
+ final_query = " and ".join(query_parts)
 
161
 
162
+ # Allow caption to provide additional context, like a price for a sale
163
+ if caption:
164
+ final_query += f" {caption}"
165
+
166
+ return final_query.strip()
167
 
 
 
 
168
 
169
+ def _analyze_image_with_vision(image_bytes: bytes) -> List[Dict]:
170
+ """Sends the image to the Gemini Vision model and returns a structured JSON list of transactions."""
171
+ if not vision_model:
172
+ return [{"error": "Vision model is not available."}]
 
 
 
 
 
173
 
174
+ try:
175
+ image_pil = Image.open(io.BytesIO(image_bytes))
176
+
177
+ # This prompt is now aligned with the text-based `generateResponse` prompt
178
+ prompt = """
179
+ You are an expert bookkeeping AI. Your task is to analyze an image (which could be a receipt, invoice, or handwritten note) and extract all financial transactions.
180
+
181
+ **1. Output Format:**
182
+ You MUST output your response as a valid JSON list `[]` containing one or more transaction objects `{}`.
183
+
184
+ **2. Transaction Object Structure:**
185
+ For each distinct transaction you identify, create a JSON object with the following keys:
186
+ - `"intent"`: The user's goal. For images, this should always be "create".
187
+ - `"transaction_type"`: The category of the transaction. Infer this from keywords.
188
+ - `"details"`: An object containing key-value pairs extracted from the image.
189
+
190
+ **3. Inference Rules & Keywords:**
191
+ - **`"purchase"`**: Use this for keywords like "bought", "buy", "purchase", or for inventory items on a receipt.
192
+ - **`"sale"`**: Use this for keywords like "sold", "sell", or "sale".
193
+ - **`"expense"`**: Use this for payments for services or non-inventory goods (e.g., 'Tella Football Club', 'fuel', 'lunch', 'tickets'). If you are unsure, default to 'expense'.
194
+ - **Handwriting**: You MUST be able to read handwriting. Analyze each line of a handwritten note as a potential separate transaction.
195
+
196
+ **4. `details` Object Structure:**
197
+ - For `purchase`/`sale`: Use `"item"` and `"quantity"`.
198
+ - For `expense`: Use `"description"`, `"amount"`, and `"currency"`. If a vendor is clear, add `"vendor"`.
199
+
200
+ **5. Examples:**
201
+
202
+ **Example 1: Handwritten Note**
203
+ - **Image Content:** A note that says "bought 10 Oranges", "sold 5 oranges", "bought 5 lemons".
204
+ - **Output:**
205
+ [
206
+ {"intent": "create", "transaction_type": "purchase", "details": {"item": "Oranges", "quantity": 10}},
207
+ {"intent": "create", "transaction_type": "sale", "details": {"item": "oranges", "quantity": 5}},
208
+ {"intent": "create", "transaction_type": "purchase", "details": {"item": "lemons", "quantity": 5}}
209
+ ]
210
+
211
+ **Example 2: Expense Receipt**
212
+ - **Image Content:** A receipt from "TELLA FOOTBALL CLUB" for "R900.00".
213
+ - **Output:**
214
+ [
215
+ {"intent": "create", "transaction_type": "expense", "details": {"description": "TELLA FOOTBALL CLUB", "amount": 900.00, "currency": "R", "vendor": "TELLA FOOTBALL CLUB"}}
216
+ ]
217
+
218
+ Analyze the provided image and return only the JSON list.
219
  """
220
 
221
  response = vision_model.generate_content([prompt, image_pil])
222
  response_text = response.text
223
 
224
+ json_str = re.search(r'\[.*\]', response_text, re.DOTALL)
225
  if json_str:
226
  return json.loads(json_str.group(0))
227
  else:
228
+ logger.error(f"Vision AI did not return a valid JSON list. Raw response: {response_text}")
229
+ return []
230
 
231
  except Exception as e:
232
  logger.error(f"Error in Vision AI processing: {e}", exc_info=True)
233
+ return []
234
 
235
  def process_image_and_generate_query(image_bytes: bytes, caption: Optional[str]) -> str:
236
  """Master function to process an image and generate a natural language query."""
237
  logger.info("Starting image analysis with Vision AI.")
238
+ vision_json_list = _analyze_image_with_vision(image_bytes)
239
 
240
+ if not vision_json_list:
241
+ return "Error: I couldn't find any actionable transactions in the image."
242
 
243
+ logger.info(f"Vision AI analysis complete. Result: {vision_json_list}")
244
+ return _transpile_vision_json_to_query(vision_json_list, caption)
245
 
246
+ # --- END: VISION PROCESSING FUNCTIONS ---
247
 
248
  class ReportEngine:
249
  def __init__(self, dfs_with_names: List[Tuple[str, pd.DataFrame]], query: str):
 
255
  def _get_time_filter(self, target_df: pd.DataFrame) -> Optional[pd.Series]:
256
  if 'timestamp' not in target_df.columns:
257
  return None
 
258
  today = self.now.date()
259
  if "last month" in self.query:
260
  first_day_current_month = self.now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
 
270
  return target_df['timestamp'].dt.date >= start_of_week
271
  if "today" in self.query:
272
  return target_df['timestamp'].dt.date == today
273
+ return pd.Series(True, index=target_df.index)
274
 
275
+ def generate_report(self, subject_item: Optional[str] = None) -> str:
276
  subject = "sales"
277
+ if "profit" in self.query: subject = "profit"
278
+ elif any(k in self.query for k in ["best day", "busiest day", "sales by day"]): subject = "day_of_week"
279
+ elif "expense" in self.query: subject = "expenses"
280
+ if subject_item: subject = "item_report"
281
+
282
+ if subject == "profit":
283
+ sales_df = self.dfs.get('sales', pd.DataFrame())
284
+ expenses_df = self.dfs.get('expenses', pd.DataFrame())
285
+
286
+ time_filter_sales = self._get_time_filter(sales_df) if not sales_df.empty else None
287
+ time_filter_expenses = self._get_time_filter(expenses_df) if not expenses_df.empty else None
288
 
289
+ filtered_sales = sales_df[time_filter_sales] if time_filter_sales is not None else sales_df
290
+ filtered_expenses = expenses_df[time_filter_expenses] if time_filter_expenses is not None else expenses_df
291
 
292
+ total_revenue = filtered_sales['sale_total'].sum() if not filtered_sales.empty else 0
293
+ total_cogs = filtered_sales['cogs'].sum() if not filtered_sales.empty and 'cogs' in filtered_sales.columns else 0
294
+ total_expenses = filtered_expenses['amount'].sum() if not filtered_expenses.empty else 0
295
+
296
+ gross_profit = total_revenue - total_cogs
297
+ net_profit = gross_profit - total_expenses
298
+
299
+ num_sales = len(filtered_sales)
300
+ total_items_sold = filtered_sales['quantity'].sum() if not filtered_sales.empty else 0
301
+ atv = total_revenue / num_sales if num_sales > 0 else 0
302
+ ipt = total_items_sold / num_sales if num_sales > 0 else 0
303
+ expense_ratio = (total_expenses / total_revenue) * 100 if total_revenue > 0 else 0
304
+
305
+ most_profitable_item = "N/A"
306
+ if not filtered_sales.empty and 'cogs' in filtered_sales.columns:
307
+ filtered_sales['item_profit'] = filtered_sales['sale_total'] - filtered_sales['cogs']
308
+ item_profitability = filtered_sales.groupby('item')['item_profit'].sum()
309
+ if not item_profitability.empty:
310
+ most_profitable_item = item_profitability.idxmax()
311
 
312
+ self.results = {
313
+ "report_subject": "Profitability",
314
+ "total_revenue": f"${total_revenue:.2f}", "total_cogs": f"${total_cogs:.2f}",
315
+ "gross_profit": f"${gross_profit:.2f}", "total_expenses": f"${total_expenses:.2f}",
316
+ "net_profit": f"${net_profit:.2f}", "average_transaction_value": f"${atv:.2f}",
317
+ "items_per_transaction": f"{ipt:.2f}", "expense_to_revenue_ratio": f"{expense_ratio:.2f}%",
318
+ "most_profitable_item": most_profitable_item
319
+ }
320
+
321
+ elif subject == "item_report":
322
+ sales_df = self.dfs.get('sales', pd.DataFrame())
323
+ if sales_df.empty: return json.dumps({"error": f"No sales data found for '{subject_item}'."})
324
+
325
+ item_df = sales_df[sales_df['item'].str.contains(subject_item, case=False, na=False)]
326
+ if item_df.empty: return json.dumps({"error": f"I couldn't find any sales for '{subject_item}'."})
327
+
328
+ time_filter = self._get_time_filter(item_df)
329
+ filtered_df = item_df[time_filter] if time_filter is not None else item_df
330
+ if filtered_df.empty: return json.dumps({"error": f"No data for '{subject_item}' in this period."})
331
+
332
+ units_sold = filtered_df['quantity'].sum()
333
+ total_revenue = filtered_df['sale_total'].sum()
334
+ total_cogs = filtered_df['cogs'].sum() if 'cogs' in filtered_df.columns else 0
335
+ gross_profit = total_revenue - total_cogs
336
+ profit_margin = (gross_profit / total_revenue) * 100 if total_revenue > 0 else 0
337
+ avg_price = total_revenue / units_sold if units_sold > 0 else 0
338
 
339
+ self.results = {
340
+ "report_subject": "Item Report", "item_name": subject_item,
341
+ "units_sold": int(units_sold), "total_revenue": f"${total_revenue:.2f}",
342
+ "total_cogs": f"${total_cogs:.2f}", "gross_profit": f"${gross_profit:.2f}",
343
+ "profit_margin": f"{profit_margin:.2f}%", "average_selling_price": f"${avg_price:.2f}"
344
+ }
345
+
346
+ elif subject == "day_of_week":
347
+ sales_df = self.dfs.get('sales', pd.DataFrame())
348
+ if sales_df.empty or 'day_of_week' not in sales_df.columns: return json.dumps({"error": "No data to analyze by day."})
349
+ time_filter = self._get_time_filter(sales_df)
350
+ filtered_df = sales_df[time_filter] if time_filter is not None else sales_df
351
+ if filtered_df.empty: return json.dumps({"error": "No sales data in this period."})
352
+
353
+ daily_sales = filtered_df.groupby('day_of_week')['sale_total'].sum()
354
+ best_day = daily_sales.idxmax()
355
+ day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
356
+ daily_sales = daily_sales.reindex(day_order).fillna(0)
357
+
358
+ self.results = {
359
+ "report_subject": "Day of Week Analysis",
360
+ "best_day": best_day,
361
+ "daily_sales_breakdown": {day: f"${amount:.2f}" for day, amount in daily_sales.to_dict().items()}
362
+ }
363
+
364
+ elif subject == "sales":
365
+ target_df = self.dfs.get('sales', pd.DataFrame())
366
+ if target_df.empty: return json.dumps({"error": "No sales data."})
367
+ time_filter = self._get_time_filter(target_df)
368
+ target_df = target_df[time_filter] if time_filter is not None else target_df
369
+ if target_df.empty: return json.dumps({"error": "No sales data in this period."})
370
+
371
  total_revenue = target_df['sale_total'].sum()
372
  num_transactions = len(target_df)
373
  item_summary = target_df.groupby('item')['quantity'].sum()
374
  best_selling_item = item_summary.idxmax() if not item_summary.empty else "N/A"
375
  worst_selling_item = item_summary.idxmin() if not item_summary.empty else "N/A"
376
+ self.results = {"report_subject": "Sales", "total_revenue": f"${total_revenue:.2f}", "number_of_sales": num_transactions, "best_selling_item": best_selling_item, "worst_selling_item": worst_selling_item}
377
+
 
 
 
 
 
378
  else: # expenses
379
+ target_df = self.dfs.get('expenses', pd.DataFrame())
380
+ if target_df.empty: return json.dumps({"error": "No expense data."})
381
+ time_filter = self._get_time_filter(target_df)
382
+ target_df = target_df[time_filter] if time_filter is not None else target_df
383
+ if target_df.empty: return json.dumps({"error": "No expense data in this period."})
384
+
385
  total_expenses = target_df['amount'].sum()
386
  num_transactions = len(target_df)
387
  category_summary = target_df.groupby('description')['amount'].sum()
388
  highest_expense_category = category_summary.idxmax() if not category_summary.empty else "N/A"
389
+ self.results = {"report_subject": "Expenses", "total_expenses": f"${total_expenses:.2f}", "number_of_expenses": num_transactions, "highest_expense_category": highest_expense_category}
 
 
 
 
 
390
 
391
  return json.dumps(self.results, indent=2)
392
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
393
  def generateResponse(prompt: str) -> str:
394
  """Generate structured JSON response from user input using Generative AI."""
395
  if not model:
 
749
  df[col] = df[col].fillna('Unknown')
750
  return df
751
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
752
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
753
  col_ref = db.collection("users").document(user_phone).collection(collection_name)
754
  if 'transaction_id' in details and details['transaction_id']:
 
855
  logger.error(f"Failed to persist temporary transaction for user {mobile}: {e}", exc_info=True)
856
  return False
857
 
 
 
858
  def format_transaction_response(transactions: Union[List[Dict], Dict, None]) -> str:
859
  if not transactions: return "No transaction data to display."
860
  if isinstance(transactions, dict): transactions = [transactions]
 
866
  title = f"{trans_type}"
867
  if len(transactions) > 1: output_lines.append(f"--- {title} {idx + 1} ---")
868
  else: output_lines.append(f"--- {title} ---")
869
+ key_order = ['transaction_id', 'item', 'service_name', 'name', 'creditor', 'category', 'quantity', 'units_available', 'hours', 'price', 'rate', 'amount', 'cost', 'value', 'customer', 'vendor', 'client', 'date', 'acquisition_date', 'due_date', 'description', 'type']
 
 
 
 
870
  displayed_keys = set()
871
  if 'transaction_id' in trans:
872
  output_lines.append(f"• Transaction ID: {trans['transaction_id']}")