| from rest_framework.views import APIView |
| from rest_framework.response import Response |
| from rest_framework import status, permissions |
| from .serializers import IncomeSerializer, ExpenseSerializer, BudgetSerializer, SavingsGoalSerializer |
| from expense_tracker.utils import MongoDBClient |
| from bson import ObjectId |
| from datetime import datetime, timedelta |
| from .ai_helper import scan_receipt_with_llm, INIT_ERRORS, GEMINI_API_KEY, CLIENT, GEMINI_REGIONAL_BLOCKED |
| from .utils_mongo import get_user_db_id |
| from collections import defaultdict |
| import os |
|
|
|
|
class IncomeListCreateView(APIView):
    """List the authenticated user's incomes (GET) or append a new one (POST).

    Incomes live as embedded documents under ``financial_data.incomes`` on the
    user's document in the ``users`` collection.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return every income of the requesting user, newest first.

        The stored ``_id`` is exposed as a string ``id`` and ``datetime``
        values in ``date`` are reduced to plain dates before serialization.
        Any exception is reported as a 500 response carrying its text.
        """
        try:
            client = MongoDBClient.get_client()
            user_doc = client.users.find_one(
                {'_id': get_user_db_id(request.user)},
                {'financial_data.incomes': 1},
            )

            records = []
            if user_doc and 'financial_data' in user_doc:
                financial = user_doc['financial_data']
                if 'incomes' in financial:
                    records = financial['incomes']

            for record in records:
                # Swap the raw ObjectId for its string form under "id".
                record['id'] = str(record.pop('_id'))
                stored_date = record.get('date')
                if isinstance(stored_date, datetime):
                    record['date'] = stored_date.date()

            def sort_key(item):
                # Compare as strings so missing or mixed-type values never clash.
                return (str(item.get('date', '')), str(item.get('id', '')))

            records.sort(key=sort_key, reverse=True)

            return Response(IncomeSerializer(records, many=True).data)
        except Exception as exc:
            return Response({"error": str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def post(self, request):
        """Validate the payload and push a new income onto the user's list.

        A blank category is filled in by ``classify_transaction``.  Responds
        with 201 and the serialized income (plus its new ``id``), 400 with the
        serializer errors, or 500 with the exception text.
        """
        try:
            serializer = IncomeSerializer(data=request.data)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            document = serializer.validated_data.copy()
            document['_id'] = ObjectId()
            document['created_at'] = datetime.now()

            if not document.get('category'):
                from finance.category_classifier import classify_transaction
                guess = classify_transaction(document['title'], 'Income')
                document['category'] = guess['category']

            # Stored dates are always datetimes (midnight for plain dates).
            raw_date = document['date']
            document['date'] = (
                datetime.strptime(raw_date, '%Y-%m-%d')
                if isinstance(raw_date, str)
                else datetime.combine(raw_date, datetime.min.time())
            )

            if 'amount' in document:
                document['amount'] = float(document['amount'])

            client = MongoDBClient.get_client()
            client.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$push': {'financial_data.incomes': document}},
            )

            payload = serializer.data
            payload['id'] = str(document['_id'])
            if not payload.get('category'):
                payload['category'] = document['category']
            return Response(payload, status=status.HTTP_201_CREATED)
        except Exception as exc:
            return Response({"error": str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class IncomeDetailView(APIView):
    """Retrieve, replace or delete one income of the authenticated user.

    ``pk`` is the string form of the ``_id`` of an embedded document in
    ``financial_data.incomes``.  A ``pk`` that is not a valid ObjectId cannot
    match any stored income, so it is answered with 404 instead of letting
    ``ObjectId()`` raise and surface as a 500.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request, pk):
        """Return the income identified by ``pk``, or 404 when it does not exist."""
        try:
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            # The positional projection returns only the matched array element.
            user = db.users.find_one(
                {'_id': get_user_db_id(request.user), 'financial_data.incomes._id': ObjectId(pk)},
                {'financial_data.incomes.$': 1}
            )

            matches = (user or {}).get('financial_data', {}).get('incomes')
            if not matches:
                return Response(status=status.HTTP_404_NOT_FOUND)

            income = matches[0]
            income['id'] = str(income.pop('_id'))
            if isinstance(income.get('date'), datetime):
                income['date'] = income['date'].date()

            serializer = IncomeSerializer(income)
            return Response(serializer.data)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def put(self, request, pk):
        """Replace the income identified by ``pk`` with the validated payload.

        Responds with the serialized income, 400 on validation errors, 404 when
        ``pk`` is malformed or matches nothing, or 500 with the exception text.
        """
        try:
            serializer = IncomeSerializer(data=request.data)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            data = serializer.validated_data.copy()
            data['_id'] = ObjectId(pk)

            # Same fallback as income creation and expense updates: a blank
            # category is filled in by the classifier rather than stored empty.
            if not data.get('category'):
                from finance.category_classifier import classify_transaction
                classification = classify_transaction(data['title'], 'Income')
                data['category'] = classification['category']

            # Stored dates are always datetimes (midnight for plain dates).
            if isinstance(data['date'], str):
                data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
            else:
                data['date'] = datetime.combine(data['date'], datetime.min.time())

            if 'amount' in data:
                data['amount'] = float(data['amount'])

            db = MongoDBClient.get_client()
            # NOTE(review): this replaces the whole embedded document, so fields
            # not present in the payload (e.g. created_at) are dropped - confirm
            # that is intended.
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user), 'financial_data.incomes._id': ObjectId(pk)},
                {'$set': {'financial_data.incomes.$': data}}
            )

            if result.matched_count > 0:
                response_data = serializer.data
                response_data['id'] = pk
                if not response_data.get('category'):
                    response_data['category'] = data['category']
                return Response(response_data)
            return Response(status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def delete(self, request, pk):
        """Remove the income identified by ``pk``; 204 on success, 404 otherwise."""
        try:
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$pull': {'financial_data.incomes': {'_id': ObjectId(pk)}}}
            )

            # modified_count stays 0 when no array element matched the id.
            if result.modified_count > 0:
                return Response(status=status.HTTP_204_NO_CONTENT)
            return Response(status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class ExpenseListCreateView(APIView):
    """List the authenticated user's expenses (GET) or append a new one (POST).

    Expenses live as embedded documents under ``financial_data.expenses`` on
    the user's document in the ``users`` collection.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return every expense of the requesting user, newest first.

        The stored ``_id`` is exposed as a string ``id`` and ``datetime``
        values in ``date`` are reduced to plain dates before serialization.
        Any exception is reported as a 500 response carrying its text.
        """
        try:
            client = MongoDBClient.get_client()
            user_doc = client.users.find_one(
                {'_id': get_user_db_id(request.user)},
                {'financial_data.expenses': 1},
            )

            rows = []
            if user_doc and 'financial_data' in user_doc:
                financial = user_doc['financial_data']
                if 'expenses' in financial:
                    rows = financial['expenses']

            for row in rows:
                # Swap the raw ObjectId for its string form under "id".
                row['id'] = str(row.pop('_id'))
                stored_date = row.get('date')
                if isinstance(stored_date, datetime):
                    row['date'] = stored_date.date()

            def sort_key(item):
                # Compare as strings so missing or mixed-type values never clash.
                return (str(item.get('date', '')), str(item.get('id', '')))

            rows.sort(key=sort_key, reverse=True)

            return Response(ExpenseSerializer(rows, many=True).data)
        except Exception as exc:
            return Response({"error": str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def post(self, request):
        """Validate the payload and push a new expense onto the user's list.

        A blank category is filled in by ``classify_transaction``.  Responds
        with 201 and the serialized expense (plus its new ``id``), 400 with the
        serializer errors, or 500 with the exception text.
        """
        try:
            serializer = ExpenseSerializer(data=request.data)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            document = serializer.validated_data.copy()
            document['_id'] = ObjectId()
            document['created_at'] = datetime.now()

            if not document.get('category'):
                from finance.category_classifier import classify_transaction
                guess = classify_transaction(document['title'], 'Expense')
                document['category'] = guess['category']

            # Stored dates are always datetimes (midnight for plain dates).
            raw_date = document['date']
            document['date'] = (
                datetime.strptime(raw_date, '%Y-%m-%d')
                if isinstance(raw_date, str)
                else datetime.combine(raw_date, datetime.min.time())
            )

            if 'amount' in document:
                document['amount'] = float(document['amount'])

            client = MongoDBClient.get_client()
            client.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$push': {'financial_data.expenses': document}},
            )

            payload = serializer.data
            payload['id'] = str(document['_id'])
            if not payload.get('category'):
                payload['category'] = document['category']
            return Response(payload, status=status.HTTP_201_CREATED)
        except Exception as exc:
            return Response({"error": str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class ExpenseDetailView(APIView):
    """Retrieve, replace or delete one expense of the authenticated user.

    ``pk`` is the string form of the ``_id`` of an embedded document in
    ``financial_data.expenses``.  A ``pk`` that is not a valid ObjectId cannot
    match any stored expense, so it is answered with 404 instead of letting
    ``ObjectId()`` raise and surface as a 500.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request, pk):
        """Return the expense identified by ``pk``, or 404 when it does not exist."""
        try:
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            # The positional projection returns only the matched array element.
            user = db.users.find_one(
                {'_id': get_user_db_id(request.user), 'financial_data.expenses._id': ObjectId(pk)},
                {'financial_data.expenses.$': 1}
            )

            matches = (user or {}).get('financial_data', {}).get('expenses')
            if not matches:
                return Response(status=status.HTTP_404_NOT_FOUND)

            expense = matches[0]
            expense['id'] = str(expense.pop('_id'))
            if isinstance(expense.get('date'), datetime):
                expense['date'] = expense['date'].date()

            serializer = ExpenseSerializer(expense)
            return Response(serializer.data)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def put(self, request, pk):
        """Replace the expense identified by ``pk`` with the validated payload.

        A blank category is filled in by ``classify_transaction``.  Responds
        with the serialized expense, 400 on validation errors, 404 when ``pk``
        is malformed or matches nothing, or 500 with the exception text.
        """
        try:
            serializer = ExpenseSerializer(data=request.data)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            data = serializer.validated_data.copy()
            data['_id'] = ObjectId(pk)

            if not data.get('category'):
                from finance.category_classifier import classify_transaction
                prediction = classify_transaction(data['title'], 'Expense')
                data['category'] = prediction['category']

            # Stored dates are always datetimes (midnight for plain dates).
            if isinstance(data['date'], str):
                data['date'] = datetime.strptime(data['date'], '%Y-%m-%d')
            else:
                data['date'] = datetime.combine(data['date'], datetime.min.time())

            if 'amount' in data:
                data['amount'] = float(data['amount'])

            db = MongoDBClient.get_client()
            # NOTE(review): this replaces the whole embedded document, so fields
            # not present in the payload (e.g. created_at) are dropped - confirm
            # that is intended.
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user), 'financial_data.expenses._id': ObjectId(pk)},
                {'$set': {'financial_data.expenses.$': data}}
            )

            if result.matched_count > 0:
                response_data = serializer.data
                response_data['id'] = pk
                # Echo the auto-assigned category, as the create endpoint does.
                if not response_data.get('category'):
                    response_data['category'] = data['category']
                return Response(response_data)
            return Response(status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def delete(self, request, pk):
        """Remove the expense identified by ``pk``; 204 on success, 404 otherwise."""
        try:
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$pull': {'financial_data.expenses': {'_id': ObjectId(pk)}}}
            )

            # modified_count stays 0 when no array element matched the id.
            if result.modified_count > 0:
                return Response(status=status.HTTP_204_NO_CONTENT)
            return Response(status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class BudgetDetailView(APIView):
    """Delete a single budget of the authenticated user."""

    permission_classes = [permissions.IsAuthenticated]

    def delete(self, request, pk):
        """Remove the budget whose ``_id`` or string ``id`` equals ``pk``.

        Responds with 204 when a budget was removed, 404 when nothing matched,
        or 500 with the exception text.
        """
        try:
            # A budget is matched either by ObjectId `_id` or by plain string
            # `id`.  The `_id` clause is only built when `pk` parses as an
            # ObjectId; otherwise ObjectId() would raise and the string-id
            # fallback could never be reached.
            match_any = [{'id': pk}]
            if ObjectId.is_valid(pk):
                match_any.insert(0, {'_id': ObjectId(pk)})

            db = MongoDBClient.get_client()
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$pull': {'budgets': {'$or': match_any}}}
            )

            # modified_count stays 0 when no array element matched.
            if result.modified_count > 0:
                return Response(status=status.HTTP_204_NO_CONTENT)
            return Response(status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class BudgetListCreateView(APIView):
    """List budgets enriched with spending figures (GET) or set one (POST).

    Budgets are stored in the ``budgets`` array of the user's document; the
    spending figures are derived from ``financial_data.expenses`` and
    ``financial_data.incomes`` of the same document.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return budgets for one month (or ``month=all``) with spend statistics.

        Query parameters:
            month: ``YYYY-MM`` or ``all``.  Defaults to the most recent month
                that has a transaction, or the current month when there are
                no transactions.

        The response contains the serialized budgets, per-category expense
        totals, the selected month, all months with data and a summary.
        Incomes in a budget's category are subtracted from its spending.
        Any exception is reported as a 500 response carrying its text.
        """
        try:
            db = MongoDBClient.get_client()
            user = db.users.find_one(
                {'_id': get_user_db_id(request.user)},
                {'budgets': 1, 'financial_data.expenses': 1, 'financial_data.incomes': 1}
            )

            raw_budgets = []
            expenses = []
            incomes = []

            if user:
                raw_budgets = user.get('budgets', [])
                if 'financial_data' in user:
                    expenses = user['financial_data'].get('expenses', [])
                    incomes = user['financial_data'].get('incomes', [])

            # Tag each transaction with its kind so both lists can be scanned together.
            all_txns = []
            for e in expenses:
                e['txn_type'] = 'expense'
                all_txns.append(e)
            for i in incomes:
                i['txn_type'] = 'income'
                all_txns.append(i)

            def get_dt(val):
                """Return ``val`` as a datetime, or None when it cannot be parsed."""
                if isinstance(val, datetime):
                    return val
                if isinstance(val, str):
                    try:
                        return datetime.strptime(val[:10], '%Y-%m-%d')
                    except ValueError:
                        return None
                return None

            def norm_cat(doc):
                """Return the category of ``doc`` trimmed and lower-cased for matching."""
                return str(doc.get('category', '')).strip().lower()

            all_dates = [get_dt(t.get('date')) for t in all_txns]
            all_dates = [d for d in all_dates if d]
            available_months = sorted({d.strftime('%Y-%m') for d in all_dates}, reverse=True)

            target_month = request.query_params.get('month')
            if not target_month:
                if available_months:
                    target_month = available_months[0]
                else:
                    target_month = datetime.now().strftime('%Y-%m')

            month = target_month
            enriched_budgets = []
            total_months_active = len(available_months) if available_months else 1

            if month == 'all':
                # One entry per category; a later budget in the list wins.
                latest_budgets_map = {}
                for b in raw_budgets:
                    latest_budgets_map[norm_cat(b)] = b

                for cat_norm, budget_template in latest_budgets_map.items():
                    b = budget_template.copy()
                    b['id'] = str(b.get('_id', ObjectId()))
                    if '_id' in b:
                        del b['_id']

                    # Net spending over every transaction in this category.
                    lifetime_spent = 0.0
                    for txn in all_txns:
                        if norm_cat(txn) == cat_norm:
                            amt = float(txn.get('amount', 0))
                            if txn['txn_type'] == 'expense':
                                lifetime_spent += amt
                            else:
                                lifetime_spent -= amt

                    lifetime_spent = max(0, lifetime_spent)

                    if budget_template.get('month') == 'all':
                        # A standing budget is compared with the monthly average.
                        limit = b.get('limit_amount', 0)
                        display_spent = lifetime_spent / total_months_active
                        b['label_suffix'] = "(Monthly Avg)"
                    else:
                        # Month-specific budgets are summed and compared with
                        # the lifetime total.
                        limit = sum(
                            lb.get('limit_amount', 0) for lb in raw_budgets
                            if norm_cat(lb) == cat_norm and lb.get('month') != 'all'
                        )
                        display_spent = lifetime_spent
                        b['label_suffix'] = "(Lifetime Total)"

                    b['spent_amount'] = round(display_spent, 2)
                    b['limit_amount'] = round(limit, 2)
                    b['month'] = 'All Time'
                    b['remaining_amount'] = round(limit - display_spent, 2)
                    b['is_exceeded'] = display_spent > limit

                    b['lifetime_spent_actual'] = round(lifetime_spent, 2)

                    is_average = b.get('label_suffix') == "(Monthly Avg)"
                    if b['is_exceeded']:
                        b['alert_message'] = "Exceeding Average" if is_average else "Lifetime Overspent"
                    else:
                        b['alert_message'] = "Within Average" if is_average else "Lifetime Safe"

                    enriched_budgets.append(b)

            else:
                # Budgets set for this month, plus standing ('all') budgets for
                # categories that have no month-specific budget.
                direct_budgets = [b for b in raw_budgets if b.get('month') == month]
                direct_cats = {norm_cat(b) for b in direct_budgets}

                fallback_budgets = [
                    b for b in raw_budgets
                    if b.get('month') == 'all' and norm_cat(b) not in direct_cats
                ]

                combined_list = direct_budgets + fallback_budgets

                # Parsed once: the requested month does not change per budget.
                today = datetime.now()
                try:
                    start_date = datetime.strptime(month, '%Y-%m')
                except ValueError:
                    start_date = None

                for budget in combined_list:
                    budget_copy = budget.copy()
                    budget_copy['id'] = str(budget_copy.get('_id', ObjectId()))
                    if '_id' in budget_copy:
                        del budget_copy['_id']

                    spent = 0.0
                    limit = budget_copy['limit_amount']
                    b_cat = norm_cat(budget_copy)

                    for txn in all_txns:
                        txn_date = get_dt(txn.get('date'))
                        if txn_date:
                            txn_month = txn_date.strftime('%Y-%m')
                            if txn_month == month and norm_cat(txn) == b_cat:
                                amt = float(txn.get('amount', 0))
                                if txn['txn_type'] == 'expense':
                                    spent += amt
                                else:
                                    spent -= amt

                    budget_copy['spent_amount'] = max(0, spent)
                    budget_copy['remaining_amount'] = limit - budget_copy['spent_amount']
                    budget_copy['is_exceeded'] = budget_copy['spent_amount'] > limit

                    if start_date is None:
                        budget_copy['alert_message'] = "N/A"
                    elif start_date.year == today.year and start_date.month == today.month:
                        # Current month: project the spending so far onto 30 days.
                        days_passed = max(1, today.day)
                        daily_avg = spent / days_passed
                        projected_total = daily_avg * 30

                        if budget_copy['is_exceeded']:
                            budget_copy['alert_message'] = "Limit Exceeded!"
                        elif projected_total > limit:
                            budget_copy['alert_message'] = f"Pacing to Exceed (${round(projected_total - limit)})"
                        else:
                            budget_copy['alert_message'] = "On Track"
                    else:
                        # The flag lives on the enriched copy; the stored budget
                        # document has no 'is_exceeded' key.
                        budget_copy['alert_message'] = "Closed" if budget_copy['is_exceeded'] else "Safe"

                    enriched_budgets.append(budget_copy)

            # Expense totals per category (as stored, not normalized), used to
            # suggest categories for new budgets.
            category_totals = defaultdict(float)
            for txn in all_txns:
                cat = txn.get('category')
                if cat and get_dt(txn.get('date')) and txn.get('txn_type') == 'expense':
                    category_totals[cat] += float(txn.get('amount', 0))

            available_categories = [
                {
                    'name': cat,
                    'avg_monthly_spend': round(total / total_months_active, 2),
                    'total_spend': round(total, 2)
                }
                for cat, total in category_totals.items()
            ]
            available_categories.sort(key=lambda x: x['total_spend'], reverse=True)

            serializer = BudgetSerializer(enriched_budgets, many=True)

            return Response({
                "budgets": serializer.data,
                "categories": available_categories,
                "current_month": month,
                "available_months": available_months,
                "summary": {
                    "total_budgeted": sum(b['limit_amount'] for b in enriched_budgets),
                    "total_spent_in_budgets": sum(b['spent_amount'] for b in enriched_budgets)
                }
            })
        except Exception as e:
            import traceback
            traceback.print_exc()
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def post(self, request):
        """Create or replace the budget for a category and month.

        Expects ``category`` and ``limit_amount`` in the body; ``month``
        defaults to the current ``YYYY-MM``.  Responds with 201 and the new
        budget id, 400 when a field is missing or the limit is not numeric,
        or 500 with the exception text.
        """
        try:
            data = request.data
            category = data.get('category')
            limit = data.get('limit_amount')
            month = data.get('month') or datetime.now().strftime('%Y-%m')

            if not category or not limit:
                return Response({"error": "Category and Limit Amount required"}, status=status.HTTP_400_BAD_REQUEST)

            # Convert before touching the database: a non-numeric limit must
            # not remove the existing budget and then fail to insert the new one.
            try:
                limit_value = float(limit)
            except (TypeError, ValueError):
                return Response({"error": "Limit Amount must be a number"}, status=status.HTTP_400_BAD_REQUEST)

            db = MongoDBClient.get_client()

            # Drop any existing budget for the same category and month first.
            db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$pull': {'budgets': {'category': category, 'month': month}}}
            )

            new_budget = {
                '_id': ObjectId(),
                'category': category,
                'limit_amount': limit_value,
                'month': month,
                'updated_at': datetime.now(),
                'user_id': request.user.id
            }

            db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$push': {'budgets': new_budget}}
            )

            return Response({"message": "Budget set successfully", "id": str(new_budget['_id'])}, status=status.HTTP_201_CREATED)

        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class DashboardSummaryView(APIView):
    """Compute the dashboard payload for the authenticated user.

    Every figure is derived in memory from the ``financial_data.incomes`` and
    ``financial_data.expenses`` arrays of the user's document.
    """

    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return totals, trends, momentum and recent transactions.

        Transactions whose ``date`` cannot be parsed are ignored.  Time-based
        figures are anchored on the latest transaction date (or now when there
        are no transactions) rather than on today's date.  Any exception is
        reported as a 500 response carrying its text.
        """
        try:
            from collections import defaultdict
            from datetime import timedelta

            db = MongoDBClient.get_client()
            user_id = get_user_db_id(request.user)

            user = None
            if user_id:
                user = db.users.find_one({'_id': user_id}, {'financial_data': 1, 'budgets': 1})

            raw_incomes = []
            raw_expenses = []
            budgets = []

            if user:
                if 'financial_data' in user:
                    raw_incomes = user['financial_data'].get('incomes', [])
                    raw_expenses = user['financial_data'].get('expenses', [])
                # Loaded but not used further down in this method.
                budgets = user.get('budgets', [])

            def get_dt(val):
                # Accept datetimes as-is; parse strings as ISO-8601 first, then
                # fall back to the leading YYYY-MM-DD part.  Returns None otherwise.
                # NOTE(review): an ISO string with a 'Z'/offset parses to an
                # offset-aware datetime; comparing it with naive datetimes in
                # the max()/min() calls below would raise TypeError - confirm
                # stored dates never carry an offset.
                if isinstance(val, datetime): return val
                if isinstance(val, str):
                    try: return datetime.fromisoformat(val.replace('Z', '+00:00'))
                    except:
                        try: return datetime.strptime(val[:10], '%Y-%m-%d')
                        except: return None
                return None

            # Work on copies whose 'date' is a datetime; undated rows are dropped.
            incomes = []
            for i in raw_incomes:
                dt = get_dt(i.get('date'))
                if dt:
                    i_copy = i.copy()
                    i_copy['date'] = dt
                    incomes.append(i_copy)

            expenses = []
            for e in raw_expenses:
                dt = get_dt(e.get('date'))
                if dt:
                    e_copy = e.copy()
                    e_copy['date'] = dt
                    expenses.append(e_copy)

            dates = [i['date'] for i in incomes] + [e['date'] for e in expenses]

            # Anchor "the present" on the newest transaction, not on the wall clock.
            if dates:
                last_transaction_date = max(dates)
                anchor_date = last_transaction_date
            else:
                anchor_date = datetime.now()

            # Lifetime totals.
            total_income = sum(float(i.get('amount', 0)) for i in incomes)
            total_expense = sum(float(e.get('amount', 0)) for e in expenses)
            balance = total_income - total_expense

            # Lifetime expense total per category, largest first.
            cat_map = {}
            for e in expenses:
                cat = e.get('category', 'Uncategorized')
                cat_map[cat] = cat_map.get(cat, 0) + float(e.get('amount', 0))

            breakdown = [{'name': k, 'value': round(v, 2)} for k, v in cat_map.items()]
            breakdown.sort(key=lambda x: x['value'], reverse=True)

            import calendar

            if dates:
                min_date = min(dates)
            else:
                min_date = anchor_date - timedelta(days=180)

            monthly_data = []
            category_trends = []
            income_category_trends = []

            # Walk month by month from the first transaction to the anchor month.
            current_iter_date = min_date.replace(day=1)
            end_iter_date = anchor_date.replace(day=1)

            while current_iter_date <= end_iter_date:
                month_str = current_iter_date.strftime('%Y-%m')
                month_label = current_iter_date.strftime('%b %Y')

                m_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == month_str)
                m_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == month_str)

                monthly_data.append({
                    'month': month_label,
                    'income': round(m_inc, 2),
                    'expense': round(m_exp, 2),
                    'net': round(m_inc - m_exp, 2)
                })

                # Expense amounts of this month keyed by category name.
                month_cats_map = defaultdict(float)
                for e in expenses:
                    if e['date'].strftime('%Y-%m') == month_str:
                        cat_key = str(e.get('category') or 'Uncategorized')
                        month_cats_map[cat_key] += float(e['amount'])

                # One flat entry per month: a key per category plus 'month' and 'total'.
                cat_trend_entry = {'month': current_iter_date.strftime('%b %Y')}
                monthly_total = 0
                for cat_name, cat_amount in month_cats_map.items():
                    cat_trend_entry[cat_name] = round(cat_amount, 2)
                    monthly_total += cat_amount
                cat_trend_entry['total'] = round(monthly_total, 2)

                category_trends.append(cat_trend_entry)

                # Same shape for incomes.
                month_income_cats_map = defaultdict(float)
                for i in incomes:
                    if i['date'].strftime('%Y-%m') == month_str:
                        cat_key = str(i.get('category') or 'Other')
                        month_income_cats_map[cat_key] += float(i['amount'])

                income_cat_trend_entry = {'month': current_iter_date.strftime('%b %Y')}
                monthly_income_total = 0
                for cat_name, cat_amount in month_income_cats_map.items():
                    income_cat_trend_entry[cat_name] = round(cat_amount, 2)
                    monthly_income_total += cat_amount
                income_cat_trend_entry['total'] = round(monthly_income_total, 2)

                income_category_trends.append(income_cat_trend_entry)

                # Advance to the first day of the next month.
                days_in_month = calendar.monthrange(current_iter_date.year, current_iter_date.month)[1]
                current_iter_date += timedelta(days=days_in_month)
                current_iter_date = current_iter_date.replace(day=1)

            # Always returned empty by this method.
            budget_progress = []

            savings_rate = ((total_income - total_expense) / total_income * 100) if total_income > 0 else 0

            # The daily trend ends on the day of the newest expense (or the anchor).
            expense_dates = [e['date'] for e in expenses]
            spending_anchor = max(expense_dates) if expense_dates else anchor_date

            # End of that day, so expenses at any time of day fall inside the range.
            spending_anchor = spending_anchor.replace(hour=23, minute=59, second=59)

            if dates:
                trend_start_date = min(dates).replace(hour=0, minute=0, second=0)
            else:
                trend_start_date = (spending_anchor - timedelta(days=29)).replace(hour=0, minute=0, second=0)

            # At least 30 entries are produced; with a shorter data span the
            # extra days extend past spending_anchor.
            total_trend_days = (spending_anchor - trend_start_date).days + 1
            if total_trend_days < 30: total_trend_days = 30

            # date string -> category -> amount
            daily_spending_map = defaultdict(lambda: defaultdict(float))
            for e in expenses:
                if trend_start_date <= e['date'] <= spending_anchor:
                    d_str = e['date'].strftime('%Y-%m-%d')
                    cat_key = str(e.get('category') or 'Uncategorized')
                    daily_spending_map[d_str][cat_key] += float(e['amount'])

            daily_spending_trend = []
            for i in range(total_trend_days):
                d = trend_start_date + timedelta(days=i)
                d_str = d.strftime('%Y-%m-%d')

                day_data = daily_spending_map.get(d_str, {})
                total_day_amount = sum(day_data.values())

                # The label carries no year ('%b %d').
                entry = {
                    'date': d.strftime('%b %d'),
                    'total_amount': round(total_day_amount, 2)
                }
                for cat, amt in day_data.items():
                    entry[cat] = round(amt, 2)

                daily_spending_trend.append(entry)

            # Trailing average over up to 30 entries (fewer at the start).
            for i in range(len(daily_spending_trend)):
                start_window = max(0, i - 29)
                window = daily_spending_trend[start_window : i + 1]
                avg = sum(d['total_amount'] for d in window) / len(window)
                daily_spending_trend[i]['moving_avg_30d'] = round(avg, 2)

            # Average expense per day over the span between first and last transaction.
            if dates:
                total_days = (max(dates) - min(dates)).days + 1
                if total_days < 1: total_days = 1
                lifetime_daily_average = total_expense / total_days
            else:
                lifetime_daily_average = 0

            avg_daily_spending = lifetime_daily_average

            # Income counterpart of the daily trend, over the same date range.
            daily_income_map = defaultdict(lambda: defaultdict(float))
            for i in incomes:
                if trend_start_date <= i['date'] <= spending_anchor:
                    d_str = i['date'].strftime('%Y-%m-%d')
                    cat_key = str(i.get('category') or 'Other')
                    daily_income_map[d_str][cat_key] += float(i['amount'])

            daily_income_trend = []
            for i in range(total_trend_days):
                d = trend_start_date + timedelta(days=i)
                d_str = d.strftime('%Y-%m-%d')

                day_data = daily_income_map.get(d_str, {})
                total_day_amount = sum(day_data.values())

                entry = {
                    'date': d.strftime('%b %d'),
                    'total_amount': round(total_day_amount, 2)
                }
                for cat, amt in day_data.items():
                    entry[cat] = round(amt, 2)

                daily_income_trend.append(entry)

            for i in range(len(daily_income_trend)):
                start_window = max(0, i - 29)
                window = daily_income_trend[start_window : i + 1]
                avg = sum(d['total_amount'] for d in window) / len(window)
                daily_income_trend[i]['moving_avg_30d'] = round(avg, 2)

            if dates:
                total_days_income = (max(dates) - min(dates)).days + 1
                if total_days_income < 1: total_days_income = 1
                lifetime_daily_income_average = total_income / total_days_income
            else:
                lifetime_daily_income_average = 0

            avg_daily_income = lifetime_daily_income_average

            # Expenses as a percentage of income; 100 when there is spending but no income.
            expense_ratio = (total_expense / total_income * 100) if total_income > 0 else 0
            if total_income == 0 and total_expense > 0: expense_ratio = 100

            # Lifetime income total per category, largest first.
            income_sources = defaultdict(float)
            for inc in incomes:
                income_sources[inc.get('category', 'Other')] += float(inc.get('amount', 0))
            income_breakdown = [{'name': k, 'value': round(v, 2)} for k, v in income_sources.items()]
            income_breakdown.sort(key=lambda x: x['value'], reverse=True)

            # Weighted blend: 40% savings rate, 40% (100 - expense ratio), 20% positive balance.
            # NOTE(review): savings_rate is negative when expenses exceed income,
            # so health_score can drop below 0 - confirm consumers expect that.
            adherence_score = max(0, 100 - expense_ratio)
            balance_status = 100 if balance > 0 else 0
            health_score = round((savings_rate * 0.4) + (adherence_score * 0.4) + (balance_status * 0.2))

            # Fixed tier chosen from the number of days the data spans.
            data_days = 0
            if dates:
                data_days = (max(dates) - min(dates)).days

            if data_days > 180:
                forecast_accuracy = 98
            elif data_days > 90:
                forecast_accuracy = 95
            elif data_days > 30:
                forecast_accuracy = 91
            elif data_days > 7:
                forecast_accuracy = 86
            else:
                forecast_accuracy = 0

            # Number of transactions in the anchor month.
            anchor_month_str = anchor_date.strftime('%Y-%m')
            monthly_txn_count = len([t for t in incomes if t['date'].strftime('%Y-%m') == anchor_month_str]) + \
                                len([t for t in expenses if t['date'].strftime('%Y-%m') == anchor_month_str])

            # Monthly burn: expenses of the 90 days before the anchor divided by 3,
            # falling back to the lifetime total scaled to 30-day periods.
            three_months_ago = anchor_date - timedelta(days=90)
            last_3m_expenses = [float(e['amount']) for e in expenses if e['date'] >= three_months_ago]
            burn_rate = sum(last_3m_expenses) / 3 if last_3m_expenses else (total_expense / (total_trend_days / 30) if total_trend_days > 0 else 0)

            # How long the balance lasts at the current burn rate.
            daily_burn = burn_rate / 30
            runway_days = round(balance / daily_burn) if balance > 0 and daily_burn > 0 else 0
            runway_months = round(balance / burn_rate, 1) if balance > 0 and burn_rate > 0 else 0

            # Standard deviation of the last 30 daily totals as a percentage of
            # their mean (the mean is replaced by 1 when it is 0).
            daily_sums = [d['total_amount'] for d in daily_spending_trend[-30:]]
            if len(daily_sums) > 1:
                mean = sum(daily_sums) / len(daily_sums)
                variance = sum((x - mean) ** 2 for x in daily_sums) / len(daily_sums)
                volatility_score = round((variance ** 0.5) / (mean if mean > 0 else 1) * 100)
            else:
                volatility_score = 0

            # Expense titles (case-insensitive, longer than 2 characters) seen
            # at least three times count as recurring.
            # NOTE(review): a title stored as None would make .strip() raise - confirm
            # titles are always strings.
            from collections import Counter
            titles = [e.get('title', '').strip().lower() for e in expenses]
            title_counts = Counter(titles)

            recurring_titles = [t for t, count in title_counts.items() if count >= 3 and len(t) > 2]
            recurring_count = len(recurring_titles)

            # Anchor month versus the month before it.
            curr_month = anchor_date.strftime('%Y-%m')
            curr_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == curr_month)
            curr_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == curr_month)
            curr_savings_rate = ((curr_inc - curr_exp) / curr_inc * 100) if curr_inc > 0 else 0

            # The day before the first of the anchor month lies in the previous month.
            first_of_curr = anchor_date.replace(day=1)
            last_of_prev = first_of_curr - timedelta(days=1)
            prev_month = last_of_prev.strftime('%Y-%m')
            prev_exp = sum(float(e['amount']) for e in expenses if e['date'].strftime('%Y-%m') == prev_month)
            prev_inc = sum(float(i['amount']) for i in incomes if i['date'].strftime('%Y-%m') == prev_month)
            prev_savings_rate = ((prev_inc - prev_exp) / prev_inc * 100) if prev_inc > 0 else 0

            # Percentage change for spending/income; percentage-point change for savings.
            spending_momentum = round(((curr_exp - prev_exp) / prev_exp * 100), 1) if prev_exp > 0 else 0
            income_momentum = round(((curr_inc - prev_inc) / prev_inc * 100), 1) if prev_inc > 0 else 0
            savings_momentum = round(curr_savings_rate - prev_savings_rate, 1)

            curr_cat_exp = defaultdict(float)
            for e in expenses:
                if e['date'].strftime('%Y-%m') == curr_month:
                    curr_cat_exp[e.get('category', 'Other')] += float(e['amount'])

            prev_cat_exp = defaultdict(float)
            for e in expenses:
                if e['date'].strftime('%Y-%m') == prev_month:
                    prev_cat_exp[e.get('category', 'Other')] += float(e['amount'])

            # Category with the largest absolute increase over the previous month;
            # stays "N/A" when no category's change exceeds -1.0.
            spiking_cat = "N/A"
            max_spike = -1.0
            for cat, amt in curr_cat_exp.items():
                spike = amt - prev_cat_exp.get(cat, 0)
                if spike > max_spike:
                    max_spike = spike
                    spiking_cat = cat

            # Extrapolate the anchor month's spending so far to the full month.
            import calendar
            days_passed = anchor_date.day
            days_in_month = calendar.monthrange(anchor_date.year, anchor_date.month)[1]
            projected_spending = round((curr_exp / days_passed * days_in_month), 2) if days_passed > 0 else 0

            # Merge incomes and expenses into one list for the recent-activity feed.
            all_tx = []
            for i in incomes:
                all_tx.append({
                    'id': str(i.get('_id')),
                    'type': 'income',
                    'title': i.get('title'),
                    'amount': i.get('amount'),
                    'category': i.get('category', 'Income'),
                    'date': i.get('date')
                })
            for e in expenses:
                all_tx.append({
                    'id': str(e.get('_id')),
                    'type': 'expense',
                    'title': e.get('title'),
                    'amount': e.get('amount'),
                    'category': e.get('category', 'Expense'),
                    'date': e.get('date')
                })

            # Newest first; the id breaks ties between equal dates.
            all_tx.sort(key=lambda x: (x['date'] if x['date'] else datetime.min, x.get('id', '')), reverse=True)

            # Keep the 50 newest and render their dates as YYYY-MM-DD strings.
            recent_transactions = []
            for t in all_tx[:50]:
                t_copy = t.copy()
                if isinstance(t_copy['date'], datetime):
                    t_copy['date'] = t_copy['date'].strftime('%Y-%m-%d')
                recent_transactions.append(t_copy)

            return Response({
                'total_income': round(total_income, 2),
                'total_expense': round(total_expense, 2),
                'balance': round(balance, 2),
                'expense_breakdown': breakdown,
                'monthly_trends': monthly_data,
                'category_trends': category_trends,
                'income_category_trends': income_category_trends,
                'budget_progress': budget_progress,
                'savings_rate': round(savings_rate, 1),
                'avg_daily_spending': round(avg_daily_spending, 2),
                'avg_daily_income': round(avg_daily_income, 2),
                'expense_control': round(expense_ratio, 1),
                'health_score': health_score,
                'forecast_accuracy': forecast_accuracy,
                'monthly_txn_count': monthly_txn_count,
                'burn_rate': round(burn_rate, 2),
                'runway_days': runway_days,
                'runway_months': runway_months,
                'volatility_score': volatility_score,
                'recurring_count': recurring_count,
                'momentum': {
                    'spending': spending_momentum,
                    'income': income_momentum,
                    'savings': savings_momentum,
                    'spiking_category': spiking_cat,
                    'projected_spending': projected_spending
                },
                'income_sources': income_breakdown,
                'recent_transactions': recent_transactions,

                'daily_spending_trend': daily_spending_trend,
                'lifetime_daily_average': round(lifetime_daily_average, 2),

                'daily_income_trend': daily_income_trend,
                'lifetime_daily_income_average': round(lifetime_daily_income_average, 2),

                'last_updated': datetime.now().isoformat(),
                'data_anchor_date': anchor_date.strftime('%Y-%m-%d')
            })

        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class BulkTransactionDeleteView(APIView):
    """Bulk-delete transactions embedded in the authenticated user's document.

    Optional query parameters:
        category: case-insensitive substring matched against ``category``.
        type: ``income``, ``expense`` or ``all`` (case-insensitive). When
            omitted, both transaction arrays are targeted.
        search: case-insensitive substring matched against ``title``.

    When no parameter is supplied at all, both ``financial_data.incomes`` and
    ``financial_data.expenses`` are reset to empty lists.
    """
    permission_classes = [permissions.IsAuthenticated]

    # '' stands for "parameter not supplied".
    _VALID_TYPES = ('', 'all', 'income', 'expense')

    def delete(self, request):
        """Delete every transaction matching the supplied filters.

        Returns:
            200 with a confirmation message on success, 400 when ``type`` is
            not one of the accepted values, 500 on any unexpected error.
        """
        # Function-scope import keeps this block self-contained.
        import re

        try:
            category = request.query_params.get('category')
            type_filter = (request.query_params.get('type') or '').lower()
            search = request.query_params.get('search')

            # Reject unknown types up front: they would otherwise yield an
            # empty ``$pull`` document, which the database rejects.
            if type_filter not in self._VALID_TYPES:
                return Response(
                    {"error": "Invalid type. Use 'income', 'expense' or 'all'."},
                    status=status.HTTP_400_BAD_REQUEST
                )

            db = MongoDBClient.get_client()
            user_filter = {'_id': get_user_db_id(request.user)}

            # No filter at all: wipe both arrays in one write.
            if not any([category, type_filter, search]):
                db.users.update_one(
                    user_filter,
                    {'$set': {'financial_data.incomes': [], 'financial_data.expenses': []}}
                )
                return Response({"message": "All transactions deleted successfully"}, status=status.HTTP_200_OK)

            # The filter values are untrusted text, so escape them to get a
            # literal substring match instead of evaluating them as regex.
            pull_query = {}
            if category:
                pull_query['category'] = {'$regex': re.escape(category), '$options': 'i'}
            if search:
                pull_query['title'] = {'$regex': re.escape(search), '$options': 'i'}

            # NOTE(review): when only ``type`` is given, ``pull_query`` is
            # empty - confirm the database treats that as "match every element".
            update_ops = {}
            if type_filter in ('', 'all', 'income'):
                update_ops['financial_data.incomes'] = pull_query
            if type_filter in ('', 'all', 'expense'):
                update_ops['financial_data.expenses'] = pull_query

            db.users.update_one(user_filter, {'$pull': update_ops})

            return Response({"message": "Matching transactions deleted successfully"}, status=status.HTTP_200_OK)

        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class TransactionOCRView(APIView):
    """Scan an uploaded receipt image and store the transactions found in it."""
    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        """Run receipt OCR on ``request.FILES['file']`` and persist the result.

        Each entry of the OCR result's ``items`` is saved as its own
        transaction; when there are no items, a positive ``total`` is saved
        as a single transaction titled after the merchant.

        Returns:
            200 with ``message`` and ``transactions`` when the scan ran (even
            if nothing was found), 400 when no file was uploaded, 500 when the
            OCR helper reports an error, every save failed, or an unexpected
            exception occurred.
        """
        import traceback

        print("DEBUG: Scan endpoint called")
        try:
            if 'file' not in request.FILES:
                print("DEBUG: No file in request")
                return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST)

            image_file = request.FILES['file']
            file_size = image_file.size / 1024
            print(f"DEBUG: File received: {image_file.name}, Size: {file_size:.2f} KB, ContentType: {image_file.content_type}")

            print("DEBUG: Calling scan_receipt_with_llm...")
            data = scan_receipt_with_llm(image_file)
            print(f"DEBUG: AI OCR raw data: {data}")

            if "error" in data:
                return Response({'error': data['error']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            saved_transactions = []
            errors = []
            db = MongoDBClient.get_client()
            print("DEBUG: MongoDB client obtained")

            def save_transaction(title, amount, date, category_override=None):
                """Classify one scanned entry, push it to the user document and return it."""
                from finance.category_classifier import classify_transaction

                classification = classify_transaction(title, 'Expense')
                cat = category_override if category_override else classification['category']
                type_ = classification['type']

                # Fall back to "now" when the receipt date is missing or unparsable.
                tx_date = datetime.now()
                if isinstance(date, datetime):
                    tx_date = date
                elif date and isinstance(date, str):
                    try:
                        tx_date = datetime.strptime(date, '%Y-%m-%d')
                    except ValueError:
                        pass  # deliberate best effort: keep the "now" fallback

                new_tx = {
                    '_id': ObjectId(),
                    'title': title,
                    'amount': float(amount),
                    'category': cat,
                    'date': tx_date,
                    'type': type_,
                    'created_at': datetime.now(),
                    'source': 'ocr_scan'
                }

                collection = 'financial_data.incomes' if type_ == 'Income' else 'financial_data.expenses'
                db.users.update_one(
                    {'_id': get_user_db_id(request.user)},
                    {'$push': {collection: new_tx}}
                )

                # Expose the id as a plain string in the API response.
                new_tx['id'] = str(new_tx['_id'])
                del new_tx['_id']
                return new_tx

            if data.get('items'):
                print(f"DEBUG: Found {len(data['items'])} items")
                for item in data['items']:
                    try:
                        # Skip entries without a usable title or amount.
                        if not item.get('title') or not item.get('amount'):
                            continue

                        title = str(item['title']).strip()
                        if len(title) < 2:
                            continue

                        saved_tx = save_transaction(title, item['amount'], data.get('date'))
                        saved_transactions.append(saved_tx)
                    except Exception as item_err:
                        # One bad line must not abort the rest of the receipt.
                        print(f"Failed to save item {item.get('title')}: {item_err}")
                        print(f"Traceback: {traceback.format_exc()}")
                        errors.append(f"{item.get('title')}: {str(item_err)}")

            elif data.get('total') and data['total'] > 0:
                print(f"DEBUG: No items, using total: {data['total']}")
                try:
                    title = data.get('merchant') or 'Scanned Receipt'
                    saved_tx = save_transaction(title, data['total'], data.get('date'))
                    saved_transactions.append(saved_tx)
                except Exception as e:
                    errors.append(f"Total fallback failed: {str(e)}")
            else:
                print("DEBUG: No items and no total found.")

            msg = f"Successfully added {len(saved_transactions)} transactions."
            if errors:
                msg += f" \n({len(errors)} failed: {', '.join(errors)})"

            status_code = status.HTTP_200_OK
            if not saved_transactions and not errors:
                msg = "No transactions found in receipt. Please try a clearer image."
            elif not saved_transactions and errors:
                status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
                msg = "Scan failed: " + "; ".join(errors)

            return Response({'message': msg, 'transactions': saved_transactions}, status=status_code)
        except Exception as e:
            traceback.print_exc()
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class VoiceCommandView(APIView):
    """Turn a transcribed voice command into a stored transaction."""
    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        """Parse ``request.data['text']`` and save a transaction when possible.

        A transaction is stored only when the parser yields both an amount
        and a merchant; otherwise the parsed result is returned unsaved.
        """
        try:
            spoken = request.data.get('text')
            if not spoken:
                return Response(
                    {'error': 'No text provided. Please use the microphone button.'},
                    status=status.HTTP_400_BAD_REQUEST
                )

            from .ai_helper import CommandParser
            parsed = CommandParser.parse(spoken)
            print(f"DEBUG: Parsed command: {parsed}")

            stored = None
            if parsed['amount'] and parsed['merchant']:
                db = MongoDBClient.get_client()

                today_midnight = None  # filled in below, in original evaluation order
                record = {
                    '_id': ObjectId(),
                    'title': parsed['merchant'],
                    'amount': float(parsed['amount']),
                    'category': parsed['category'] or 'Uncategorized',
                    'date': datetime.combine(datetime.now().date(), datetime.min.time()),
                    'type': parsed['type'],
                    'created_at': datetime.now(),
                    'source': 'voice_command',
                    'voice_text': spoken
                }
                del today_midnight

                if parsed['type'] == 'Income':
                    target_array = 'financial_data.incomes'
                else:
                    target_array = 'financial_data.expenses'

                db.users.update_one(
                    {'_id': get_user_db_id(request.user)},
                    {'$push': {target_array: record}}
                )

                # Response copy: same fields, ObjectId replaced by a string id.
                stored = {field: value for field, value in record.items() if field != '_id'}
                stored['id'] = str(record['_id'])

            if stored:
                outcome = "Transaction added"
            else:
                outcome = "Could not auto-add (missing details)"

            return Response({
                "text": spoken,
                "parsed": parsed,
                "transaction": stored,
                "message": outcome
            })

        except Exception as e:
            import traceback
            traceback.print_exc()
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class TransactionListView(APIView):
    """Paginated, filterable list of the user's incomes and expenses combined."""
    permission_classes = [permissions.IsAuthenticated]

    @staticmethod
    def _normalise(record, kind):
        """Return a response-ready copy of ``record`` tagged with ``kind``.

        The ObjectId is exposed as a string ``id`` and datetime ``date``
        values become ``YYYY-MM-DD`` strings.
        """
        item = record.copy()
        item['id'] = str(item.get('_id'))
        item.pop('_id', None)
        item['type'] = kind
        if isinstance(item.get('date'), datetime):
            item['date'] = item['date'].strftime('%Y-%m-%d')
        return item

    def get(self, request):
        """List transactions, newest first.

        Query parameters:
            page, limit: positive integers (defaults 1 and 20).
            search, category: case-insensitive substrings of title/category.
            type: ``income`` or ``expense``; anything else means both.
            date_mode: ``specific`` (with ``date``), ``month`` (``month``),
                ``year`` (``year``) or ``range`` (``start_date``/``end_date``).

        Returns:
            200 with ``results``, ``total_pages``, ``current_page`` and
            ``total_items``; 400 when ``page``/``limit`` are not positive
            integers; 500 on unexpected errors.
        """
        import math

        try:
            params = request.query_params

            # Validate pagination first: limit=0 would divide by zero and a
            # negative page would slice from the end of the list.
            try:
                page = int(params.get('page', 1))
                limit = int(params.get('limit', 20))
            except (TypeError, ValueError):
                page = limit = 0
            if page < 1 or limit < 1:
                return Response(
                    {"error": "'page' and 'limit' must be positive integers."},
                    status=status.HTTP_400_BAD_REQUEST
                )

            search = params.get('search', '').lower()
            t_type = params.get('type', 'all').lower()
            category = params.get('category', '').lower()

            date_mode = params.get('date_mode', 'all')
            f_date = params.get('date', '')
            f_month = params.get('month', '')
            f_year = params.get('year', '')
            s_date = params.get('start_date', '')
            e_date = params.get('end_date', '')

            db = MongoDBClient.get_client()
            user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data': 1})

            incomes = []
            expenses = []
            if user and 'financial_data' in user:
                incomes = user['financial_data'].get('incomes', [])
                expenses = user['financial_data'].get('expenses', [])

            all_transactions = [self._normalise(i, 'income') for i in incomes]
            all_transactions += [self._normalise(e, 'expense') for e in expenses]

            def as_text(value):
                # Stored fields may be missing or None; treat both as empty.
                return str(value or '')

            def matches(t):
                """Apply the type, text and date filters to one transaction."""
                if t_type in ('income', 'expense') and t['type'] != t_type:
                    return False
                if search and search not in as_text(t.get('title')).lower():
                    return False
                if category and category not in as_text(t.get('category')).lower():
                    return False

                date_text = as_text(t.get('date'))
                if date_mode == 'specific' and f_date:
                    return date_text == f_date
                if date_mode == 'month' and f_month:
                    return date_text.startswith(f_month)
                if date_mode == 'year' and f_year:
                    return date_text.startswith(f_year)
                if date_mode == 'range' and s_date and e_date:
                    # ISO date strings order lexicographically.
                    return s_date <= date_text <= e_date
                return True

            all_transactions = [t for t in all_transactions if matches(t)]

            # Newest first; the id breaks ties so paging is deterministic.
            all_transactions.sort(key=lambda x: (str(x.get('date', '')), str(x.get('id', ''))), reverse=True)

            total_items = len(all_transactions)
            total_pages = math.ceil(total_items / limit)
            start = (page - 1) * limit

            return Response({
                'results': all_transactions[start:start + limit],
                'total_pages': total_pages,
                'current_page': page,
                'total_items': total_items
            })

        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class TransactionCategoryView(APIView):
    """Expose the category names available for incomes and expenses."""
    permission_classes = [permissions.IsAuthenticated]

    # Built-in categories that are always offered, even to users with no data.
    _DEFAULT_INCOME_CATEGORIES = (
        "Salary", "Freelance & Gigs", "Business Revenue", "Investments & Dividends",
        "Rental Income", "Refunds & Cashbacks", "Gifts & Bonuses",
        "Grants & Scholarships", "Pensions & Social Security", "Other Income",
    )
    _DEFAULT_EXPENSE_CATEGORIES = (
        "Housing", "Food & Dining", "Transportation", "Shopping", "Healthcare",
        "Insurance", "Entertainment", "Subscriptions", "Bills & Utilities",
        "Education", "Personal Care", "Travel", "Pets", "Family & Kids",
        "Gifts & Donations", "Taxes", "Professional Services",
        "Investments & Savings", "Other Expenses",
    )

    def get(self, request):
        """Return the defaults merged with categories found on the user's own records, sorted."""
        try:
            db = MongoDBClient.get_client()
            user = db.users.find_one({'_id': get_user_db_id(request.user)}, {'financial_data': 1})

            income_names = set(self._DEFAULT_INCOME_CATEGORIES)
            expense_names = set(self._DEFAULT_EXPENSE_CATEGORIES)

            if user and 'financial_data' in user:
                financial = user['financial_data']
                income_records = financial.get('incomes', [])
                expense_records = financial.get('expenses', [])

                # Only truthy category values are added.
                income_names.update(
                    rec['category'] for rec in income_records if rec.get('category')
                )
                expense_names.update(
                    rec['category'] for rec in expense_records if rec.get('category')
                )

            payload = {
                "income": sorted(income_names),
                "expense": sorted(expense_names),
            }
            return Response(payload)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class APIStatusView(APIView):
    """
    Diagnostic endpoint reporting whether the AI key was detected and the
    client initialized. Intended for troubleshooting "API Not Available"
    on the Render production deployment.
    """
    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return environment, Gemini client and basic system status as JSON."""
        environment_name = os.getenv('ENVIRONMENT', 'unknown')

        # Only presence and length of the key are reported, never its value.
        gemini_info = {}
        gemini_info["key_present"] = bool(GEMINI_API_KEY)
        if GEMINI_API_KEY:
            gemini_info["key_length"] = len(GEMINI_API_KEY)
        else:
            gemini_info["key_length"] = 0
        gemini_info["client_initialized"] = bool(CLIENT)
        gemini_info["regional_blocked"] = GEMINI_REGIONAL_BLOCKED
        gemini_info["init_error"] = INIT_ERRORS.get("gemini")

        system_info = {
            "time": datetime.now().isoformat(),
            "cwd": os.getcwd(),
        }

        return Response({
            "environment": environment_name,
            "gemini": gemini_info,
            "system": system_info,
        })
|
|
class SavingsGoalListCreateView(APIView):
    """List the user's savings goals or create a new one."""
    permission_classes = [permissions.IsAuthenticated]

    def get(self, request):
        """Return every savings goal with progress derived from the net balance.

        ``current_amount`` of each goal is set to total income minus total
        expense; progress is capped at 100 percent.
        """
        try:
            db = MongoDBClient.get_client()
            user_id = get_user_db_id(request.user)
            user = db.users.find_one({'_id': user_id}, {'financial_data': 1})

            has_financial = bool(user) and 'financial_data' in user

            income_sum = 0
            expense_sum = 0
            if has_financial:
                income_sum = sum(rec.get('amount', 0) for rec in user['financial_data'].get('incomes', []))
                expense_sum = sum(rec.get('amount', 0) for rec in user['financial_data'].get('expenses', []))
            balance = income_sum - expense_sum

            goals = []
            if has_financial and 'savings_goals' in user['financial_data']:
                goals = user['financial_data']['savings_goals']

            for entry in goals:
                entry['id'] = str(entry['_id'])

                deadline = entry.get('target_date')
                if isinstance(deadline, datetime):
                    entry['target_date'] = deadline.date()

                # Every goal is measured against the same overall balance.
                entry['current_amount'] = balance

                if entry['target_amount'] > 0:
                    percent = min(100, (entry['current_amount'] / entry['target_amount'] * 100))
                else:
                    percent = 0
                entry['progress_percentage'] = percent
                entry['is_completed'] = entry['current_amount'] >= entry['target_amount']

            # Strip the raw ObjectId once every goal has been processed.
            for entry in goals:
                entry.pop('_id', None)

            return Response(SavingsGoalSerializer(goals, many=True).data)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def post(self, request):
        """Validate the payload and append a new goal to the user's document."""
        try:
            serializer = SavingsGoalSerializer(data=request.data)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            goal_doc = serializer.validated_data.copy()
            goal_doc['_id'] = ObjectId()
            goal_doc['created_at'] = datetime.now()

            # Store the target date as a datetime (midnight for plain dates).
            deadline = goal_doc.get('target_date')
            if deadline:
                if isinstance(deadline, str):
                    goal_doc['target_date'] = datetime.strptime(deadline, '%Y-%m-%d')
                else:
                    goal_doc['target_date'] = datetime.combine(deadline, datetime.min.time())

            db = MongoDBClient.get_client()
            db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$push': {'financial_data.savings_goals': goal_doc}}
            )

            created = serializer.data
            created['id'] = str(goal_doc['_id'])
            return Response(created, status=status.HTTP_201_CREATED)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class SavingsGoalDetailView(APIView):
    """Retrieve, update or delete one savings goal of the authenticated user.

    ``pk`` is the goal's ObjectId in string form. Unknown or malformed ids
    yield 404 rather than a server error.
    """
    permission_classes = [permissions.IsAuthenticated]

    def get(self, request, pk):
        """Return the goal with progress computed from the user's net balance.

        Returns:
            200 with the serialized goal, 404 when the user document or the
            goal does not exist, 500 on unexpected errors.
        """
        try:
            db = MongoDBClient.get_client()
            user_id = get_user_db_id(request.user)
            user = db.users.find_one({'_id': user_id}, {'financial_data': 1})

            # A missing user document or missing financial data simply means
            # there is no such goal.
            financial = (user or {}).get('financial_data') or {}
            goal_list = financial.get('savings_goals', [])
            goal = next((g for g in goal_list if str(g['_id']) == pk), None)

            if not goal:
                return Response(status=status.HTTP_404_NOT_FOUND)

            total_income = sum(i.get('amount', 0) for i in financial.get('incomes', []))
            total_expense = sum(e.get('amount', 0) for e in financial.get('expenses', []))
            net_balance = total_income - total_expense

            goal['id'] = str(goal['_id'])
            del goal['_id']
            if 'target_date' in goal and isinstance(goal['target_date'], datetime):
                goal['target_date'] = goal['target_date'].date()

            # Progress is measured against the overall balance, capped at 100.
            goal['current_amount'] = net_balance
            goal['progress_percentage'] = min(100, (goal['current_amount'] / goal['target_amount'] * 100)) if goal['target_amount'] > 0 else 0
            goal['is_completed'] = goal['current_amount'] >= goal['target_amount']

            serializer = SavingsGoalSerializer(goal)
            return Response(serializer.data)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def put(self, request, pk):
        """Partially update the goal with the validated fields of the payload.

        Returns:
            200 with the validated data, 400 on validation errors or when no
            updatable field was sent, 404 when the goal does not exist.
        """
        try:
            serializer = SavingsGoalSerializer(data=request.data, partial=True)
            if not serializer.is_valid():
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            # A malformed id cannot match any goal.
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            update_data = {}
            for key, value in serializer.validated_data.items():
                if key == 'target_date' and value:
                    if isinstance(value, str):
                        value = datetime.strptime(value, '%Y-%m-%d')
                    else:
                        value = datetime.combine(value, datetime.min.time())
                # ``$`` addresses the array element matched by the query below.
                update_data[f'financial_data.savings_goals.$.{key}'] = value

            # An empty ``$set`` would be rejected by the database.
            if not update_data:
                return Response({"error": "No fields provided to update."}, status=status.HTTP_400_BAD_REQUEST)

            db = MongoDBClient.get_client()
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user), 'financial_data.savings_goals._id': ObjectId(pk)},
                {'$set': update_data}
            )

            if result.matched_count == 0:
                return Response(status=status.HTTP_404_NOT_FOUND)

            return Response(serializer.data)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def delete(self, request, pk):
        """Remove the goal; 204 on success, 404 when nothing was removed."""
        try:
            # A malformed id cannot match any goal.
            if not ObjectId.is_valid(pk):
                return Response(status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            result = db.users.update_one(
                {'_id': get_user_db_id(request.user)},
                {'$pull': {'financial_data.savings_goals': {'_id': ObjectId(pk)}}}
            )

            if result.modified_count == 0:
                return Response(status=status.HTTP_404_NOT_FOUND)

            return Response(status=status.HTTP_204_NO_CONTENT)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
class ComprehensiveReportView(APIView):
    """Assemble the full financial report for the authenticated user."""
    permission_classes = [permissions.IsAuthenticated]

    @staticmethod
    def _parse_date(val):
        """Coerce a stored date value to a naive ``datetime`` or ``None``.

        Accepts ``datetime`` objects and strings in ISO format (a trailing
        ``Z`` is read as UTC) or starting with ``YYYY-MM-DD``. Values that
        carry a UTC offset are converted to UTC and stripped of it: mixing
        aware and naive datetimes would make the later sort raise TypeError.
        """
        from datetime import timezone

        if isinstance(val, datetime):
            parsed = val
        elif isinstance(val, str):
            try:
                parsed = datetime.fromisoformat(val.replace('Z', '+00:00'))
            except ValueError:
                try:
                    parsed = datetime.strptime(val[:10], '%Y-%m-%d')
                except ValueError:
                    return None
        else:
            return None

        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    @classmethod
    def _with_parsed_dates(cls, records):
        """Return copies of ``records`` with a parsed ``date``; undated ones are dropped."""
        dated = []
        for record in records:
            dt = cls._parse_date(record.get('date'))
            if dt:
                clone = record.copy()
                clone['date'] = dt
                dated.append(clone)
        return dated

    def get(self, request):
        """Build the report from the user's incomes and expenses.

        Most sections are delegated to ``FinancialAnalytics``; the rolling
        balance and the derived summary sections are computed here.
        """
        try:
            db = MongoDBClient.get_client()
            user_id = get_user_db_id(request.user)
            user = db.users.find_one({'_id': user_id}, {'financial_data': 1, 'budgets': 1})

            raw_incomes = user.get('financial_data', {}).get('incomes', []) if user else []
            raw_expenses = user.get('financial_data', {}).get('expenses', []) if user else []

            incomes = self._with_parsed_dates(raw_incomes)
            expenses = self._with_parsed_dates(raw_expenses)

            from .analytics import FinancialAnalytics
            analytics = FinancialAnalytics(incomes, expenses)

            executive = analytics.calculate_executive_overview()
            income_analysis = analytics.analyze_income_structure()
            expense_analysis = analytics.analyze_expense_structure()

            # Tag every transaction with its sign up front instead of testing
            # list membership per item (quadratic, and ambiguous when an
            # expense record compares equal to an income record).
            signed_txns = [(i['date'], 1, i) for i in incomes]
            signed_txns += [(e['date'], -1, e) for e in expenses]
            # Stable sort on the date only: incomes stay ahead of expenses on ties.
            signed_txns.sort(key=lambda entry: entry[0])

            rolling_balance = []
            running_total = 0
            for when, sign, record in signed_txns:
                running_total += sign * float(record.get('amount', 0))
                rolling_balance.append({
                    'date': when.strftime('%Y-%m-%d'),
                    'balance': round(running_total, 2)
                })

            health_score = analytics.calculate_financial_health_score()

            behavioral_insight = (
                f"Your financial stability score is {round(executive['stability_score'])}/100. "
                f"You save {round(executive['savings_rate'], 1)}% of your income. "
                f"Income concentration is {round(income_analysis['concentration_score'], 1)} (Lower is better/more diverse). "
                f"Fixed expenses consume {round(expense_analysis['fixed_ratio'], 1)}% of your budget."
            )

            response_data = {
                "metric_date": datetime.now().isoformat(),
                "executive_overview": executive,
                "income_analysis": income_analysis,
                "expense_analysis": expense_analysis,
                "temporal_patterns": analytics.analyze_temporal_patterns(),
                "recurring_commitments": analytics.detect_recurring_payments(),
                "capex_log": analytics.analyze_capex(),
                "tax_liability": analytics.estimate_tax_liability(),
                "solvency_runway": analytics.calculate_runway(),
                "monthly_trends": analytics.analyze_monthly_trends(),
                "category_details": analytics.analyze_category_details(),
                "recommendations": analytics.generate_recommendations(),
                "velocity": analytics.calculate_financial_velocity(),
                "recent_activity": analytics.get_recent_transactions(),
                "cash_flow": {
                    # Only the 30 most recent balance points are returned.
                    "rolling_balance": rolling_balance[-30:],
                    "liquidity_ratio": "High" if running_total > executive['burn_rate'] * 3 else "Moderate" if running_total > executive['burn_rate'] else "Low"
                },
                "savings_wealth": {
                    "current_net_worth": round(running_total, 2),
                    "savings_rate": round(executive['savings_rate'], 2),
                    "wealth_accumulation_score": min(100, (executive['savings_rate'] * 2))
                },
                "risk_assessment": {
                    "expense_dependency": round(expense_analysis['fixed_ratio'], 2),
                    "income_stability": round(income_analysis['consistency_score'], 2),
                    "risk_level": "Low" if executive['stability_score'] > 70 else "Medium"
                },
                "behavioral_analysis": {
                    "insight_text": behavioral_insight
                },
                "financial_efficiency": {
                    "efficiency_score": round(health_score * 0.8, 1)
                },
                "comparative_performance": {
                    # NOTE(review): hard-coded value, not derived from the data.
                    "vs_last_month": "+5%"
                },
                "health_scorecard": {
                    "overall_score": round(health_score, 0),
                    "metrics": {
                        "savings": min(100, executive['savings_rate'] * 2),
                        "stability": executive['stability_score'],
                        "diversity": 100 - income_analysis['concentration_score'],
                        "solvency": 100 if executive['net_cash_flow'] > 0 else 50
                    }
                }
            }

            return Response(response_data)

        except Exception as e:
            import traceback
            traceback.print_exc()
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
class LoadSampleDataView(APIView):
    """
    GET  /api/finance/sample-data/preview/  → stats about the shared `sample` collection
    POST /api/finance/sample-data/load/     → copies sample docs into the current user account

    The `sample` collection already exists in the finance MongoDB database.
    No seeding or management commands are needed.
    """
    permission_classes = [permissions.IsAuthenticated]

    def _get_sample_docs(self):
        """Fetch every document of the ``sample`` collection without its ``_id``."""
        db = MongoDBClient.get_client()
        return list(db.sample.find({}, {'_id': 0}))

    @staticmethod
    def _parse_sample_date(raw):
        """Return ``raw`` as a ``datetime``, or ``None`` when it cannot be read.

        ``datetime`` values pass through; strings are parsed from their
        first ten characters as ``YYYY-MM-DD``.
        """
        if isinstance(raw, datetime):
            return raw
        if isinstance(raw, str):
            try:
                return datetime.strptime(raw[:10], '%Y-%m-%d')
            except ValueError:
                return None
        return None

    @staticmethod
    def _is_income(doc):
        """Tell whether a sample document is an income (anything else counts as expense)."""
        return str(doc.get('type', '')).lower() == 'income'

    def get(self, request):
        """Return preview statistics for the sample dataset."""
        try:
            docs = self._get_sample_docs()
            if not docs:
                return Response({"error": "Sample collection is empty or not found."}, status=status.HTTP_404_NOT_FOUND)

            income_count = sum(1 for d in docs if self._is_income(d))

            # Documents whose date cannot be parsed are left out of the range.
            dates = [
                parsed
                for parsed in (self._parse_sample_date(d.get('date')) for d in docs)
                if parsed is not None
            ]

            categories = sorted({d.get('category', 'Uncategorized') for d in docs})

            return Response({
                "total": len(docs),
                "income_count": income_count,
                "expense_count": len(docs) - income_count,
                "categories": categories,
                "date_start": min(dates).strftime('%Y-%m-%d') if dates else None,
                "date_end": max(dates).strftime('%Y-%m-%d') if dates else None,
            })
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def post(self, request):
        """Load sample transactions into the authenticated user's account."""
        try:
            docs = self._get_sample_docs()
            if not docs:
                return Response({"error": "Sample collection is empty."}, status=status.HTTP_404_NOT_FOUND)

            db = MongoDBClient.get_client()
            user_id = get_user_db_id(request.user)
            now = datetime.now()

            income_docs = []
            expense_docs = []

            for d in docs:
                doc = {
                    '_id': ObjectId(),
                    'title': d.get('title', 'Sample Transaction'),
                    'amount': float(d.get('amount', 0)),
                    'category': d.get('category', 'Uncategorized'),
                    # Unparsable or missing dates fall back to the load time.
                    'date': self._parse_sample_date(d.get('date')) or now,
                    'created_at': now,
                    'source': 'sample',
                }

                if self._is_income(d):
                    income_docs.append(doc)
                else:
                    expense_docs.append(doc)

            # Push both arrays in a single update so the load cannot end up
            # half-applied if a second write were to fail.
            push_ops = {}
            if income_docs:
                push_ops['financial_data.incomes'] = {'$each': income_docs}
            if expense_docs:
                push_ops['financial_data.expenses'] = {'$each': expense_docs}
            if push_ops:
                db.users.update_one({'_id': user_id}, {'$push': push_ops})

            total = len(income_docs) + len(expense_docs)
            return Response({
                "message": f"Successfully loaded {total} sample transactions ({len(income_docs)} income, {len(expense_docs)} expense) into your account.",
                "count": total,
                "income_count": len(income_docs),
                "expense_count": len(expense_docs),
            }, status=status.HTTP_201_CREATED)

        except Exception as e:
            import traceback
            traceback.print_exc()
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|