"""Daily spending forecasts built from the expenses stored on a user record."""
import logging
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from bson import ObjectId
from bson.errors import InvalidId

from expense_tracker.utils import MongoDBClient

logger = logging.getLogger(__name__)

# Number of future days every forecast covers.
_FORECAST_HORIZON_DAYS = 30
# Minimum number of daily rows required before a forecast is attempted.
_MIN_HISTORY_DAYS = 5


class SpendingForecaster:
    """Forecast a single user's daily spending.

    History is read from ``financial_data.expenses`` on the user's document.
    Forecasts are lists of dicts with the keys ``date`` (``YYYY-MM-DD``),
    ``amount``, ``low`` and ``high``.
    """

    def __init__(self, user_id):
        # Local models disabled in favor of API-only mode: nothing to load here.
        self.user_id = user_id

    def fetch_daily_spending(self):
        """Return the user's spending history as one row per calendar day.

        Returns:
            A DataFrame with columns ``ds`` (datetime) and ``y`` (total spend
            for that day), covering every day between the first and last
            expense; days without expenses have ``y == 0``. An empty
            DataFrame is returned when the user id is invalid, the user or
            their expenses are missing, or no expense could be parsed.
        """
        db = MongoDBClient.get_client()

        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                # Not a valid ObjectId representation: there is no user to look up.
                logger.debug("Invalid user id %r; returning empty history.", self.user_id)
                return pd.DataFrame()

        # Use ONLY the nested user data (financial_data.expenses).
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user or 'financial_data' not in user:
            logger.debug("User or financial_data not found in DB.")
            return pd.DataFrame()

        # `financial_data` may be stored as null; treat that like "no expenses".
        expenses = (user['financial_data'] or {}).get('expenses', [])
        if not expenses:
            logger.debug("No expenses found in user record.")
            return pd.DataFrame()

        return self._build_daily_frame(self._parse_expenses(expenses))

    @staticmethod
    def _parse_expenses(expenses):
        """Extract ``{'date', 'amount'}`` rows from raw expense records."""
        rows = []
        for entry in expenses:
            if not isinstance(entry, dict):
                continue

            # Handle both CamelCase and lowercase keys.
            date_val = entry.get('date') or entry.get('Date')
            # Compare against None explicitly so a legitimate amount of 0 is
            # not mistaken for a missing value.
            amount_val = entry.get('amount')
            if amount_val is None:
                amount_val = entry.get('Amount')
            if not date_val or amount_val is None:
                continue

            try:
                amount = float(amount_val)
            except (ValueError, TypeError):
                continue
            # NaN / inf would poison every aggregate computed downstream.
            if not np.isfinite(amount):
                continue

            rows.append({'date': date_val, 'amount': amount})
        return rows

    @staticmethod
    def _build_daily_frame(rows):
        """Aggregate parsed expense rows into a gap-free daily ``ds``/``y`` frame."""
        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame()

        # Unparsable dates become NaT and are dropped instead of aborting the
        # whole forecast.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df = df.dropna(subset=['date'])
        if df.empty:
            return pd.DataFrame()

        # Aggregate by date to get daily totals.
        daily = df.groupby(df['date'].dt.date)['amount'].sum().reset_index()
        daily.columns = ['ds', 'y']
        daily['ds'] = pd.to_datetime(daily['ds'])
        daily = daily.sort_values('ds')

        # Fill missing dates with 0 to ensure continuity for seasonality detection.
        full_range = pd.date_range(daily['ds'].min(), daily['ds'].max())
        daily = daily.set_index('ds').reindex(full_range, fill_value=0).reset_index()
        daily.columns = ['ds', 'y']
        return daily

    def forecast_next_30_days(self):
        """Return a 30-day forecast, or ``None`` when none can be produced.

        The statistical method is tried first; Chronos Bolt is only attempted
        when the statistical method yields nothing.
        """
        df = self.fetch_daily_spending()

        if not df.empty:
            logger.debug("Date Range: %s to %s", df['ds'].min(), df['ds'].max())

        if df.empty or len(df) < _MIN_HISTORY_DAYS:
            logger.debug("Insufficient data. Returning None.")
            return None

        # 1. PRIMARY: statistical method (no model download, no external API).
        logger.debug("Generating Primary Statistical Forecast (Seasonal Averaging)...")
        statistical_result = self._generate_statistical_fallback(df)
        if statistical_result:
            logger.debug("Statistical Forecast generated successfully.")
            return statistical_result

        # 2. FALLBACK: attempt Chronos Bolt if the statistical method fails.
        try:
            logger.debug("Falling back to Chronos Bolt Forecasting...")
            return self._forecast_with_chronos_bolt(df)
        except Exception:
            logger.exception("Chronos Bolt Fallback Error")
            return None

    @staticmethod
    def _forecast_point(day, amount, half_width):
        """Build one forecast entry; ``low`` is floored at 0."""
        return {
            'date': day.strftime('%Y-%m-%d'),
            'amount': round(float(amount), 2),
            'low': round(float(max(0, amount - half_width)), 2),
            'high': round(float(amount + half_width), 2),
        }

    def _forecast_with_chronos_bolt(self, df):
        """Forecast with the ``amazon/chronos-bolt-tiny`` model.

        ``torch`` and ``chronos`` are imported lazily so the module works
        without them installed. Any failure is logged and reported as ``None``.

        Args:
            df: Daily history with columns ``ds`` and ``y``.

        Returns:
            A list of forecast dicts, or ``None`` on any error.
        """
        try:
            import torch
            from chronos import ChronosBoltPipeline

            # Prepare context (last 120 days of spending history).
            context = torch.tensor(df['y'].tail(120).values, dtype=torch.float32)

            # NOTE(review): the model is loaded on every call; consider caching
            # the pipeline if this path turns out to be used often.
            logger.debug("Loading Chronos Bolt Tiny model...")
            pipeline = ChronosBoltPipeline.from_pretrained(
                "amazon/chronos-bolt-tiny",
                device_map="auto",
                dtype=torch.float32,
            )

            logger.debug("Generating 30-day forecast...")
            forecast = pipeline.predict(context, prediction_length=_FORECAST_HORIZON_DAYS)

            # Median over dim 1 of the prediction, first batch element.
            forecast_data = torch.median(forecast, dim=1).values[0].detach().cpu().numpy()

            # Estimate bounds based on historical volatility.
            std_dev = df['y'].std() if len(df) > 1 else 10
            last_date = df['ds'].max()

            result = []
            for i, amount in enumerate(forecast_data):
                future_date = last_date + timedelta(days=i + 1)
                predicted_amount = max(0, float(amount))
                # Uncertainty scale increases with time.
                half_width = 2.0 * std_dev * (1.0 + (i * 0.05))
                result.append(self._forecast_point(future_date, predicted_amount, half_width))
            return result
        except Exception:
            logger.exception("ERROR in Chronos Bolt Implementation")
            return None

    def _generate_statistical_fallback(self, df, horizon=_FORECAST_HORIZON_DAYS):
        """Forecast from a weighted mean, a capped trend and weekday seasonality.

        The prediction for day ``i`` is
        ``(weighted_mean + slope * i) * weekday_factor``, floored at 0, where:

        1. ``weighted_mean`` counts the last 30 days twice (only when more
           than 30 days of history exist);
        2. ``weekday_factor`` is the weekday's mean spend relative to the
           overall mean, clipped to ``[0.2, 5.0]``;
        3. ``slope`` is a linear-regression slope (only with more than 14
           days of history), capped at 1% of the weighted mean per day.

        Args:
            df: Daily history with columns ``ds`` and ``y``. Not modified.
            horizon: Number of future days to predict.

        Returns:
            A list of ``horizon`` forecast dicts, or ``None`` on any error.
        """
        try:
            n = len(df)
            spend = df['y'].astype(float)
            last_date = df['ds'].max()

            # 1. Recency-weighted mean (last 30 days carry more weight).
            weights = np.ones(n)
            if n > 30:
                weights[-30:] = 2.0
            weighted_mean = float(np.average(spend, weights=weights))

            # 2. Day-of-week seasonality, grouped on a derived key so the
            #    caller's DataFrame is not mutated.
            weekday_means = spend.groupby(df['ds'].dt.weekday).mean()
            overall_avg = float(spend.mean()) if n > 0 else 1.0
            logger.debug("WeightedMean=%.2f, OverallAvg=%.2f", weighted_mean, overall_avg)

            seasonal_map = {}
            for weekday, mean_spend in weekday_means.items():
                # Normalize so the factors do not inflate total spend.
                factor = mean_spend / overall_avg if overall_avg > 0 else 1.0
                # Cap the factor to prevent extreme outliers (max 5x regular spend).
                seasonal_map[int(weekday)] = float(np.clip(factor, 0.2, 5.0))

            # 3. Linear trend (slope).
            if n > 14:
                slope = np.polyfit(np.arange(n), spend.to_numpy(), 1)[0]
                # Cap slope to prevent runaway projections (max 1% change per
                # day). abs() keeps the bounds ordered for a negative mean.
                limit = abs(weighted_mean) * 0.01
                slope = float(np.clip(slope, -limit, limit))
            else:
                slope = 0.0
            logger.debug("Slope=%.4f", slope)

            std_dev = spend.std() if n > 1 else abs(weighted_mean) * 0.2

            result = []
            total_predicted = 0.0
            for i in range(1, horizon + 1):
                future_date = last_date + timedelta(days=i)
                base_pred = weighted_mean + (slope * i)
                seasonal_factor = seasonal_map.get(future_date.weekday(), 1.0)
                predicted_amount = max(0, base_pred * seasonal_factor)
                total_predicted += predicted_amount

                # Confidence intervals widen over time.
                half_width = 1.96 * std_dev * (1 + (i * 0.05))
                result.append(self._forecast_point(future_date, predicted_amount, half_width))

            logger.debug("30-Day Forecast Total: %.2f", total_predicted)
            return result
        except Exception:
            logger.exception("Critical Statistical Fallback Error")
            return None


def get_forecast(user_id):
    """Return the 30-day spending forecast for ``user_id`` (``None`` if unavailable)."""
    forecaster = SpendingForecaster(user_id)
    return forecaster.forecast_next_30_days()