FinMK / backend /finance /views.py
Kumar
Refactor: Exclude PDF and CSV files from Git to fix HF push error
24e6f5b
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from .serializers import IncomeSerializer, ExpenseSerializer, BudgetSerializer, SavingsGoalSerializer
from expense_tracker.utils import MongoDBClient
from bson import ObjectId
from datetime import datetime, timedelta
from .ai_helper import scan_receipt_with_llm, INIT_ERRORS, GEMINI_API_KEY, CLIENT, GEMINI_REGIONAL_BLOCKED
from .utils_mongo import get_user_db_id
from collections import defaultdict
import os
class IncomeListCreateView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data.incomes': 1})
incomes = []
if user and 'financial_data' in user and 'incomes' in user['financial_data']:
incomes = user['financial_data']['incomes']
# Standardize for Serializer & Sort
for income in incomes:
income['id'] = str(income['_id'])
if 'date' in income and isinstance(income['date'], datetime):
income['date'] = income['date'].date()
# Sort DESC (Date and then ID for stable recent first)
incomes.sort(key=lambda x: (str(x.get('date', '')), str(x.get('id', ''))), reverse=True)
# Clean up _id after sorting/id assignment
for income in incomes:
if '_id' in income: del income['_id']
serializer = IncomeSerializer(incomes, many=True)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
try:
serializer = IncomeSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data.copy()
data['_id'] = ObjectId() # Generate ID manually for embedded doc
data['created_at'] = datetime.now()
# Auto-categorize if category not provided
if not data.get('category'):
from finance.category_classifier import classify_transaction
classification = classify_transaction(data['title'], 'Income')
data['category'] = classification['category']
if isinstance(data['date'], str):
data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
else:
data['date'] = datetime.combine(data['date'], datetime.min.time())
if 'amount' in data:
data['amount'] = float(data['amount'])
db = MongoDBClient.get_client()
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {'financial_data.incomes': data}}
)
response_data = serializer.data
response_data['id'] = str(data['_id'])
if not response_data.get('category'):
response_data['category'] = data['category']
return Response(response_data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class IncomeDetailView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request, pk):
try:
db = MongoDBClient.get_client()
# Projection uses positional operator $ to get only the matched element
user = db.users.find_one(
{'_id': get_user_db_id(request.user), 'financial_data.incomes._id': ObjectId(pk)},
{'financial_data.incomes.$': 1}
)
if not user or 'financial_data' not in user or not user['financial_data']['incomes']:
return Response(status=status.HTTP_404_NOT_FOUND)
income = user['financial_data']['incomes'][0]
income['id'] = str(income['_id'])
del income['_id']
if 'date' in income and isinstance(income['date'], datetime):
income['date'] = income['date'].date()
serializer = IncomeSerializer(income)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request, pk):
try:
serializer = IncomeSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data.copy()
data['_id'] = ObjectId(pk) # Preserve ID
if isinstance(data['date'], str):
data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
else:
data['date'] = datetime.combine(data['date'], datetime.min.time())
if 'amount' in data:
data['amount'] = float(data['amount'])
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user), 'financial_data.incomes._id': ObjectId(pk)},
{'$set': {'financial_data.incomes.$': data}}
)
if result.matched_count > 0:
response_data = serializer.data
response_data['id'] = pk
return Response(response_data)
return Response(status=status.HTTP_404_NOT_FOUND)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def delete(self, request, pk):
try:
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': {'financial_data.incomes': {'_id': ObjectId(pk)}}}
)
if result.modified_count > 0:
return Response(status=status.HTTP_204_NO_CONTENT)
return Response(status=status.HTTP_404_NOT_FOUND)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ExpenseListCreateView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data.expenses': 1})
expenses = []
if user and 'financial_data' in user and 'expenses' in user['financial_data']:
expenses = user['financial_data']['expenses']
# Standardize for Serializer & Sort
for expense in expenses:
expense['id'] = str(expense['_id'])
if 'date' in expense and isinstance(expense['date'], datetime):
expense['date'] = expense['date'].date()
# Sort DESC
expenses.sort(key=lambda x: (str(x.get('date', '')), str(x.get('id', ''))), reverse=True)
# Clean up _id
for expense in expenses:
if '_id' in expense: del expense['_id']
serializer = ExpenseSerializer(expenses, many=True)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
try:
serializer = ExpenseSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data.copy()
data['_id'] = ObjectId()
data['created_at'] = datetime.now()
if not data.get('category'):
from finance.category_classifier import classify_transaction
prediction = classify_transaction(data['title'], 'Expense')
data['category'] = prediction['category']
if isinstance(data['date'], str):
data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
else:
data['date'] = datetime.combine(data['date'], datetime.min.time())
if 'amount' in data:
data['amount'] = float(data['amount'])
db = MongoDBClient.get_client()
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {'financial_data.expenses': data}}
)
response_data = serializer.data
response_data['id'] = str(data['_id'])
if not response_data.get('category'):
response_data['category'] = data['category']
return Response(response_data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ExpenseDetailView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request, pk):
try:
db = MongoDBClient.get_client()
user = db.users.find_one(
{'_id': get_user_db_id(request.user), 'financial_data.expenses._id': ObjectId(pk)},
{'financial_data.expenses.$': 1}
)
if not user or 'financial_data' not in user or not user['financial_data']['expenses']:
return Response(status=status.HTTP_404_NOT_FOUND)
expense = user['financial_data']['expenses'][0]
expense['id'] = str(expense['_id'])
del expense['_id']
if 'date' in expense and isinstance(expense['date'], datetime):
expense['date'] = expense['date'].date()
serializer = ExpenseSerializer(expense)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request, pk):
try:
serializer = ExpenseSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data.copy()
data['_id'] = ObjectId(pk) # Preserve ID
if not data.get('category'):
from finance.category_classifier import classify_transaction
prediction = classify_transaction(data['title'], 'Expense')
data['category'] = prediction['category']
if isinstance(data['date'], str):
data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
else:
data['date'] = datetime.combine(data['date'], datetime.min.time())
if 'amount' in data:
data['amount'] = float(data['amount'])
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user), 'financial_data.expenses._id': ObjectId(pk)},
{'$set': {'financial_data.expenses.$': data}}
)
if result.matched_count > 0:
response_data = serializer.data
response_data['id'] = pk
return Response(response_data)
return Response(status=status.HTTP_404_NOT_FOUND)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def delete(self, request, pk):
try:
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': {'financial_data.expenses': {'_id': ObjectId(pk)}}}
)
if result.modified_count > 0:
return Response(status=status.HTTP_204_NO_CONTENT)
return Response(status=status.HTTP_404_NOT_FOUND)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class BudgetDetailView(APIView):
permission_classes = [permissions.IsAuthenticated]
def delete(self, request, pk):
try:
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': {'budgets': {'$or': [{'_id': ObjectId(pk)}, {'id': pk}]}}} # Handle both ID types just in case
)
if result.modified_count > 0:
return Response(status=status.HTTP_204_NO_CONTENT)
return Response(status=status.HTTP_404_NOT_FOUND)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class BudgetListCreateView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user = db.users.find_one(
{'_id': get_user_db_id(request.user)},
{'budgets': 1, 'financial_data.expenses': 1, 'financial_data.incomes': 1}
)
raw_budgets = []
expenses = []
incomes = []
if user:
raw_budgets = user.get('budgets', [])
if 'financial_data' in user:
expenses = user['financial_data'].get('expenses', [])
incomes = user['financial_data'].get('incomes', [])
# --- PREPARE UNIFIED TRANSACTIONS ---
all_txns = []
for e in expenses:
e['txn_type'] = 'expense'
all_txns.append(e)
for i in incomes:
i['txn_type'] = 'income'
all_txns.append(i)
def get_dt(val):
if isinstance(val, datetime): return val
if isinstance(val, str):
try: return datetime.strptime(val[:10], '%Y-%m-%d')
except: return None
return None
# --- PREPARE DATES & AVAILABLE MONTHS ---
all_dates = [get_dt(t.get('date')) for t in all_txns]
all_dates = [d for d in all_dates if d]
available_months = sorted(list(set(d.strftime('%Y-%m') for d in all_dates)), reverse=True)
# --- SMART MONTH DEFAULT ---
target_month = request.query_params.get('month')
if not target_month:
if available_months:
target_month = available_months[0] # Latest available
else:
target_month = datetime.now().strftime('%Y-%m')
month = target_month
enriched_budgets = []
# --- LOGIC BRANCH: ALL TIME vs SPECIFIC MONTH ---
if month == 'all':
# "All Time" Logic
total_months_active = len(available_months) if available_months else 1
# Get unique categories from budgets (taking the latest definition for each category)
latest_budgets_map = {}
for b in raw_budgets:
# Normalize category for uniqueness map
cat_norm = str(b.get('category', '')).strip().lower()
latest_budgets_map[cat_norm] = b
for cat_norm, budget_template in latest_budgets_map.items():
b = budget_template.copy()
b['id'] = str(b.get('_id', ObjectId()))
if '_id' in b: del b['_id']
# Calculate LIFETIME net spent for this category
lifetime_spent = 0.0
for txn in all_txns:
txn_cat = str(txn.get('category', '')).strip().lower()
if txn_cat == cat_norm:
amt = float(txn.get('amount', 0))
if txn['txn_type'] == 'expense':
lifetime_spent += amt
else:
lifetime_spent -= amt
lifetime_spent = max(0, lifetime_spent)
# ENHANCED LOGIC:
# If we have a Global "all" budget, we show the AVERAGE monthly spend vs the limit.
# If we have monthly budgets, we show the SUM vs the SUM.
if budget_template.get('month') == 'all':
limit = b.get('limit_amount', 0)
display_spent = lifetime_spent / total_months_active
b['label_suffix'] = "(Monthly Avg)"
else:
limit = sum(lb.get('limit_amount', 0) for lb in raw_budgets
if str(lb.get('category', '')).strip().lower() == cat_norm
and lb.get('month') != 'all')
display_spent = lifetime_spent
b['label_suffix'] = "(Lifetime Total)"
b['spent_amount'] = round(display_spent, 2)
b['limit_amount'] = round(limit, 2)
b['month'] = 'All Time'
b['remaining_amount'] = round(limit - display_spent, 2)
b['is_exceeded'] = display_spent > limit
# Include real lifetime total in metadata for tooltip/UI info
b['lifetime_spent_actual'] = round(lifetime_spent, 2)
if b['is_exceeded']:
b['alert_message'] = "Exceeding Average" if b.get('label_suffix') == "(Monthly Avg)" else "Lifetime Overspent"
else:
b['alert_message'] = "Within Average" if b.get('label_suffix') == "(Monthly Avg)" else "Lifetime Safe"
enriched_budgets.append(b)
else:
# Specific Month Logic
# 1. Get budgets for this SPECIFIC month
direct_budgets = [b for b in raw_budgets if b.get('month') == month]
direct_cats = {str(b.get('category', '')).strip().lower() for b in direct_budgets}
# 2. GLOBAL FALLBACK: If a category has an 'all' budget but NO specific one for THIS month, use the 'all' one.
fallback_budgets = []
for b in raw_budgets:
if b.get('month') == 'all':
cat_norm = str(b.get('category', '')).strip().lower()
if cat_norm not in direct_cats:
fallback_budgets.append(b)
combined_list = direct_budgets + fallback_budgets
for budget in combined_list:
budget_copy = budget.copy()
budget_copy['id'] = str(budget_copy.get('_id', ObjectId()))
if '_id' in budget_copy: del budget_copy['_id']
spent = 0.0
limit = budget_copy['limit_amount']
b_cat = str(budget_copy.get('category', '')).strip().lower()
for txn in all_txns:
txn_date = get_dt(txn.get('date'))
if txn_date:
txn_month = txn_date.strftime('%Y-%m')
txn_cat = str(txn.get('category', '')).strip().lower()
if txn_month == month and txn_cat == b_cat:
amt = float(txn.get('amount', 0))
if txn['txn_type'] == 'expense':
spent += amt
else:
spent -= amt
budget_copy['spent_amount'] = max(0, spent) # Clamp to 0 for UI consistency
budget_copy['remaining_amount'] = limit - budget_copy['spent_amount']
budget_copy['is_exceeded'] = budget_copy['spent_amount'] > limit
# Predictive Alert Logic
today = datetime.now()
try:
start_date = datetime.strptime(month, '%Y-%m')
if start_date.year == today.year and start_date.month == today.month:
days_passed = max(1, today.day)
daily_avg = spent / days_passed
projected_total = daily_avg * 30
if budget_copy['is_exceeded']:
budget_copy['alert_message'] = "Limit Exceeded!"
elif projected_total > limit:
budget_copy['alert_message'] = f"Pacing to Exceed (${round(projected_total - limit)})"
else:
budget_copy['alert_message'] = "On Track"
else:
budget_copy['alert_message'] = "Closed" if budget['is_exceeded'] else "Safe"
except ValueError:
budget['alert_message'] = "N/A"
enriched_budgets.append(budget_copy)
# --- Calculate Available Categories & Stats (Expenses Only for Budgeting) ---
category_stats = defaultdict(lambda: {'net_total': 0, 'months': set()})
total_months_active = len(available_months) if available_months else 1
for txn in all_txns:
cat = txn.get('category')
amt = float(txn.get('amount', 0))
date_val = get_dt(txn.get('date'))
if cat and date_val and txn.get('txn_type') == 'expense':
category_stats[cat]['net_total'] += amt
m_str = date_val.strftime('%Y-%m')
category_stats[cat]['months'].add(m_str)
available_categories = []
for cat, stats in category_stats.items():
# Use Global Months for consistency across Dashboard
avg_spend = stats['net_total'] / total_months_active if total_months_active > 0 else 0
available_categories.append({
'name': cat,
'avg_monthly_spend': round(avg_spend, 2),
'total_spend': round(stats['net_total'], 2)
})
available_categories.sort(key=lambda x: x['total_spend'], reverse=True)
serializer = BudgetSerializer(enriched_budgets, many=True)
return Response({
"budgets": serializer.data,
"categories": available_categories,
"current_month": month,
"available_months": available_months,
"summary": {
"total_budgeted": sum(b['limit_amount'] for b in enriched_budgets),
"total_spent_in_budgets": sum(b['spent_amount'] for b in enriched_budgets)
}
})
except Exception as e:
import traceback
traceback.print_exc() # Print full stack trace to console for debug
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
try:
data = request.data
category = data.get('category')
limit = data.get('limit_amount')
month = data.get('month') or datetime.now().strftime('%Y-%m')
if not category or not limit:
return Response({"error": "Category and Limit Amount required"}, status=status.HTTP_400_BAD_REQUEST)
db = MongoDBClient.get_client()
# Remove existing budget for this category/month (simulating upsert)
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': {'budgets': {'category': category, 'month': month}}}
)
# Add new budget
new_budget = {
'_id': ObjectId(),
'category': category,
'limit_amount': float(limit),
'month': month,
'updated_at': datetime.now(),
'user_id': request.user.id # Keep for consistency though embedded
}
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {'budgets': new_budget}}
)
return Response({"message": "Budget set successfully", "id": str(new_budget['_id'])}, status=status.HTTP_201_CREATED)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class DashboardSummaryView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
from collections import defaultdict
from datetime import timedelta
db = MongoDBClient.get_client()
user_id = get_user_db_id(request.user)
user = None
if user_id:
user = db.users.find_one({'_id': user_id}, {'financial_data': 1, 'budgets': 1})
raw_incomes = []
raw_expenses = []
budgets = []
if user:
if 'financial_data' in user:
raw_incomes = user['financial_data'].get('incomes', [])
raw_expenses = user['financial_data'].get('expenses', [])
budgets = user.get('budgets', [])
# Helper for robust date parsing (defensive against strings in DB)
def get_dt(val):
if isinstance(val, datetime): return val
if isinstance(val, str):
try: return datetime.fromisoformat(val.replace('Z', '+00:00'))
except:
try: return datetime.strptime(val[:10], '%Y-%m-%d')
except: return None
return None
# Process and normalize transactions
incomes = []
for i in raw_incomes:
dt = get_dt(i.get('date'))
if dt:
i_copy = i.copy()
i_copy['date'] = dt
incomes.append(i_copy)
expenses = []
for e in raw_expenses:
dt = get_dt(e.get('date'))
if dt:
e_copy = e.copy()
e_copy['date'] = dt
expenses.append(e_copy)
dates = [i['date'] for i in incomes] + [e['date'] for e in expenses]
if dates:
last_transaction_date = max(dates)
anchor_date = last_transaction_date
else:
anchor_date = datetime.now()
# 2. Basic Totals (Lifetime)
total_income = sum(float(i.get('amount', 0)) for i in incomes)
total_expense = sum(float(e.get('amount', 0)) for e in expenses)
balance = total_income - total_expense
# 3. Expense Category Breakdown
cat_map = {}
for e in expenses:
cat = e.get('category', 'Uncategorized')
cat_map[cat] = cat_map.get(cat, 0) + float(e.get('amount', 0))
breakdown = [{'name': k, 'value': round(v, 2)} for k, v in cat_map.items()]
breakdown.sort(key=lambda x: x['value'], reverse=True)
# 4. Monthly Trends (Dynamic Full History) & Category Trends (All Categories)
import calendar
# Find min date for trends
if dates:
min_date = min(dates)
else:
min_date = anchor_date - timedelta(days=180)
monthly_data = []
category_trends = []
income_category_trends = []
# --- Monthly Trends ---
current_iter_date = min_date.replace(day=1)
end_iter_date = anchor_date.replace(day=1)
while current_iter_date <= end_iter_date:
month_str = current_iter_date.strftime('%Y-%m')
month_label = current_iter_date.strftime('%b %Y')
m_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == month_str)
m_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == month_str)
monthly_data.append({
'month': month_label,
'income': round(m_inc, 2),
'expense': round(m_exp, 2),
'net': round(m_inc - m_exp, 2)
})
# --- Category Trends (All Cats) ---
month_cats_map = defaultdict(float)
for e in expenses:
if e['date'].strftime('%Y-%m') == month_str:
cat_key = str(e.get('category') or 'Uncategorized')
month_cats_map[cat_key] += float(e['amount'])
cat_trend_entry = {'month': current_iter_date.strftime('%b %Y')} # Use full label so XAxis matches
monthly_total = 0
for cat_name, cat_amount in month_cats_map.items():
cat_trend_entry[cat_name] = round(cat_amount, 2)
monthly_total += cat_amount
cat_trend_entry['total'] = round(monthly_total, 2)
category_trends.append(cat_trend_entry)
# --- Income Category Trends (All Cats) ---
month_income_cats_map = defaultdict(float)
for i in incomes:
if i['date'].strftime('%Y-%m') == month_str:
cat_key = str(i.get('category') or 'Other')
month_income_cats_map[cat_key] += float(i['amount'])
income_cat_trend_entry = {'month': current_iter_date.strftime('%b %Y')}
monthly_income_total = 0
for cat_name, cat_amount in month_income_cats_map.items():
income_cat_trend_entry[cat_name] = round(cat_amount, 2)
monthly_income_total += cat_amount
income_cat_trend_entry['total'] = round(monthly_income_total, 2)
income_category_trends.append(income_cat_trend_entry)
# Increment
days_in_month = calendar.monthrange(current_iter_date.year, current_iter_date.month)[1]
current_iter_date += timedelta(days=days_in_month)
current_iter_date = current_iter_date.replace(day=1)
# 5. Budget Progress --> REMOVED as per user request
budget_progress = []
# 7. Financial Health - ANCHORED
savings_rate = ((total_income - total_expense) / total_income * 100) if total_income > 0 else 0
# --- Daily Spending Trend (All History) ---
# Smart Anchor for spending: Use latest expense if available, otherwise global anchor
expense_dates = [e['date'] for e in expenses]
spending_anchor = max(expense_dates) if expense_dates else anchor_date
# Normalize window to include full days
spending_anchor = spending_anchor.replace(hour=23, minute=59, second=59)
# Change: Thirty days before is NO LONGER fixed.
# We want all-time daily trend.
if dates:
trend_start_date = min(dates).replace(hour=0, minute=0, second=0)
else:
trend_start_date = (spending_anchor - timedelta(days=29)).replace(hour=0, minute=0, second=0)
total_trend_days = (spending_anchor - trend_start_date).days + 1
if total_trend_days < 30: total_trend_days = 30 # Default to 30 for visualization space if first record is recent
# Create a map for daily spending: date -> category -> amount
daily_spending_map = defaultdict(lambda: defaultdict(float))
for e in expenses:
if trend_start_date <= e['date'] <= spending_anchor:
d_str = e['date'].strftime('%Y-%m-%d')
cat_key = str(e.get('category') or 'Uncategorized')
daily_spending_map[d_str][cat_key] += float(e['amount'])
daily_spending_trend = []
for i in range(total_trend_days):
d = trend_start_date + timedelta(days=i)
d_str = d.strftime('%Y-%m-%d')
day_data = daily_spending_map.get(d_str, {})
total_day_amount = sum(day_data.values())
entry = {
'date': d.strftime('%b %d'),
'total_amount': round(total_day_amount, 2)
}
for cat, amt in day_data.items():
entry[cat] = round(amt, 2)
daily_spending_trend.append(entry)
# Calculate 30-day moving average for all-time data
for i in range(len(daily_spending_trend)):
start_window = max(0, i - 29)
window = daily_spending_trend[start_window : i + 1]
avg = sum(d['total_amount'] for d in window) / len(window)
daily_spending_trend[i]['moving_avg_30d'] = round(avg, 2)
# "Lifetime" average (for the chart reference line)
if dates:
total_days = (max(dates) - min(dates)).days + 1
if total_days < 1: total_days = 1
lifetime_daily_average = total_expense / total_days
else:
lifetime_daily_average = 0
# Change: Daily Avg card now shows LIFETIME average as per user request
avg_daily_spending = lifetime_daily_average
# --- Daily Income Trend (All History) ---
daily_income_map = defaultdict(lambda: defaultdict(float))
for i in incomes:
if trend_start_date <= i['date'] <= spending_anchor:
d_str = i['date'].strftime('%Y-%m-%d')
cat_key = str(i.get('category') or 'Other')
daily_income_map[d_str][cat_key] += float(i['amount'])
daily_income_trend = []
for i in range(total_trend_days):
d = trend_start_date + timedelta(days=i)
d_str = d.strftime('%Y-%m-%d')
day_data = daily_income_map.get(d_str, {})
total_day_amount = sum(day_data.values())
entry = {
'date': d.strftime('%b %d'),
'total_amount': round(total_day_amount, 2)
}
for cat, amt in day_data.items():
entry[cat] = round(amt, 2)
daily_income_trend.append(entry)
# Calculate 30-day moving average for income
for i in range(len(daily_income_trend)):
start_window = max(0, i - 29)
window = daily_income_trend[start_window : i + 1]
avg = sum(d['total_amount'] for d in window) / len(window)
daily_income_trend[i]['moving_avg_30d'] = round(avg, 2)
# Lifetime average daily income
if dates:
total_days_income = (max(dates) - min(dates)).days + 1
if total_days_income < 1: total_days_income = 1
lifetime_daily_income_average = total_income / total_days_income
else:
lifetime_daily_income_average = 0
avg_daily_income = lifetime_daily_income_average
# Expense Ratio (Total Exp / Total Inc %) - Replacing Adherence
# Changed to Lifetime per user request
expense_ratio = (total_expense / total_income * 100) if total_income > 0 else 0
if total_income == 0 and total_expense > 0: expense_ratio = 100
# 8. Income Breakdown
income_sources = defaultdict(float)
for inc in incomes:
income_sources[inc.get('category', 'Other')] += float(inc.get('amount', 0))
income_breakdown = [{'name': k, 'value': round(v, 2)} for k, v in income_sources.items()]
income_breakdown.sort(key=lambda x: x['value'], reverse=True)
# --- REAL-TIME INTELLIGENCE METRICS ---
# 10. Health Score (Centralized logic)
# 40% Savings Rate + 40% Budget Adherence (100 - Expense Ratio) + 20% Liquidity (Positive Balance)
adherence_score = max(0, 100 - expense_ratio)
balance_status = 100 if balance > 0 else 0
health_score = round((savings_rate * 0.4) + (adherence_score * 0.4) + (balance_status * 0.2))
# 11. Forecast Accuracy Proxy (Based on data volume and stability)
data_days = 0
if dates:
data_days = (max(dates) - min(dates)).days
if data_days > 180:
forecast_accuracy = 98
elif data_days > 90:
forecast_accuracy = 95
elif data_days > 30:
forecast_accuracy = 91
elif data_days > 7:
forecast_accuracy = 86
else:
forecast_accuracy = 0 # Insufficient data
# 12. Monthly Transaction Count (Anchor Month)
anchor_month_str = anchor_date.strftime('%Y-%m')
monthly_txn_count = len([t for t in incomes if t['date'].strftime('%Y-%m') == anchor_month_str]) + \
len([t for t in expenses if t['date'].strftime('%Y-%m') == anchor_month_str])
# --- ANALYTICS ---
# 13. Burn Rate (Avg Monthly Expense - Last 3 Months)
three_months_ago = anchor_date - timedelta(days=90)
last_3m_expenses = [float(e['amount']) for e in expenses if e['date'] >= three_months_ago]
burn_rate = sum(last_3m_expenses) / 3 if last_3m_expenses else (total_expense / (total_trend_days / 30) if total_trend_days > 0 else 0)
# 14. Financial Runway (Days & Months)
daily_burn = burn_rate / 30
runway_days = round(balance / daily_burn) if balance > 0 and daily_burn > 0 else 0
runway_months = round(balance / burn_rate, 1) if balance > 0 and burn_rate > 0 else 0
# 15. Volatility Score (Std Dev of Daily Spending)
daily_sums = [d['total_amount'] for d in daily_spending_trend[-30:]] # last 30 days
if len(daily_sums) > 1:
mean = sum(daily_sums) / len(daily_sums)
variance = sum((x - mean) ** 2 for x in daily_sums) / len(daily_sums)
volatility_score = round((variance ** 0.5) / (mean if mean > 0 else 1) * 100) # Coeff of variation as %
else:
volatility_score = 0
# 16. Recurring Expense Detection (Simple Title matching)
from collections import Counter
titles = [e.get('title', '').strip().lower() for e in expenses]
title_counts = Counter(titles)
# A title appearing 3+ times is likely recurring (e.g. Rent, Netflix)
recurring_titles = [t for t, count in title_counts.items() if count >= 3 and len(t) > 2]
recurring_count = len(recurring_titles)
# 17. Momentum Metrics (MoM Change)
# Current Month
curr_month = anchor_date.strftime('%Y-%m')
curr_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == curr_month)
curr_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == curr_month)
curr_savings_rate = ((curr_inc - curr_exp) / curr_inc * 100) if curr_inc > 0 else 0
# Previous Month
first_of_curr = anchor_date.replace(day=1)
last_of_prev = first_of_curr - timedelta(days=1)
prev_month = last_of_prev.strftime('%Y-%m')
prev_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == prev_month)
prev_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == prev_month)
prev_savings_rate = ((prev_inc - prev_exp) / prev_inc * 100) if prev_inc > 0 else 0
spending_momentum = round(((curr_exp - prev_exp) / prev_exp * 100), 1) if prev_exp > 0 else 0
income_momentum = round(((curr_inc - prev_inc) / prev_inc * 100), 1) if prev_inc > 0 else 0
savings_momentum = round(curr_savings_rate - prev_savings_rate, 1)
# 18. Top Spiking Category (MoM increase in absolute amount)
curr_cat_exp = defaultdict(float)
for e in expenses:
if e['date'].strftime('%Y-%m') == curr_month:
curr_cat_exp[e.get('category', 'Other')] += float(e['amount'])
prev_cat_exp = defaultdict(float)
for e in expenses:
if e['date'].strftime('%Y-%m') == prev_month:
prev_cat_exp[e.get('category', 'Other')] += float(e['amount'])
spiking_cat = "N/A"
max_spike = -1.0
for cat, amt in curr_cat_exp.items():
spike = amt - prev_cat_exp.get(cat, 0)
if spike > max_spike:
max_spike = spike
spiking_cat = cat
# 19. Projected Monthly Spending
import calendar
days_passed = anchor_date.day
days_in_month = calendar.monthrange(anchor_date.year, anchor_date.month)[1]
projected_spending = round((curr_exp / days_passed * days_in_month), 2) if days_passed > 0 else 0
# 9. Recent Transactions (Globally latest, which matches anchor roughly)
# Combine and sort
all_tx = []
for i in incomes:
all_tx.append({
'id': str(i.get('_id')),
'type': 'income',
'title': i.get('title'),
'amount': i.get('amount'),
'category': i.get('category', 'Income'),
'date': i.get('date') # datetime
})
for e in expenses:
all_tx.append({
'id': str(e.get('_id')),
'type': 'expense',
'title': e.get('title'),
'amount': e.get('amount'),
'category': e.get('category', 'Expense'),
'date': e.get('date') # datetime
})
# Sort DESC (Date and then ID for stable recent first)
all_tx.sort(key=lambda x: (x['date'] if x['date'] else datetime.min, x.get('id', '')), reverse=True)
# Format dates for JSON
recent_transactions = []
for t in all_tx[:50]:
t_copy = t.copy()
if isinstance(t_copy['date'], datetime):
t_copy['date'] = t_copy['date'].strftime('%Y-%m-%d')
recent_transactions.append(t_copy)
return Response({
'total_income': round(total_income, 2),
'total_expense': round(total_expense, 2),
'balance': round(balance, 2),
'expense_breakdown': breakdown,
'monthly_trends': monthly_data,
'category_trends': category_trends,
'income_category_trends': income_category_trends,
'budget_progress': budget_progress,
'savings_rate': round(savings_rate, 1),
'avg_daily_spending': round(avg_daily_spending, 2),
'avg_daily_income': round(avg_daily_income, 2),
'expense_control': round(expense_ratio, 1), # Replaced budget_adherence
'health_score': health_score,
'forecast_accuracy': forecast_accuracy,
'monthly_txn_count': monthly_txn_count,
'burn_rate': round(burn_rate, 2),
'runway_days': runway_days,
'runway_months': runway_months,
'volatility_score': volatility_score,
'recurring_count': recurring_count,
'momentum': {
'spending': spending_momentum,
'income': income_momentum,
'savings': savings_momentum,
'spiking_category': spiking_cat,
'projected_spending': projected_spending
},
'income_sources': income_breakdown,
'recent_transactions': recent_transactions,
# New for Spending Chart
'daily_spending_trend': daily_spending_trend,
'lifetime_daily_average': round(lifetime_daily_average, 2),
# New for Income Chart
'daily_income_trend': daily_income_trend,
'lifetime_daily_income_average': round(lifetime_daily_income_average, 2),
'last_updated': datetime.now().isoformat(),
'data_anchor_date': anchor_date.strftime('%Y-%m-%d') # Meta info
})
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class BulkTransactionDeleteView(APIView):
permission_classes = [permissions.IsAuthenticated]
def delete(self, request):
try:
db = MongoDBClient.get_client()
# Get filters from query params
category = request.query_params.get('category')
type_filter = request.query_params.get('type')
search = request.query_params.get('search')
# If no filters, do the full purge (legacy behavior)
if not any([category, type_filter, search]):
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$set': {'financial_data.incomes': [], 'financial_data.expenses': []}}
)
return Response({"message": "All transactions deleted successfully"}, status=status.HTTP_200_OK)
# Targeted deletion logic
pull_query = {}
if category:
pull_query['category'] = {'$regex': category, '$options': 'i'}
if type_filter and type_filter != 'all':
# Type filter applies to which array we target
pass # Handled below
if search:
# We search in 'title'
pull_query['title'] = {'$regex': search, '$options': 'i'}
update_ops = {}
if type_filter == 'income' or type_filter == 'all' or not type_filter:
update_ops['financial_data.incomes'] = pull_query
if type_filter == 'expense' or type_filter == 'all' or not type_filter:
update_ops['financial_data.expenses'] = pull_query
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': update_ops}
)
return Response({"message": "Matching transactions deleted successfully"}, status=status.HTTP_200_OK)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class TransactionOCRView(APIView):
permission_classes = [permissions.IsAuthenticated]
def post(self, request):
print("DEBUG: Scan endpoint called")
try:
if 'file' not in request.FILES:
print("DEBUG: No file in request")
return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST)
image_file = request.FILES['file']
file_size = image_file.size / 1024 # Size in KB
print(f"DEBUG: File received: {image_file.name}, Size: {file_size:.2f} KB, ContentType: {image_file.content_type}")
# Extract data using AI (Gemini Vision) - Zero RAM local
print("DEBUG: Calling scan_receipt_with_llm...")
data = scan_receipt_with_llm(image_file)
print(f"DEBUG: AI OCR raw data: {data}")
if "error" in data:
return Response({'error': data['error']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
saved_transactions = []
errors = []
db = MongoDBClient.get_client()
print("DEBUG: MongoDB client obtained")
# Helper to save transaction
def save_transaction(title, amount, date, category_override=None):
from finance.category_classifier import classify_transaction
# Use shared classifier
classification = classify_transaction(title, 'Expense')
cat = category_override if category_override else classification['category']
type_ = classification['type']
# Ensure date is a datetime object
tx_date = datetime.now()
if date:
try:
if isinstance(date, str):
tx_date = datetime.strptime(date, '%Y-%m-%d')
elif isinstance(date, datetime):
tx_date = date
except: pass # Fallback to now
new_tx = {
'_id': ObjectId(),
'title': title,
'amount': float(amount),
'category': cat,
'date': tx_date,
'type': type_,
'created_at': datetime.now(),
'source': 'ocr_scan'
}
collection = 'financial_data.incomes' if type_ == 'Income' else 'financial_data.expenses'
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {collection: new_tx}}
)
# Format for response
new_tx['id'] = str(new_tx['_id'])
del new_tx['_id']
return new_tx
# IF items found, save all
if data.get('items'):
print(f"DEBUG: Found {len(data['items'])} items")
for item in data['items']:
try:
# Validate item
if not item.get('title') or not item.get('amount'):
continue
# Improve title cleanup
title = str(item['title']).strip()
if len(title) < 2: continue
saved_tx = save_transaction(title, item['amount'], data.get('date'))
saved_transactions.append(saved_tx)
except Exception as item_err:
import traceback
print(f"Failed to save item {item.get('title')}: {item_err}")
print(f"Traceback: {traceback.format_exc()}")
errors.append(f"{item.get('title')}: {str(item_err)}")
# ELSE if just total found (fallback)
elif data.get('total') and data['total'] > 0:
print(f"DEBUG: No items, using total: {data['total']}")
try:
title = data.get('merchant') or 'Scanned Receipt'
saved_tx = save_transaction(title, data['total'], data.get('date'))
saved_transactions.append(saved_tx)
except Exception as e:
errors.append(f"Total fallback failed: {str(e)}")
else:
print("DEBUG: No items and no total found.")
# Construct Message
msg = f"Successfully added {len(saved_transactions)} transactions."
if errors:
msg += f" \n({len(errors)} failed: {', '.join(errors)})"
# Return 200 OK even if some failed, unless ALL failed and there were items to process
status_code = status.HTTP_200_OK
if not saved_transactions and not errors:
msg = "No transactions found in receipt. Please try a clearer image."
# Keep 200 OK so frontend handles it gently? Or 400?
# User said "show but not added", imply 200 but nothing happens.
# Let's return 200 but with a clear message.
elif not saved_transactions and errors:
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
msg = "Scan failed: " + "; ".join(errors)
return Response({'message': msg, 'transactions': saved_transactions}, status=status_code)
except Exception as e:
import traceback
traceback.print_exc()
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({
"message": msg,
"data": saved_transactions,
"raw_ocr": data
}, status=status_code)
except Exception as e:
import traceback
traceback.print_exc()
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class VoiceCommandView(APIView):
permission_classes = [permissions.IsAuthenticated]
def post(self, request):
try:
# 1. OPTION A: Direct Text (from Web Speech API)
text = request.data.get('text')
# 2. OPTION B: Audio File (Removed - Client side only)
if not text:
return Response({'error': 'No text provided. Please use the microphone button.'}, status=status.HTTP_400_BAD_REQUEST)
# 3. Parse Code (Shared Logic)
from .ai_helper import CommandParser
parsed = CommandParser.parse(text)
print(f"DEBUG: Parsed command: {parsed}")
# 4. Save if valid
saved_transaction = None
if parsed['amount'] and parsed['merchant']:
db = MongoDBClient.get_client()
new_tx = {
'_id': ObjectId(),
'title': parsed['merchant'],
'amount': float(parsed['amount']),
'category': parsed['category'] or 'Uncategorized',
'date': datetime.combine(datetime.now().date(), datetime.min.time()),
'type': parsed['type'], # Expense or Income
'created_at': datetime.now(),
'source': 'voice_command',
'voice_text': text
}
collection = 'financial_data.incomes' if parsed['type'] == 'Income' else 'financial_data.expenses'
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {collection: new_tx}}
)
saved_transaction = new_tx.copy()
saved_transaction['id'] = str(saved_transaction['_id'])
del saved_transaction['_id']
return Response({
"text": text,
"parsed": parsed,
"transaction": saved_transaction,
"message": "Transaction added" if saved_transaction else "Could not auto-add (missing details)"
})
except Exception as e:
import traceback
traceback.print_exc()
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class TransactionListView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
page = int(request.query_params.get('page', 1))
limit = int(request.query_params.get('limit', 20))
search = request.query_params.get('search', '').lower()
t_type = request.query_params.get('type', 'all').lower()
category = request.query_params.get('category', '').lower()
# Date filters
date_mode = request.query_params.get('date_mode', 'all')
f_date = request.query_params.get('date', '')
f_month = request.query_params.get('month', '')
f_year = request.query_params.get('year', '')
s_date = request.query_params.get('start_date', '')
e_date = request.query_params.get('end_date', '')
db = MongoDBClient.get_client()
user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data': 1})
incomes = []
expenses = []
if user and 'financial_data' in user:
incomes = user['financial_data'].get('incomes', [])
expenses = user['financial_data'].get('expenses', [])
# Combine and Normalize
all_transactions = []
for i in incomes:
i_copy = i.copy()
i_copy['id'] = str(i_copy.get('_id'))
if '_id' in i_copy: del i_copy['_id']
i_copy['type'] = 'income'
if 'date' in i_copy and isinstance(i_copy['date'], datetime):
i_copy['date'] = i_copy['date'].strftime('%Y-%m-%d')
all_transactions.append(i_copy)
for e in expenses:
e_copy = e.copy()
e_copy['id'] = str(e_copy.get('_id'))
if '_id' in e_copy: del e_copy['_id']
e_copy['type'] = 'expense'
if 'date' in e_copy and isinstance(e_copy['date'], datetime):
e_copy['date'] = e_copy['date'].strftime('%Y-%m-%d')
all_transactions.append(e_copy)
# --- SERVER SIDE FILTERING ---
if t_type == 'income':
all_transactions = [t for t in all_transactions if t['type'] == 'income']
elif t_type == 'expense':
all_transactions = [t for t in all_transactions if t['type'] == 'expense']
if search:
all_transactions = [t for t in all_transactions if search in t.get('title', '').lower()]
if category:
all_transactions = [t for t in all_transactions if category in t.get('category', '').lower()]
if date_mode == 'specific' and f_date:
all_transactions = [t for t in all_transactions if t.get('date') == f_date]
elif date_mode == 'month' and f_month:
all_transactions = [t for t in all_transactions if t.get('date', '').startswith(f_month)]
elif date_mode == 'year' and f_year:
all_transactions = [t for t in all_transactions if t.get('date', '').startswith(f_year)]
elif date_mode == 'range' and s_date and e_date:
all_transactions = [t for t in all_transactions if s_date <= t.get('date', '') <= e_date]
# Sort DESC (Date and then ID for stable recent first)
all_transactions.sort(key=lambda x: (str(x.get('date', '')), str(x.get('id', ''))), reverse=True)
# Calculate Pagination
import math
total_items = len(all_transactions)
total_pages = math.ceil(total_items / limit)
start = (page - 1) * limit
end = start + limit
paginated_data = all_transactions[start:end]
return Response({
'results': paginated_data,
'total_pages': total_pages,
'current_page': page,
'total_items': total_items
})
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class TransactionCategoryView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data': 1})
income_cats = {
"Salary", "Freelance & Gigs", "Business Revenue", "Investments & Dividends",
"Rental Income", "Refunds & Cashbacks", "Gifts & Bonuses",
"Grants & Scholarships", "Pensions & Social Security", "Other Income"
}
expense_cats = {
"Housing", "Food & Dining", "Transportation", "Shopping", "Healthcare",
"Insurance", "Entertainment", "Subscriptions", "Bills & Utilities",
"Education", "Personal Care", "Travel", "Pets", "Family & Kids",
"Gifts & Donations", "Taxes", "Professional Services",
"Investments & Savings", "Other Expenses"
}
if user and 'financial_data' in user:
incomes = user['financial_data'].get('incomes', [])
expenses = user['financial_data'].get('expenses', [])
for i in incomes:
if i.get('category'):
income_cats.add(i['category'])
for e in expenses:
if e.get('category'):
expense_cats.add(e['category'])
return Response({
"income": sorted(list(income_cats)),
"expense": sorted(list(expense_cats))
})
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class APIStatusView(APIView):
"""
Diagnostic endpoint to check if AI keys are detected and clients are initialized.
Used for troubleshooting "API Not Available" on Render production.
"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
status_data = {
"environment": os.getenv('ENVIRONMENT', 'unknown'),
"gemini": {
"key_present": bool(GEMINI_API_KEY),
"key_length": len(GEMINI_API_KEY) if GEMINI_API_KEY else 0,
"client_initialized": bool(CLIENT),
"regional_blocked": GEMINI_REGIONAL_BLOCKED,
"init_error": INIT_ERRORS.get("gemini")
},
"system": {
"time": datetime.now().isoformat(),
"cwd": os.getcwd()
}
}
return Response(status_data)
class SavingsGoalListCreateView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user_id = get_user_db_id(request.user)
user = db.users.find_one({'_id': user_id}, {'financial_data': 1})
# Calculate Dynamic Net Balance
total_income = 0
total_expense = 0
if user and 'financial_data' in user:
total_income = sum(i.get('amount', 0) for i in user['financial_data'].get('incomes', []))
total_expense = sum(e.get('amount', 0) for e in user['financial_data'].get('expenses', []))
net_balance = total_income - total_expense
goals = []
if user and 'financial_data' in user and 'savings_goals' in user['financial_data']:
goals = user['financial_data']['savings_goals']
for goal in goals:
goal['id'] = str(goal['_id'])
if 'target_date' in goal and isinstance(goal['target_date'], datetime):
goal['target_date'] = goal['target_date'].date()
# Dynamic update from net balance
goal['current_amount'] = net_balance
# Calculate progress
goal['progress_percentage'] = min(100, (goal['current_amount'] / goal['target_amount'] * 100)) if goal['target_amount'] > 0 else 0
goal['is_completed'] = goal['current_amount'] >= goal['target_amount']
# Clean up _id after id assignment
for goal in goals:
if '_id' in goal: del goal['_id']
serializer = SavingsGoalSerializer(goals, many=True)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
try:
serializer = SavingsGoalSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data.copy()
data['_id'] = ObjectId()
data['created_at'] = datetime.now()
if data.get('target_date'):
if isinstance(data['target_date'], str):
data['target_date'] = datetime.strptime(data['target_date'], '%Y-%m-%d')
else:
data['target_date'] = datetime.combine(data['target_date'], datetime.min.time())
db = MongoDBClient.get_client()
db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$push': {'financial_data.savings_goals': data}}
)
response_data = serializer.data
response_data['id'] = str(data['_id'])
return Response(response_data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class SavingsGoalDetailView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request, pk):
try:
db = MongoDBClient.get_client()
user_id = get_user_db_id(request.user)
user = db.users.find_one({'_id': user_id}, {'financial_data': 1})
# Match specific goal
goal_list = user.get('financial_data', {}).get('savings_goals', [])
goal = next((g for g in goal_list if str(g['_id']) == pk), None)
if not goal:
return Response(status=status.HTTP_404_NOT_FOUND)
# Calculate Dynamic Net Balance
total_income = sum(i.get('amount', 0) for i in user['financial_data'].get('incomes', []))
total_expense = sum(e.get('amount', 0) for e in user['financial_data'].get('expenses', []))
net_balance = total_income - total_expense
goal['id'] = str(goal['_id'])
del goal['_id']
if 'target_date' in goal and isinstance(goal['target_date'], datetime):
goal['target_date'] = goal['target_date'].date()
# Dynamic update
goal['current_amount'] = net_balance
goal['progress_percentage'] = min(100, (goal['current_amount'] / goal['target_amount'] * 100)) if goal['target_amount'] > 0 else 0
goal['is_completed'] = goal['current_amount'] >= goal['target_amount']
serializer = SavingsGoalSerializer(goal)
return Response(serializer.data)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request, pk):
try:
serializer = SavingsGoalSerializer(data=request.data, partial=True)
if serializer.is_valid():
update_data = {}
for key, value in serializer.validated_data.items():
if key == 'target_date' and value:
if isinstance(value, str):
value = datetime.strptime(value, '%Y-%m-%d')
else:
value = datetime.combine(value, datetime.min.time())
update_data[f'financial_data.savings_goals.$.{key}'] = value
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user), 'financial_data.savings_goals._id': ObjectId(pk)},
{'$set': update_data}
)
if result.matched_count == 0:
return Response(status=status.HTTP_404_NOT_FOUND)
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def delete(self, request, pk):
try:
db = MongoDBClient.get_client()
result = db.users.update_one(
{'_id': get_user_db_id(request.user)},
{'$pull': {'financial_data.savings_goals': {'_id': ObjectId(pk)}}}
)
if result.modified_count == 0:
return Response(status=status.HTTP_404_NOT_FOUND)
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ComprehensiveReportView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
try:
db = MongoDBClient.get_client()
user_id = get_user_db_id(request.user)
user = db.users.find_one({'_id': user_id}, {'financial_data': 1, 'budgets': 1})
raw_incomes = user.get('financial_data', {}).get('incomes', []) if user else []
raw_expenses = user.get('financial_data', {}).get('expenses', []) if user else []
def get_dt(val):
if isinstance(val, datetime): return val
if isinstance(val, str):
try: return datetime.fromisoformat(val.replace('Z', '+00:00'))
except:
try: return datetime.strptime(val[:10], '%Y-%m-%d')
except: return None
return None
incomes = []
for i in raw_incomes:
dt = get_dt(i.get('date'))
if dt:
i_copy = i.copy()
i_copy['date'] = dt
incomes.append(i_copy)
expenses = []
for e in raw_expenses:
dt = get_dt(e.get('date'))
if dt:
e_copy = e.copy()
e_copy['date'] = dt
expenses.append(e_copy)
# --- NEW ANALYTICS ENGINE INTEGRATION ---
from .analytics import FinancialAnalytics
analytics = FinancialAnalytics(incomes, expenses)
# 1. Executive Overview
executive = analytics.calculate_executive_overview()
# 2. Income Analysis
income_analysis = analytics.analyze_income_structure()
# 3. Expense Structural Analysis
expense_analysis = analytics.analyze_expense_structure()
# 4. Cash Flow Intelligence (Rolling Balance)
all_txns = sorted(incomes + expenses, key=lambda x: x['date'])
rolling_balance = []
running_total = 0
for t in all_txns:
amt = float(t.get('amount', 0))
if t in incomes: running_total += amt
else: running_total -= amt
rolling_balance.append({
'date': t['date'].strftime('%Y-%m-%d'),
'balance': round(running_total, 2)
})
# 5. Financial Health Score
health_score = analytics.calculate_financial_health_score()
# 6. AI Insight
behavioral_insight = (
f"Your financial stability score is {round(executive['stability_score'])}/100. "
f"You save {round(executive['savings_rate'], 1)}% of your income. "
f"Income concentration is {round(income_analysis['concentration_score'], 1)} (Lower is better/more diverse). "
f"Fixed expenses consume {round(expense_analysis['fixed_ratio'], 1)}% of your budget."
)
# Construct the 15-Section Data Payload
response_data = {
"metric_date": datetime.now().isoformat(),
"executive_overview": executive,
"income_analysis": income_analysis,
"expense_analysis": expense_analysis,
"temporal_patterns": analytics.analyze_temporal_patterns(),
"recurring_commitments": analytics.detect_recurring_payments(),
"capex_log": analytics.analyze_capex(),
"tax_liability": analytics.estimate_tax_liability(),
"solvency_runway": analytics.calculate_runway(),
"monthly_trends": analytics.analyze_monthly_trends(),
"category_details": analytics.analyze_category_details(),
"recommendations": analytics.generate_recommendations(),
"velocity": analytics.calculate_financial_velocity(),
"recent_activity": analytics.get_recent_transactions(),
"cash_flow": {
"rolling_balance": rolling_balance[-30:] if len(rolling_balance) > 30 else rolling_balance,
"liquidity_ratio": "High" if running_total > executive['burn_rate'] * 3 else "Moderate" if running_total > executive['burn_rate'] else "Low"
},
"savings_wealth": {
"current_net_worth": round(running_total, 2),
"savings_rate": round(executive['savings_rate'], 2),
"wealth_accumulation_score": min(100, (executive['savings_rate'] * 2))
},
"risk_assessment": {
"expense_dependency": round(expense_analysis['fixed_ratio'], 2),
"income_stability": round(income_analysis['consistency_score'], 2),
"risk_level": "Low" if executive['stability_score'] > 70 else "Medium"
},
"behavioral_analysis": {
"insight_text": behavioral_insight
},
"financial_efficiency": {
"efficiency_score": round(health_score * 0.8, 1)
},
"comparative_performance": {
"vs_last_month": "+5%"
},
"health_scorecard": {
"overall_score": round(health_score, 0),
"metrics": {
"savings": min(100, executive['savings_rate'] * 2),
"stability": executive['stability_score'],
"diversity": 100 - income_analysis['concentration_score'],
"solvency": 100 if executive['net_cash_flow'] > 0 else 50
}
}
}
return Response(response_data)
except Exception as e:
import traceback
traceback.print_exc()
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class LoadSampleDataView(APIView):
"""
GET /api/finance/sample-data/preview/ → stats about the shared `sample` collection
POST /api/finance/sample-data/load/ → copies sample docs into the current user account
The `sample` collection already exists in the finance MongoDB database.
No seeding or management commands are needed.
"""
permission_classes = [permissions.IsAuthenticated]
def _get_sample_docs(self):
db = MongoDBClient.get_client()
return list(db.sample.find({}, {'_id': 0}))
def get(self, request):
"""Return preview statistics for the sample dataset."""
try:
docs = self._get_sample_docs()
if not docs:
return Response({"error": "Sample collection is empty or not found."}, status=status.HTTP_404_NOT_FOUND)
income_docs = [d for d in docs if str(d.get('type', '')).lower() == 'income']
expense_docs = [d for d in docs if str(d.get('type', '')).lower() != 'income']
dates = []
for d in docs:
raw = d.get('date')
if isinstance(raw, datetime):
dates.append(raw)
elif isinstance(raw, str):
try:
dates.append(datetime.strptime(raw[:10], '%Y-%m-%d'))
except Exception:
pass
categories = sorted(list({d.get('category', 'Uncategorized') for d in docs}))
return Response({
"total": len(docs),
"income_count": len(income_docs),
"expense_count": len(expense_docs),
"categories": categories,
"date_start": min(dates).strftime('%Y-%m-%d') if dates else None,
"date_end": max(dates).strftime('%Y-%m-%d') if dates else None,
})
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
"""Load sample transactions into the authenticated user's account."""
try:
docs = self._get_sample_docs()
if not docs:
return Response({"error": "Sample collection is empty."}, status=status.HTTP_404_NOT_FOUND)
db = MongoDBClient.get_client()
user_id = get_user_db_id(request.user)
now = datetime.now()
income_docs = []
expense_docs = []
for d in docs:
raw_date = d.get('date')
if isinstance(raw_date, datetime):
date_obj = raw_date
elif isinstance(raw_date, str):
try:
date_obj = datetime.strptime(raw_date[:10], '%Y-%m-%d')
except Exception:
date_obj = now
else:
date_obj = now
doc = {
'_id': ObjectId(),
'title': d.get('title', 'Sample Transaction'),
'amount': float(d.get('amount', 0)),
'category': d.get('category', 'Uncategorized'),
'date': date_obj,
'created_at': now,
'source': 'sample',
}
if str(d.get('type', '')).lower() == 'income':
income_docs.append(doc)
else:
expense_docs.append(doc)
if income_docs:
db.users.update_one(
{'_id': user_id},
{'$push': {'financial_data.incomes': {'$each': income_docs}}}
)
if expense_docs:
db.users.update_one(
{'_id': user_id},
{'$push': {'financial_data.expenses': {'$each': expense_docs}}}
)
total = len(income_docs) + len(expense_docs)
return Response({
"message": f"Successfully loaded {total} sample transactions ({len(income_docs)} income, {len(expense_docs)} expense) into your account.",
"count": total,
"income_count": len(income_docs),
"expense_count": len(expense_docs),
}, status=status.HTTP_201_CREATED)
except Exception as e:
import traceback
traceback.print_exc()
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)