rairo committed on
Commit
6c80eb1
·
verified ·
1 Parent(s): bac8902

Update utility.py

Browse files
Files changed (1) hide show
  1. utility.py +55 -57
utility.py CHANGED
@@ -367,34 +367,34 @@ class ReportEngine:
367
  }
368
  return json.dumps(self.results, indent=2)
369
 
370
- def generate_forecast_data(self) -> str:
371
- sales_df = self.dfs.get('sales')
372
- if sales_df is None or sales_df.empty:
373
- return json.dumps({"error": "Not enough sales data to generate a forecast."})
374
-
375
- sales_df_copy = sales_df.copy()
376
- sales_df_copy.set_index('timestamp', inplace=True)
377
- weekly_sales = sales_df_copy['sale_total'].resample('W').sum()
378
-
379
- if len(weekly_sales) < 2:
380
- return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."})
381
 
382
- last_week_sales = weekly_sales.iloc[-1]
383
- previous_week_sales = weekly_sales.iloc[-2] if len(weekly_sales) > 1 else 0
384
-
385
- growth_rate = 0
386
- if previous_week_sales > 0:
387
- growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100
388
 
389
- historical_avg = weekly_sales.head(-1).mean()
 
 
 
 
 
 
 
 
 
 
390
 
391
- self.results = {
392
- "last_period_sales": f"${last_week_sales:.2f}",
393
- "previous_period_sales": f"${previous_week_sales:.2f}",
394
- "period_over_period_growth": f"{growth_rate:.2f}%",
395
- "historical_average": f"${historical_avg:.2f}"
396
- }
397
- return json.dumps(self.results, indent=2)
398
 
399
  def generateResponse(prompt: str) -> str:
400
  """Generate structured JSON response from user input using Generative AI."""
@@ -743,6 +743,18 @@ def create_liability(user_phone: str, transaction_data: List[Dict]) -> tuple[boo
743
  logger.error(f"Liability batch commit failed for user {user_phone}: {e}", exc_info=True)
744
  return False, f"Failed to record liabilities. An error occurred: {e}"
745
 
 
 
 
 
 
 
 
 
 
 
 
 
746
  def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
747
  """Fetches all user data, splits/validates DataFrames, and engineers features."""
748
  all_dfs_with_names = []
@@ -785,45 +797,24 @@ def _get_relative_date_context() -> str:
785
  """
786
  today = datetime.now(timezone.utc)
787
 
788
- # Helper to format dates
789
  def fmt(d):
790
  return d.strftime('%Y-%m-%d')
791
 
792
- # Basic dates
793
  yesterday = today - timedelta(days=1)
794
-
795
- # Week calculations
796
  start_of_this_week = today - timedelta(days=today.weekday())
797
  end_of_this_week = start_of_this_week + timedelta(days=6)
798
  start_of_last_week = start_of_this_week - timedelta(days=7)
799
  end_of_last_week = start_of_last_week + timedelta(days=6)
800
 
801
- # Last weekday calculations
802
- last_monday = start_of_this_week if today.weekday() != 0 else start_of_last_week
803
- if last_monday >= today:
804
- last_monday -= timedelta(days=7)
805
- last_tuesday = last_monday + timedelta(days=1)
806
- last_wednesday = last_monday + timedelta(days=2)
807
- last_thursday = last_monday + timedelta(days=3)
808
- last_friday = last_monday + timedelta(days=4)
809
- last_saturday = last_monday + timedelta(days=5)
810
- last_sunday = last_monday + timedelta(days=6)
811
-
812
  context = [
813
  f"Here are some pre-calculated dates to help you understand the user's request:",
814
  f"- Today is: {fmt(today)}",
815
  f"- Yesterday was: {fmt(yesterday)}",
816
  f"- The start of this week was: {fmt(start_of_this_week)}",
817
- f"- The end of this week is: {fmt(end_of_this_week)}",
818
  f"- The start of last week was: {fmt(start_of_last_week)}",
819
- f"- The end of last week was: {fmt(end_of_last_week)}",
820
  f"- Last Monday was on: {fmt(last_monday)}",
821
- f"- Last Tuesday was on: {fmt(last_tuesday)}",
822
- f"- Last Wednesday was on: {fmt(last_wednesday)}",
823
- f"- Last Thursday was on: {fmt(last_thursday)}",
824
- f"- Last Friday was on: {fmt(last_friday)}",
825
- f"- Last Saturday was on: {fmt(last_saturday)}",
826
- f"- Last Sunday was on: {fmt(last_sunday)}"
827
  ]
828
 
829
  return "\n".join(context)
@@ -859,8 +850,8 @@ def read_datalake(user_phone: str, query: str) -> str:
859
 
860
  # --- Tier 1.5: Specific KPI Report Router ---
861
  item_report_match = re.search(r"(?:report on|how did) ([\w\s]+)", query_lower)
 
862
 
863
- # Check for specific KPI queries first
864
  if "profit" in query_lower:
865
  logger.info(f"Handling '{query}' with the Profit Report Path.")
866
  report_json = engine.generate_profit_report()
@@ -871,8 +862,6 @@ def read_datalake(user_phone: str, query: str) -> str:
871
  item_name = item_report_match.group(1).strip()
872
  logger.info(f"Handling '{query}' with the Item Report Path for item: '{item_name}'.")
873
  report_json = engine.generate_item_report(item_name)
874
- else:
875
- report_json = None
876
 
877
  if report_json:
878
  report_data = json.loads(report_json)
@@ -895,21 +884,29 @@ def read_datalake(user_phone: str, query: str) -> str:
895
  return response.content
896
 
897
  # --- Tier 2: General Intelligent Router ---
898
- predictive_keywords = ["expect", "forecast", "predict", "next month", "next week"]
899
  historical_report_keywords = ["report", "summary", "performance", "how did i do", "overview", "month", "year", "week", "today"]
900
 
901
  if any(keyword in query_lower for keyword in predictive_keywords):
902
  logger.info(f"Handling '{query}' with the Forecasting Path.")
903
  forecast_json = engine.generate_forecast_data()
904
- # ... (forecast synthesis logic)
905
- return "Forecast data"
 
 
 
 
906
 
907
  elif any(keyword in query_lower for keyword in historical_report_keywords):
908
  logger.info(f"Handling '{query}' with the General Reporting Path (Sales/Expense).")
909
  report_json = engine.generate_report()
910
- # ... (general report synthesis logic)
911
- return "General report"
912
 
 
 
 
 
913
  else:
914
  # --- Path C: Fortified PandasAI for Q&A and Plotting ---
915
  logger.info(f"Handling '{query}' with the Fortified PandasAI Path.")
@@ -950,11 +947,12 @@ def read_datalake(user_phone: str, query: str) -> str:
950
 
951
  except (NoCodeFoundError, MaliciousQueryError) as e:
952
  logger.error(f"PandasAI failed for query '{query}': {e}")
953
- return f"Unfortunately, I was not able to answer your question, because of the following error:\n\n{e}"
954
  except Exception as e:
955
  logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
956
  return "Sorry, I encountered an error while analyzing your data."
957
 
 
958
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
959
  col_ref = db.collection("users").document(user_phone).collection(collection_name)
960
  if 'transaction_id' in details and details['transaction_id']:
 
367
  }
368
  return json.dumps(self.results, indent=2)
369
 
370
def generate_forecast_data(self) -> str:
    """Build a simple week-over-week sales forecast summary.

    Resamples the 'sales' DataFrame into weekly totals and reports the
    last two weekly totals, their percentage growth, and the historical
    weekly average (every week except the most recent one).

    Returns:
        A JSON string with the formatted forecast figures, or a JSON
        object with an "error" key when there is not enough data.
    """
    sales_df = self.dfs.get('sales')
    if sales_df is None or sales_df.empty:
        return json.dumps({"error": "Not enough sales data to generate a forecast."})

    # Guard against malformed frames so a missing column yields a clean
    # error payload instead of raising KeyError to the caller.
    if 'timestamp' not in sales_df.columns or 'sale_total' not in sales_df.columns:
        return json.dumps({"error": "Not enough sales data to generate a forecast."})

    # Work on a copy so the shared DataFrame keeps its original index.
    sales_df_copy = sales_df.copy()
    sales_df_copy.set_index('timestamp', inplace=True)
    weekly_sales = sales_df_copy['sale_total'].resample('W').sum()

    if len(weekly_sales) < 2:
        return json.dumps({"error": "I need at least two weeks of sales data to make a forecast."})

    # The guard above guarantees at least two weekly buckets, so both
    # positions exist (the original `... if len > 1 else 0` was dead code).
    last_week_sales = weekly_sales.iloc[-1]
    previous_week_sales = weekly_sales.iloc[-2]

    # Percentage growth is only meaningful with a non-zero baseline.
    growth_rate = 0.0
    if previous_week_sales > 0:
        growth_rate = ((last_week_sales - previous_week_sales) / previous_week_sales) * 100

    # Average of every week except the most recent one.
    historical_avg = weekly_sales.head(-1).mean()

    self.results = {
        "last_period_sales": f"${last_week_sales:.2f}",
        "previous_period_sales": f"${previous_week_sales:.2f}",
        "period_over_period_growth": f"{growth_rate:.2f}%",
        "historical_average": f"${historical_avg:.2f}"
    }
    return json.dumps(self.results, indent=2)
398
 
399
  def generateResponse(prompt: str) -> str:
400
  """Generate structured JSON response from user input using Generative AI."""
 
743
  logger.error(f"Liability batch commit failed for user {user_phone}: {e}", exc_info=True)
744
  return False, f"Failed to record liabilities. An error occurred: {e}"
745
 
746
+ def _validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
747
+ """Proactively cleans and validates a DataFrame to ensure data integrity."""
748
+ if df.empty: return df
749
+ for col in ['timestamp', 'created_at', 'last_updated', 'acquisition_date', 'due_date']:
750
+ if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce', utc=True)
751
+ numeric_cols = ['price', 'unit_price', 'quantity', 'amount', 'value', 'cost', 'hours', 'units_available']
752
+ for col in numeric_cols:
753
+ if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
754
+ for col in df.select_dtypes(include=['object']).columns:
755
+ df[col] = df[col].fillna('Unknown')
756
+ return df
757
+
758
  def _fetch_all_collections_as_dfs(user_phone: str) -> List[Tuple[str, pd.DataFrame]]:
759
  """Fetches all user data, splits/validates DataFrames, and engineers features."""
760
  all_dfs_with_names = []
 
797
  """
798
  today = datetime.now(timezone.utc)
799
 
 
800
  def fmt(d):
801
  return d.strftime('%Y-%m-%d')
802
 
 
803
  yesterday = today - timedelta(days=1)
 
 
804
  start_of_this_week = today - timedelta(days=today.weekday())
805
  end_of_this_week = start_of_this_week + timedelta(days=6)
806
  start_of_last_week = start_of_this_week - timedelta(days=7)
807
  end_of_last_week = start_of_last_week + timedelta(days=6)
808
 
809
+ last_monday = start_of_this_week - timedelta(days=7)
810
+
 
 
 
 
 
 
 
 
 
811
  context = [
812
  f"Here are some pre-calculated dates to help you understand the user's request:",
813
  f"- Today is: {fmt(today)}",
814
  f"- Yesterday was: {fmt(yesterday)}",
815
  f"- The start of this week was: {fmt(start_of_this_week)}",
 
816
  f"- The start of last week was: {fmt(start_of_last_week)}",
 
817
  f"- Last Monday was on: {fmt(last_monday)}",
 
 
 
 
 
 
818
  ]
819
 
820
  return "\n".join(context)
 
850
 
851
  # --- Tier 1.5: Specific KPI Report Router ---
852
  item_report_match = re.search(r"(?:report on|how did) ([\w\s]+)", query_lower)
853
+ report_json = None
854
 
 
855
  if "profit" in query_lower:
856
  logger.info(f"Handling '{query}' with the Profit Report Path.")
857
  report_json = engine.generate_profit_report()
 
862
  item_name = item_report_match.group(1).strip()
863
  logger.info(f"Handling '{query}' with the Item Report Path for item: '{item_name}'.")
864
  report_json = engine.generate_item_report(item_name)
 
 
865
 
866
  if report_json:
867
  report_data = json.loads(report_json)
 
884
  return response.content
885
 
886
  # --- Tier 2: General Intelligent Router ---
887
+ predictive_keywords = ["expect", "forecast", "predict"]
888
  historical_report_keywords = ["report", "summary", "performance", "how did i do", "overview", "month", "year", "week", "today"]
889
 
890
  if any(keyword in query_lower for keyword in predictive_keywords):
891
  logger.info(f"Handling '{query}' with the Forecasting Path.")
892
  forecast_json = engine.generate_forecast_data()
893
+ forecast_data = json.loads(forecast_json)
894
+ if "error" in forecast_data: return forecast_data["error"]
895
+
896
+ synthesis_prompt = f"You are a business analyst... Based on the following data, provide a friendly sales forecast: {forecast_json}"
897
+ response = llm.invoke(synthesis_prompt)
898
+ return response.content
899
 
900
  elif any(keyword in query_lower for keyword in historical_report_keywords):
901
  logger.info(f"Handling '{query}' with the General Reporting Path (Sales/Expense).")
902
  report_json = engine.generate_report()
903
+ report_data = json.loads(report_json)
904
+ if "error" in report_data: return report_data["error"]
905
 
906
+ synthesis_prompt = f"You are a helpful business assistant... Based on the summary, create a report. For sales, suggest insights. Data: {report_json}"
907
+ response = llm.invoke(synthesis_prompt)
908
+ return response.content
909
+
910
  else:
911
  # --- Path C: Fortified PandasAI for Q&A and Plotting ---
912
  logger.info(f"Handling '{query}' with the Fortified PandasAI Path.")
 
947
 
948
  except (NoCodeFoundError, MaliciousQueryError) as e:
949
  logger.error(f"PandasAI failed for query '{query}': {e}")
950
+ return f"Unfortunately, I was not able to answer your question: {e}"
951
  except Exception as e:
952
  logger.error(f"Data query failed for user {user_phone}, query '{query}': {e}", exc_info=True)
953
  return "Sorry, I encountered an error while analyzing your data."
954
 
955
+
956
  def _find_document_by_details(user_phone: str, collection_name: str, details: Dict) -> Optional[Any]:
957
  col_ref = db.collection("users").document(user_phone).collection(collection_name)
958
  if 'transaction_id' in details and details['transaction_id']: