| import numpy as np |
| import pandas as pd |
| import os |
| from datetime import datetime, timedelta |
| from expense_tracker.utils import MongoDBClient |
| from bson import ObjectId |
| import logging |
|
|
logger = logging.getLogger(__name__)


class SpendingForecaster:
    """Produce a 30-day daily spending forecast for a single user.

    History is read from the user's ``financial_data.expenses`` list in
    MongoDB, aggregated per calendar day, and fed to a statistical model
    (recency-weighted mean, clipped linear trend, day-of-week factors).
    Chronos Bolt is only tried when the statistical model yields nothing.
    """

    # Number of future days every forecast covers.
    _HORIZON_DAYS = 30
    # Minimum number of daily rows required before any forecast is attempted.
    _MIN_HISTORY_DAYS = 5
    # Number of trailing daily values handed to Chronos Bolt as context.
    _CHRONOS_CONTEXT_DAYS = 120

    def __init__(self, user_id):
        """Remember the user whose expenses will be forecast.

        Args:
            user_id: An ``ObjectId`` or a value ``ObjectId`` can be built from.
        """
        self.user_id = user_id

    @staticmethod
    def _first_present(record, *keys):
        """Return the first value in *record* under *keys* that is not None or ''."""
        for key in keys:
            value = record.get(key)
            # Explicit check instead of truthiness so that 0 counts as present.
            if value is not None and value != '':
                return value
        return None

    @staticmethod
    def _build_forecast_rows(last_date, amounts, widths):
        """Turn per-day predictions and interval half-widths into result dicts.

        Day ``k`` (1-based) of the output is dated ``last_date + k`` days.
        Amounts and lower bounds are floored at zero.
        """
        rows = []
        for step, (amount, width) in enumerate(zip(amounts, widths), start=1):
            day = last_date + timedelta(days=step)
            amount = max(0.0, float(amount))
            width = float(width)
            rows.append({
                'date': day.strftime('%Y-%m-%d'),
                'amount': round(amount, 2),
                'low': round(max(0.0, amount - width), 2),
                'high': round(amount + width, 2),
            })
        return rows

    def fetch_daily_spending(self):
        """Load the user's expenses and aggregate them into a daily series.

        Returns:
            A DataFrame with columns ``ds`` (one row per calendar day from the
            first to the last expense date) and ``y`` (total spent that day,
            0 for days without expenses). An empty DataFrame is returned when
            the user id is invalid, the user or their expenses are missing, or
            no expense has both a usable date and a finite amount.
        """
        # Local import: only this method needs the exception type.
        from bson.errors import InvalidId

        db = MongoDBClient.get_client()

        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                logger.debug("Invalid user id %r; no spending data.", self.user_id)
                return pd.DataFrame()

        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user or 'financial_data' not in user:
            logger.debug("User or financial_data not found in DB.")
            return pd.DataFrame()

        expenses = user['financial_data'].get('expenses', [])
        if not expenses:
            logger.debug("No expenses found in user record.")
            return pd.DataFrame()

        rows = []
        for expense in expenses:
            if not isinstance(expense, dict):
                continue
            # Records may use either lower-case or capitalised keys.
            date_val = expense.get('date') or expense.get('Date')
            amount_val = self._first_present(expense, 'amount', 'Amount')
            if not date_val or amount_val is None:
                continue
            try:
                amount = float(amount_val)
            except (ValueError, TypeError):
                continue
            # NaN/inf would poison every sum and mean computed downstream.
            if not np.isfinite(amount):
                continue
            rows.append({'date': date_val, 'amount': amount})

        df = pd.DataFrame(rows)
        if df.empty:
            return pd.DataFrame()

        # Coerce instead of raising so one malformed date cannot break the
        # whole forecast; the affected rows are dropped.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        unparseable = int(df['date'].isna().sum())
        if unparseable:
            logger.debug("Dropped %d expenses with unparseable dates.", unparseable)
            df = df.dropna(subset=['date'])
        if df.empty:
            return pd.DataFrame()

        daily = df.groupby(df['date'].dt.date)['amount'].sum()
        daily.index = pd.to_datetime(daily.index)

        # Fill calendar gaps with zero spending to get a contiguous series.
        full_range = pd.date_range(daily.index.min(), daily.index.max())
        daily = daily.reindex(full_range, fill_value=0.0)

        return pd.DataFrame({'ds': daily.index, 'y': daily.to_numpy()})

    def forecast_next_30_days(self):
        """Forecast the next 30 days of spending.

        Returns:
            A list of 30 dicts with keys ``date`` (``YYYY-MM-DD``), ``amount``,
            ``low`` and ``high``, or ``None`` when fewer than five daily rows
            are available or both forecasting methods fail.
        """
        df = self.fetch_daily_spending()
        if not df.empty:
            logger.debug("Date Range: %s to %s", df['ds'].min(), df['ds'].max())

        if df.empty or len(df) < self._MIN_HISTORY_DAYS:
            logger.debug("Insufficient data. Returning None.")
            return None

        logger.debug("Generating Primary Statistical Forecast (Seasonal Averaging)...")
        statistical_result = self._generate_statistical_fallback(df)
        if statistical_result:
            logger.info("Statistical Forecast generated successfully.")
            return statistical_result

        # The Chronos method handles its own errors and returns None on failure.
        logger.debug("Falling back to Chronos Bolt Forecasting...")
        return self._forecast_with_chronos_bolt(df)

    def _forecast_with_chronos_bolt(self, df):
        """Forecast with the ``amazon/chronos-bolt-tiny`` model.

        Args:
            df: Daily series with columns ``ds`` and ``y``.

        Returns:
            A list of forecast dicts (same shape as
            ``forecast_next_30_days``), or ``None`` if anything fails,
            including ``torch`` or ``chronos`` not being installed.
        """
        try:
            # Imported lazily so the module loads without these heavy packages.
            import torch
            from chronos import ChronosBoltPipeline

            context = torch.tensor(
                df['y'].tail(self._CHRONOS_CONTEXT_DAYS).values,
                dtype=torch.float32,
            )

            logger.debug("Loading Chronos Bolt Tiny model...")
            pipeline = ChronosBoltPipeline.from_pretrained(
                "amazon/chronos-bolt-tiny",
                device_map="auto",
                dtype=torch.float32,
            )

            logger.debug("Generating %d-day forecast...", self._HORIZON_DAYS)
            forecast = pipeline.predict(
                context, prediction_length=self._HORIZON_DAYS
            )

            # Median over dim 1 of the first series; presumably dim 1 is the
            # quantile axis — TODO confirm against the Chronos API.
            forecast_data = (
                torch.median(forecast, dim=1).values[0].detach().cpu().numpy()
            )

            std_dev = df['y'].std() if len(df) > 1 else 10

            # Interval half-width grows 5% per day, starting unscaled on day 1.
            widths = [
                2.0 * std_dev * (1.0 + (i * 0.05))
                for i in range(len(forecast_data))
            ]
            return self._build_forecast_rows(df['ds'].max(), forecast_data, widths)
        except Exception:
            logger.exception("ERROR in Chronos Bolt Implementation")
            return None

    def _generate_statistical_fallback(self, df):
        """Forecast with a recency-weighted mean, linear trend and weekday factors.

        The base level is the mean of ``y`` with the most recent 30 days
        weighted double (when more than 30 days exist). A least-squares slope
        is added when more than 14 days exist, clipped to 1% of the base level
        per day. Each day is then scaled by its weekday's mean relative to the
        overall mean, clipped to ``[0.2, 5.0]``.

        Args:
            df: Daily series with columns ``ds`` (datetime) and ``y``.
                The frame is not modified.

        Returns:
            A list of 30 forecast dicts, or ``None`` if *df* is empty or the
            computation fails.
        """
        try:
            n = len(df)
            if n == 0:
                return None

            y = df['y'].to_numpy(dtype=float)
            last_date = df['ds'].max()

            # Recency weighting: the last 30 days count twice.
            weights = np.ones(n)
            if n > 30:
                weights[-30:] = 2.0
            weighted_mean = float(np.average(y, weights=weights))
            overall_avg = float(y.mean())

            logger.debug(
                "WeightedMean=%.2f, OverallAvg=%.2f", weighted_mean, overall_avg
            )

            # Group by a derived key rather than adding a column, so the
            # caller's DataFrame is left untouched.
            weekday_means = df['y'].groupby(df['ds'].dt.weekday).mean().to_dict()
            seasonal_map = {
                weekday: float(np.clip(
                    (mean / overall_avg) if overall_avg > 0 else 1.0, 0.2, 5.0
                ))
                for weekday, mean in weekday_means.items()
            }

            if n > 14:
                slope = np.polyfit(np.arange(n), y, 1)[0]
                # abs() keeps the clip bounds ordered for a negative mean.
                max_slope = abs(weighted_mean) * 0.01
                slope = float(np.clip(slope, -max_slope, max_slope))
            else:
                slope = 0.0

            logger.debug("Slope=%.4f", slope)

            std_dev = df['y'].std() if n > 1 else abs(weighted_mean) * 0.2

            amounts = []
            widths = []
            for step in range(1, self._HORIZON_DAYS + 1):
                weekday = (last_date + timedelta(days=step)).weekday()
                base_pred = weighted_mean + (slope * step)
                amounts.append(max(0.0, base_pred * seasonal_map.get(weekday, 1.0)))
                # 95% band that widens by 5% for each day ahead.
                widths.append(1.96 * std_dev * (1 + (step * 0.05)))

            logger.debug("30-Day Forecast Total: %.2f", sum(amounts))
            return self._build_forecast_rows(last_date, amounts, widths)
        except Exception:
            logger.exception("Critical Statistical Fallback Error")
            return None
|
|
def get_forecast(user_id):
    """Return the 30-day spending forecast for *user_id*.

    Convenience wrapper that builds a ``SpendingForecaster`` for the user and
    returns whatever its ``forecast_next_30_days`` method returns.
    """
    return SpendingForecaster(user_id).forecast_next_30_days()
|
|