File size: 8,965 Bytes
24e6f5b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
from expense_tracker.utils import MongoDBClient
from bson import ObjectId
import logging
logger = logging.getLogger(__name__)
class SpendingForecaster:
    """Forecast a user's daily spending for the next 30 days.

    Expenses are read from the user's ``financial_data.expenses`` array in
    MongoDB and aggregated into a continuous series of daily totals. The
    primary model is a statistical projection (recency-weighted mean, capped
    linear trend, day-of-week seasonality). The Chronos Bolt (tiny) model is
    only tried if that projection fails.
    """

    # Chronos pipeline shared by all instances, so the model weights are
    # loaded at most once per process instead of on every fallback call.
    _chronos_pipeline = None

    def __init__(self, user_id):
        """Create a forecaster for one user.

        Args:
            user_id: The user's ``_id``, either an ``ObjectId`` or a value that
                ``ObjectId()`` accepts (e.g. a 24-character hex string). It is
                validated lazily in ``fetch_daily_spending``.
        """
        self.user_id = user_id

    def fetch_daily_spending(self):
        """Load the user's expenses and return them as gap-free daily totals.

        Returns:
            pd.DataFrame: columns ``ds`` (one row per calendar day from the
            first to the last expense date) and ``y`` (amount spent that day,
            0 for days without expenses). An empty DataFrame is returned when
            the user id is invalid, the user or their expenses are missing,
            or no expense has a usable date and amount.
        """
        from collections.abc import Mapping

        from bson.errors import InvalidId

        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                logger.debug("Invalid user id %r; no spending data.", self.user_id)
                return pd.DataFrame()

        db = MongoDBClient.get_client()
        # Only the expenses nested inside the user document are used as data.
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        financial_data = (user or {}).get('financial_data')
        if not isinstance(financial_data, Mapping):
            logger.debug("User or financial_data not found in DB.")
            return pd.DataFrame()

        expenses = financial_data.get('expenses') or []
        if not expenses:
            logger.debug("No expenses found in user record.")
            return pd.DataFrame()

        return self._expenses_to_daily(expenses)

    @staticmethod
    def _expenses_to_daily(expenses):
        """Aggregate raw expense records into a continuous daily ``ds``/``y`` frame.

        Records that are not mappings, have no date, have no amount, or whose
        amount is not numeric are skipped. Unparseable dates are dropped.
        Returns an empty DataFrame when nothing usable remains.
        """
        from collections.abc import Mapping

        rows = []
        for e in expenses:
            if not isinstance(e, Mapping):
                continue  # malformed entry; skip instead of crashing
            # Accept both lowercase and capitalised keys.
            date_val = e.get('date') or e.get('Date')
            # Explicit None check: an amount of 0 is valid and must not fall
            # through to the 'Amount' key, which `or` would do.
            amount_val = e.get('amount')
            if amount_val is None:
                amount_val = e.get('Amount')
            if not date_val or amount_val is None:
                continue
            try:
                amount = float(amount_val)
            except (ValueError, TypeError):
                continue
            rows.append({'date': date_val, 'amount': amount})

        if not rows:
            return pd.DataFrame()

        df = pd.DataFrame(rows)
        # A single bad date string becomes NaT and is dropped, rather than
        # aborting the whole fetch.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df = df.dropna(subset=['date'])
        if df.empty:
            return pd.DataFrame()

        # Daily totals.
        daily = df.groupby(df['date'].dt.date)['amount'].sum().reset_index()
        daily.columns = ['ds', 'y']
        daily['ds'] = pd.to_datetime(daily['ds'])
        daily = daily.sort_values('ds')

        # Fill missing calendar days with 0 so there is exactly one row per
        # day; the day-of-week seasonality depends on this.
        full_range = pd.date_range(daily['ds'].min(), daily['ds'].max())
        daily = daily.set_index('ds').reindex(full_range, fill_value=0).reset_index()
        daily.columns = ['ds', 'y']
        return daily

    def forecast_next_30_days(self):
        """Return a 30-day forecast for the user, or None.

        The statistical model is tried first. Chronos Bolt is only used if it
        fails.

        Returns:
            list[dict] | None: 30 records with ``date`` ('%Y-%m-%d'),
            ``amount``, ``low`` and ``high``. None is returned when there are
            fewer than 5 days of history or both models fail.
        """
        df = self.fetch_daily_spending()
        if df.empty or len(df) < 5:
            logger.debug("Insufficient data (%d daily rows). Returning None.", len(df))
            return None
        logger.debug("Date Range: %s to %s", df['ds'].min(), df['ds'].max())

        # 1. Primary: statistical projection (no model download, fast).
        logger.debug("Generating Primary Statistical Forecast (Seasonal Averaging)...")
        statistical_result = self._generate_statistical_fallback(df)
        if statistical_result:
            logger.info("Statistical Forecast generated successfully.")
            return statistical_result

        # 2. Fallback: Chronos Bolt. It handles and logs its own errors and
        # returns None on failure.
        logger.debug("Falling back to Chronos Bolt Forecasting...")
        return self._forecast_with_chronos_bolt(df)

    @classmethod
    def _get_chronos_pipeline(cls):
        """Return the Chronos Bolt tiny pipeline, loading it on first use."""
        if cls._chronos_pipeline is None:
            import torch
            from chronos import ChronosBoltPipeline

            logger.debug("Loading Chronos Bolt Tiny model...")
            cls._chronos_pipeline = ChronosBoltPipeline.from_pretrained(
                "amazon/chronos-bolt-tiny",
                device_map="auto",
                dtype=torch.float32,
            )
        return cls._chronos_pipeline

    def _forecast_with_chronos_bolt(self, df):
        """Forecast the next 30 days with Amazon Chronos Bolt (tiny).

        Uses the last 120 daily totals as context. The point forecast is the
        median of the model output along dim 1. The band is +/-2 historical
        standard deviations, widened by 5% per day ahead.

        Args:
            df: DataFrame with ``ds`` (datetime) and ``y`` (daily total) columns.

        Returns:
            list[dict] | None: records shaped like the statistical forecast,
            or None if torch/chronos are unavailable or prediction fails.
        """
        try:
            import torch

            # Context: the most recent 120 days of spending history.
            context = torch.tensor(df['y'].tail(120).values, dtype=torch.float32)
            pipeline = self._get_chronos_pipeline()

            logger.debug("Generating 30-day forecast...")
            forecast = pipeline.predict(context, prediction_length=30)
            # NOTE(review): assumes the output is (batch, quantiles, horizon),
            # so dim 1 is the quantile axis -- confirm against chronos docs.
            forecast_data = torch.median(forecast, dim=1).values[0].detach().cpu().numpy()

            std_dev = df['y'].std() if len(df) > 1 else 10
            last_date = df['ds'].max()
            return [
                self._forecast_point(
                    last_date + timedelta(days=step + 1),
                    max(0.0, float(amount)),
                    2.0 * std_dev * (1.0 + step * 0.05),
                )
                for step, amount in enumerate(forecast_data)
            ]
        except Exception:
            logger.exception("ERROR in Chronos Bolt Implementation")
            return None

    def _generate_statistical_fallback(self, df):
        """Project the next 30 days with a weighted-mean / trend / weekday model.

        Components:
          1. Recency-weighted mean: with more than 30 days of history, the
             last 30 days count double.
          2. Linear trend: a least-squares slope over the whole history. It is
             only used with more than 14 days and is capped at +/-1% of the
             weighted mean per day.
          3. Day-of-week factor: mean spend for that weekday divided by the
             overall mean, clipped to [0.2, 5.0].

        ``df`` is not modified.

        Args:
            df: DataFrame with ``ds`` (datetime) and ``y`` (daily total)
                columns, one row per consecutive day.

        Returns:
            list[dict] | None: 30 records with ``date`` ('%Y-%m-%d'),
            ``amount``, ``low`` and ``high`` (rounded to 2 decimals, ``low``
            floored at 0). None is returned if the computation fails.
        """
        try:
            y = df['y']
            n = len(df)
            last_date = df['ds'].max()

            # 1. Recency-weighted mean.
            weights = np.ones(n)
            if n > 30:
                weights[-30:] = 2.0
            weighted_mean = np.average(y, weights=weights)

            # 2. Day-of-week seasonality, normalised so it doesn't inflate the
            # total. Grouping by a derived Series avoids adding a column to
            # the caller's DataFrame.
            overall_avg = y.mean() if n > 0 else 1
            logger.debug("WeightedMean=%.2f, OverallAvg=%.2f", weighted_mean, overall_avg)
            weekday_means = y.groupby(df['ds'].dt.weekday).mean()
            seasonal_map = {
                weekday: np.clip(mean / overall_avg if overall_avg > 0 else 1.0, 0.2, 5.0)
                for weekday, mean in weekday_means.items()
            }

            # 3. Linear trend, capped to prevent runaway projections. abs()
            # keeps the clip bounds ordered even if the mean is negative.
            if n > 14:
                slope = np.polyfit(np.arange(n), y.values, 1)[0]
                cap = abs(weighted_mean) * 0.01
                slope = np.clip(slope, -cap, cap)
            else:
                slope = 0
            logger.debug("Slope=%.4f", slope)

            std_dev = y.std() if n > 1 else (weighted_mean * 0.2)

            result = []
            total_predicted = 0.0
            for i in range(1, 31):
                future_date = last_date + timedelta(days=i)
                base_pred = weighted_mean + slope * i
                seasonal_factor = seasonal_map.get(future_date.weekday(), 1.0)
                predicted_amount = max(0, base_pred * seasonal_factor)
                total_predicted += predicted_amount
                # Band half-width: 1.96 std devs, widened by 5% per day ahead.
                width = 1.96 * std_dev * (1 + i * 0.05)
                result.append(self._forecast_point(future_date, predicted_amount, width))

            logger.debug("30-Day Forecast Total: %.2f", total_predicted)
            return result
        except Exception:
            logger.exception("Critical Statistical Fallback Error")
            return None

    @staticmethod
    def _forecast_point(day, amount, half_width):
        """Build one forecast record with a symmetric band whose low end is floored at 0."""
        amount = float(amount)
        half_width = float(half_width)
        return {
            'date': day.strftime('%Y-%m-%d'),
            'amount': round(amount, 2),
            'low': round(max(0.0, amount - half_width), 2),
            'high': round(amount + half_width, 2),
        }
def get_forecast(user_id):
    """Return the 30-day spending forecast for ``user_id``.

    Convenience wrapper around ``SpendingForecaster.forecast_next_30_days``.
    It returns whatever that method returns (a list of daily records, or None).
    """
    return SpendingForecaster(user_id).forecast_next_30_days()
|