| import pandas as pd |
| import numpy as np |
| from datetime import datetime, timedelta |
| from expense_tracker.utils import MongoDBClient |
|
|
def _clean_income_records(incomes):
    """Convert raw income records into ``{'date', 'amount'}`` rows.

    A record is skipped (never raised on) when:

    * its ``'date'`` is neither a ``datetime`` nor a ``'%Y-%m-%d'`` string,
    * its ``'amount'`` cannot be converted to a finite float.

    A missing ``'amount'`` counts as ``0``. Dates are reduced to calendar
    days so that several records on the same day aggregate together.
    """
    rows = []
    for inc in incomes:
        date_val = inc.get('date')
        if isinstance(date_val, str):
            try:
                date_val = datetime.strptime(date_val, '%Y-%m-%d')
            except ValueError:
                continue
        elif not isinstance(date_val, datetime):
            continue

        try:
            amount = float(inc.get('amount', 0))
        except (TypeError, ValueError):
            # e.g. None or a non-numeric string: drop the record instead of
            # aborting the whole forecast.
            continue
        if not np.isfinite(amount):
            # NaN/inf would make the regression fit fail later on.
            continue

        # Drop the time-of-day component: the model works on whole days.
        rows.append({'date': date_val.date(), 'amount': amount})
    return rows


def forecast_income(user_id, days_to_forecast=30):
    """Forecast a user's daily income with a linear-regression trend.

    The user's income records are read from ``financial_data.incomes`` of the
    matching ``users`` document, summed per calendar day, and a straight line
    is fitted to (day ordinal, daily total). The line is then evaluated for
    the ``days_to_forecast`` days following the last recorded day.

    Args:
        user_id: An ``ObjectId`` or a value ``ObjectId(...)`` can parse.
        days_to_forecast: Number of days to predict after the last recorded
            day. Values ``<= 0`` yield an empty forecast.

    Returns:
        A dict in one of three shapes:

        * ``{"error": <message>}`` when the id is invalid or no usable
          income records exist.
        * ``{"current_balance", "forecast": [], "message"}`` when fewer than
          two distinct days are available, so no regression is fitted.
        * ``{"analysis", "slope", "forecast"}`` otherwise, where ``forecast``
          is a list of ``{"date": "YYYY-MM-DD", "predicted_amount": float}``
          with negative predictions clamped to ``0``.
    """
    # NOTE(review): presumably a database handle exposing a `users`
    # collection -- confirm against MongoDBClient.
    client = MongoDBClient.get_client()
    from bson import ObjectId
    from bson.errors import InvalidId

    if not isinstance(user_id, ObjectId):
        try:
            user_id = ObjectId(user_id)
        except (InvalidId, TypeError):
            return {"error": "Invalid User ID format"}

    user = client.users.find_one({'_id': user_id}, {'financial_data.incomes': 1})

    # Null-safe lookup: tolerates a missing user, a missing or None
    # `financial_data`, and a missing or None `incomes`.
    financial_data = (user or {}).get('financial_data') or {}
    incomes = financial_data.get('incomes') or []
    if not incomes:
        return {"error": "Not enough data to forecast"}

    df = pd.DataFrame(_clean_income_records(incomes))
    if df.empty:
        return {"error": "No valid income records found"}

    daily_income = df.groupby('date')['amount'].sum().reset_index()
    daily_income = daily_income.sort_values('date')

    if len(daily_income) < 2:
        return {
            "current_balance": float(daily_income['amount'].sum()),
            "forecast": [],
            "message": "Need more data points for regression"
        }

    # Fit on plain ndarrays so that predict(), which also receives an
    # ndarray, does not trigger a feature-name mismatch warning.
    day_ordinals = np.array(
        [day.toordinal() for day in daily_income['date']]
    ).reshape(-1, 1)
    amounts = daily_income['amount'].to_numpy()

    from sklearn.linear_model import LinearRegression
    model = LinearRegression()
    model.fit(day_ordinals, amounts)
    slope = round(float(model.coef_[0]), 2)

    # Rows are sorted by date, so the last row is the most recent day.
    last_date = daily_income['date'].iloc[-1]
    future_dates = [
        last_date + timedelta(days=offset)
        for offset in range(1, days_to_forecast + 1)
    ]

    forecast_data = []
    if future_dates:
        # predict() rejects a zero-row input, hence the guard.
        future_ordinals = np.array(
            [day.toordinal() for day in future_dates]
        ).reshape(-1, 1)
        predictions = model.predict(future_ordinals)
        forecast_data = [
            {
                'date': day.strftime('%Y-%m-%d'),
                # Built-in floats keep the payload serializable.
                'predicted_amount': max(0.0, round(float(pred), 2)),
            }
            for day, pred in zip(future_dates, predictions)
        ]

    return {
        "analysis": "Linear Regression Trend",
        "slope": slope,
        "forecast": forecast_data
    }
|
|