# FinMK / backend/analytics/forecast.py
# Last commit by Kumar (24e6f5b):
#   "Refactor: Exclude PDF and CSV files from Git to fix HF push error"
import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
from expense_tracker.utils import MongoDBClient
from bson import ObjectId
import logging
logger = logging.getLogger(__name__)
class SpendingForecaster:
    """Produce a 30-day daily spending forecast for a single user.

    Expenses are read from ``users.<doc>.financial_data.expenses`` in MongoDB,
    collapsed into one total per calendar day, and projected forward with a
    statistical model (weighted mean x weekday factor + capped linear trend).
    A Chronos Bolt model is only tried when the statistical path yields
    nothing.
    """

    def __init__(self, user_id):
        """Store the id of the user whose expenses will be forecast.

        Args:
            user_id: An ``ObjectId`` or any value ``ObjectId(...)`` accepts.
        """
        # Local models disabled in favor of API-only mode
        self.user_id = user_id

    def fetch_daily_spending(self):
        """Load the user's expenses and return them as a gap-free daily series.

        Returns:
            A DataFrame with columns ``ds`` (one row per calendar day between
            the first and last expense date) and ``y`` (total amount for that
            day, ``0`` where nothing was recorded). An empty DataFrame is
            returned when the id is not a valid ``ObjectId``, the user or its
            ``financial_data`` is missing, or no usable expense rows exist.
        """
        db = MongoDBClient.get_client()
        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except Exception:
                # A malformed id cannot match any user document.
                return pd.DataFrame()

        # Reverted: Use ONLY the nested user data as requested by USER
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user or 'financial_data' not in user:
            print("DEBUG: User or financial_data not found in DB.")
            return pd.DataFrame()

        expenses = user['financial_data'].get('expenses', [])
        if not expenses:
            print("DEBUG: No expenses found in user record.")
            return pd.DataFrame()

        return self._build_daily_frame(expenses)

    @staticmethod
    def _build_daily_frame(expenses):
        """Convert raw expense records into a continuous daily-total frame.

        Args:
            expenses: Iterable of dict-like records. The date is read from
                ``date`` or ``Date`` and the amount from ``amount`` or
                ``Amount``.

        Returns:
            DataFrame with columns ``ds`` and ``y`` as described in
            ``fetch_daily_spending``, or an empty DataFrame when no record
            has both a parseable date and a finite numeric amount.
        """
        rows = []
        for entry in expenses:
            # Handle both CamelCase and lowercase keys
            date_val = entry.get('date') or entry.get('Date')

            # Only None / '' count as "missing": a plain `or` would also
            # discard a legitimate amount of 0.
            amount_val = None
            for key in ('amount', 'Amount'):
                candidate = entry.get(key)
                if candidate is not None and candidate != '':
                    amount_val = candidate
                    break

            if not date_val or amount_val is None:
                continue
            try:
                amount = float(amount_val)
            except (ValueError, TypeError):
                continue
            if not np.isfinite(amount):
                # NaN / inf would contaminate every downstream sum and mean.
                continue
            rows.append({'date': date_val, 'amount': amount})

        frame = pd.DataFrame(rows)
        if frame.empty:
            return pd.DataFrame()

        # Unparseable dates become NaT and are dropped, mirroring how
        # unparseable amounts are skipped above.
        frame['date'] = pd.to_datetime(frame['date'], errors='coerce')
        frame = frame.dropna(subset=['date'])
        if frame.empty:
            return pd.DataFrame()

        # Aggregate by date to get daily totals
        daily = frame.groupby(frame['date'].dt.date)['amount'].sum().reset_index()
        daily.columns = ['ds', 'y']
        daily['ds'] = pd.to_datetime(daily['ds'])
        daily = daily.sort_values('ds')

        # Fill missing dates with 0 to ensure continuity for seasonality detection
        full_range = pd.date_range(daily['ds'].min(), daily['ds'].max())
        daily = daily.set_index('ds').reindex(full_range, fill_value=0).reset_index()
        daily.columns = ['ds', 'y']
        return daily

    def forecast_next_30_days(self):
        """Forecast daily spending for the 30 days after the last known day.

        Returns:
            A list of 30 dicts with keys ``date`` (``YYYY-MM-DD``), ``amount``,
            ``low`` and ``high``; or ``None`` when fewer than 5 daily rows are
            available or both forecasting paths fail.
        """
        df = self.fetch_daily_spending()
        if not df.empty:
            print(f"DEBUG: Date Range: {df['ds'].min()} to {df['ds'].max()}")
        if len(df) < 5:
            print(f"DEBUG: Insufficient data. Returning None.")
            return None

        # 1. PRIMARY: Use Robust Statistical Methods (No AI API dependency, Fast & Reliable)
        print("DEBUG: Generating Primary Statistical Forecast (Seasonal Averaging)...")
        statistical_result = self._generate_statistical_fallback(df)
        if statistical_result:
            print(f"✅ Statistical Forecast generated successfully.")
            return statistical_result

        # 2. FALLBACK: Attempt Chronos Bolt if statistical method fails (Rare)
        try:
            print("DEBUG: Falling back to Chronos Bolt Forecasting...")
            return self._forecast_with_chronos_bolt(df)
        except Exception as e:
            print(f"Chronos Bolt Fallback Error: {e}")
            return None

    def _forecast_with_chronos_bolt(self, df):
        """Forecast 30 days with the ``amazon/chronos-bolt-tiny`` model.

        ``torch`` and ``chronos`` are imported lazily so the module loads
        without them; any failure (including a missing package) is printed
        and turned into ``None``.

        Args:
            df: Daily frame with ``ds`` and ``y`` columns; the last 120 ``y``
                values are used as model context.

        Returns:
            A list of 30 dicts (``date``, ``amount``, ``low``, ``high``) or
            ``None`` on any error.
        """
        try:
            import torch
            from chronos import ChronosBoltPipeline

            # Prepare context (last 120 days of spending history)
            context = torch.tensor(df['y'].tail(120).values, dtype=torch.float32)

            print(f"DEBUG: Loading Chronos Bolt Tiny model...")
            pipeline = ChronosBoltPipeline.from_pretrained(
                "amazon/chronos-bolt-tiny",
                device_map="auto",
                dtype=torch.float32,
            )

            print(f"DEBUG: Generating 30-day forecast...")
            forecast = pipeline.predict(context, prediction_length=30)

            # Median over dim 1 of the first batch item.
            # NOTE(review): dim 1 is presumably the quantile axis - confirm
            # against the installed chronos version.
            forecast_data = torch.median(forecast, dim=1).values[0].detach().cpu().numpy()

            # Estimate bounds based on historical volatility
            std_dev = df['y'].std() if len(df) > 1 else 10
            last_date = df['ds'].max()

            result = []
            for i, amount in enumerate(forecast_data):
                future_date = last_date + timedelta(days=i + 1)
                predicted_amount = max(0, float(amount))
                # Uncertainty scale increases with time
                uncertainty_scale = 1.0 + (i * 0.05)
                spread = 2.0 * std_dev * uncertainty_scale
                result.append({
                    'date': future_date.strftime('%Y-%m-%d'),
                    'amount': round(predicted_amount, 2),
                    'low': round(float(max(0, predicted_amount - spread)), 2),
                    'high': round(float(predicted_amount + spread), 2),
                })
            return result
        except Exception as e:
            print(f"ERROR in Chronos Bolt Implementation: {e}")
            return None

    def _generate_statistical_fallback(self, df):
        """Project 30 days from a weighted mean, weekday factors and a trend.

        Despite the name, ``forecast_next_30_days`` uses this as its primary
        method. The prediction for day ``i`` is
        ``max(0, (weighted_mean + slope * i) * weekday_factor)`` where:

        1. ``weighted_mean`` counts the last 30 rows twice when more than 30
           rows exist (rows are assumed to be in ascending date order).
        2. ``weekday_factor`` is that weekday's mean divided by the overall
           mean, clipped to ``[0.2, 5.0]``.
        3. ``slope`` is a least-squares linear slope (only when more than 14
           rows exist), capped at 1% of ``|weighted_mean|`` per day.

        The input frame is not modified.

        Args:
            df: Daily frame with a datetime ``ds`` column and numeric ``y``.

        Returns:
            A list of 30 dicts (``date``, ``amount``, ``low``, ``high``), or
            ``None`` if anything raises.
        """
        try:
            n = len(df)
            last_date = df['ds'].max()

            # 1. Calculate Recency-Weighted Mean (last 30 days carry more weight)
            weights = np.ones(n)
            if n > 30:
                weights[-30:] = 2.0
            weighted_mean = np.average(df['y'], weights=weights)

            # 2. Daily Seasonality (Avg spend by day of week).
            # Grouping by a derived key avoids adding a column to the
            # caller's DataFrame.
            weekday_means = df['y'].groupby(df['ds'].dt.weekday).mean().to_dict()

            # Normalize map to ensure it doesn't inflate total spend
            overall_avg = df['y'].mean() if n > 0 else 1
            print(f"DEBUG: WeightedMean={weighted_mean:.2f}, OverallAvg={overall_avg:.2f}")
            if overall_avg > 0:
                # CAP factor to prevent extreme outliers (max 5x regular spend)
                seasonal_map = {
                    wd: float(np.clip(mean / overall_avg, 0.2, 5.0))
                    for wd, mean in weekday_means.items()
                }
            else:
                seasonal_map = dict.fromkeys(weekday_means, 1.0)

            # 3. Linear Trend (Slope)
            slope = 0.0
            if n > 14:
                slope = np.polyfit(np.arange(n), df['y'].values, 1)[0]
                # Cap slope to prevent runaway projections (max 1% change per
                # day). abs() keeps lower <= upper even for a negative mean.
                max_step = abs(weighted_mean) * 0.01
                slope = float(np.clip(slope, -max_step, max_step))
            print(f"DEBUG: Slope={slope:.4f}")

            std_dev = df['y'].std() if n > 1 else (weighted_mean * 0.2)

            result = []
            total_predicted = 0
            for i in range(1, 31):
                future_date = last_date + timedelta(days=i)
                base_pred = weighted_mean + (slope * i)
                seasonal_factor = seasonal_map.get(future_date.weekday(), 1.0)
                predicted_amount = max(0, base_pred * seasonal_factor)
                total_predicted += predicted_amount

                # Confidence intervals widen over time
                width = 1.96 * std_dev * (1 + (i * 0.05))
                result.append({
                    'date': future_date.strftime('%Y-%m-%d'),
                    'amount': round(float(predicted_amount), 2),
                    'low': round(float(max(0, predicted_amount - width)), 2),
                    'high': round(float(predicted_amount + width), 2),
                })

            print(f"DEBUG: 30-Day Forecast Total: {total_predicted:.2f}")
            return result
        except Exception as e:
            print(f"Critical Statistical Fallback Error: {e}")
            import traceback
            traceback.print_exc()
            return None
def get_forecast(user_id):
    """Return the 30-day spending forecast for ``user_id``.

    Module-level convenience wrapper: builds a ``SpendingForecaster`` for the
    given user and returns the result of its ``forecast_next_30_days()`` call
    unchanged.
    """
    return SpendingForecaster(user_id).forecast_next_30_days()