import calendar
from collections import defaultdict
from datetime import datetime, timedelta

import numpy as np


class FinancialAnalytics:
    """Derive summary metrics and reports from income and expense records.

    Every transaction is a mapping that must contain a ``'date'`` value
    supporting ``strftime``, ``weekday``, ordering and subtraction (a
    ``datetime`` works).  ``'amount'``, ``'category'`` and ``'title'`` are
    optional and are read with defaults (``0``, ``'Uncategorized'`` and a
    method-specific placeholder respectively).
    """

    # Substrings of a (lower-cased) category name that mark it as a fixed cost.
    _FIXED_COST_KEYWORDS = (
        'rent', 'mortgage', 'insurance', 'subscription', 'utilities', 'internet',
    )

    # (upper limit, marginal rate) pairs applied progressively to total income.
    # NOTE(review): values look like a US federal schedule - confirm the
    # jurisdiction/year with the product owner before relying on them.
    _TAX_BRACKETS = (
        (11600, 0.10),
        (47150, 0.12),
        (100525, 0.22),
        (191950, 0.24),
        (243725, 0.32),
        (609350, 0.35),
        (float('inf'), 0.37),
    )

    # Labels indexed by ``date.weekday()`` (Monday is 0).
    _WEEKDAY_LABELS = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')

    def __init__(self, incomes, expenses):
        """Store date-sorted copies of the transactions and cache the totals.

        Args:
            incomes: Iterable of income transaction mappings.
            expenses: Iterable of expense transaction mappings.
        """
        # Materialise first so tuples, generators or mixed sequence types can
        # be concatenated below; the caller's containers are never mutated.
        incomes = list(incomes)
        expenses = list(expenses)

        self.incomes = sorted(incomes, key=lambda x: x['date'])
        self.expenses = sorted(expenses, key=lambda x: x['date'])
        self.all_txns = sorted(incomes + expenses, key=lambda x: x['date'])

        self.total_income = sum(self._amount(i) for i in self.incomes)
        self.total_expense = sum(self._amount(e) for e in self.expenses)
        self.net_savings = self.total_income - self.total_expense

    @staticmethod
    def _amount(txn):
        """Return the transaction's ``'amount'`` as a float (0 when absent)."""
        return float(txn.get('amount', 0))

    @staticmethod
    def _month_key_before(reference, months_back):
        """Return the ``'YYYY-MM'`` key of the month *months_back* before *reference*."""
        year, month_index = divmod(
            reference.year * 12 + reference.month - 1 - months_back, 12
        )
        return f"{year}-{month_index + 1:02d}"

    def _get_monthly_aggregates(self, txns):
        """Sum transaction amounts per ``'YYYY-MM'`` month key.

        Returns:
            A ``defaultdict(float)`` mapping month key to total amount.
        """
        monthly_map = defaultdict(float)
        for t in txns:
            monthly_map[t['date'].strftime('%Y-%m')] += self._amount(t)
        return monthly_map

    def calculate_executive_overview(self):
        """Return headline totals, savings rate, burn rate and stability.

        ``burn_rate`` is the mean of the monthly expense totals (only months
        that contain at least one expense are counted).  ``stability_score``
        is ``100 - CV * 100`` clamped to ``[0, 100]``, where CV is the
        coefficient of variation of those monthly totals; it is 0 when there
        are no expenses.
        """
        if self.total_income > 0:
            savings_rate = self.net_savings / self.total_income * 100
        else:
            savings_rate = 0

        monthly_totals = list(self._get_monthly_aggregates(self.expenses).values())
        if monthly_totals:
            burn_rate = np.mean(monthly_totals)
            cv = np.std(monthly_totals) / burn_rate if burn_rate > 0 else 0
            stability_score = max(0, min(100, 100 - (cv * 100)))
        else:
            burn_rate = 0
            stability_score = 0

        return {
            "total_income": self.total_income,
            "total_expenses": self.total_expense,
            "net_cash_flow": self.net_savings,
            "savings_rate": savings_rate,
            "burn_rate": burn_rate,
            "stability_score": stability_score,
            "status": "Surplus" if self.net_savings >= 0 else "Deficit",
        }

    def analyze_income_structure(self):
        """Return per-category income totals plus concentration/consistency.

        ``concentration_score`` is the Herfindahl-Hirschman index of the
        category shares scaled to 0-100.  ``consistency_score`` is
        ``100 - CV * 50`` of the monthly income totals, clamped to
        ``[0, 100]`` (0 when there is no income).
        """
        sources = defaultdict(float)
        for i in self.incomes:
            sources[i.get('category', 'Uncategorized')] += self._amount(i)

        if self.total_income > 0:
            hhi = sum(v ** 2 for v in sources.values()) / (self.total_income ** 2)
        else:
            hhi = 0

        monthly_totals = list(self._get_monthly_aggregates(self.incomes).values())
        consistency = 0
        if monthly_totals:
            mean_inc = np.mean(monthly_totals)
            cv = np.std(monthly_totals) / mean_inc if mean_inc > 0 else 0
            consistency = max(0, min(100, 100 - (cv * 50)))

        return {
            "sources": dict(sources),
            "concentration_score": hhi * 100,
            "consistency_score": consistency,
            "monthly_variance": np.var(monthly_totals) if monthly_totals else 0,
        }

    def analyze_expense_structure(self):
        """Return per-category expense totals and the fixed/discretionary split.

        A category counts as fixed when its lower-cased name contains any of
        ``_FIXED_COST_KEYWORDS``.  With no expenses ``fixed_ratio`` is 0 and
        ``discretionary_ratio`` is therefore 100.
        """
        categories = defaultdict(float)
        for e in self.expenses:
            categories[e.get('category', 'Uncategorized')] += self._amount(e)

        # str() keeps a non-string category (e.g. None) from raising here.
        fixed_total = sum(
            total
            for name, total in categories.items()
            if any(keyword in str(name).lower() for keyword in self._FIXED_COST_KEYWORDS)
        )
        if self.total_expense > 0:
            fixed_ratio = fixed_total / self.total_expense * 100
        else:
            fixed_ratio = 0

        return {
            "breakdown": dict(categories),
            "fixed_ratio": fixed_ratio,
            "discretionary_ratio": 100 - fixed_ratio,
        }

    def calculate_financial_health_score(self):
        """Return a 0-100 composite score.

        Components: up to 40 points for savings rate (2 points per percent),
        20% of the expense stability score, 20% of the income consistency
        score, and 20 points for a strictly positive net cash flow.
        """
        exec_data = self.calculate_executive_overview()
        inc_data = self.analyze_income_structure()

        score = min(40, exec_data['savings_rate'] * 2)
        score += exec_data['stability_score'] * 0.2
        score += inc_data['consistency_score'] * 0.2
        if exec_data['net_cash_flow'] > 0:
            score += 20

        # A negative savings rate can push the raw sum below zero.
        return min(100, max(0, score))

    def analyze_temporal_patterns(self):
        """Return total expense amount per weekday, Monday through Sunday."""
        dow_map = defaultdict(float)
        for e in self.expenses:
            dow_map[e['date'].weekday()] += self._amount(e)

        return [
            {'day': label, 'amount': round(dow_map[index], 2)}
            for index, label in enumerate(self._WEEKDAY_LABELS)
        ]

    def detect_recurring_payments(self):
        """Find expenses that repeat roughly monthly.

        Expenses are grouped by (normalised title, amount rounded to cents).
        A group with at least two occurrences whose mean gap is between 25
        and 35 days is reported, with ``next_due`` set to 30 days after the
        latest occurrence.
        """
        groups = defaultdict(list)
        for e in self.expenses:
            # ``or ''`` tolerates an explicit ``None`` title.
            title = (e.get('title') or '').strip().lower()
            groups[(title, round(self._amount(e), 2))].append(e['date'])

        recurring = []
        for (merchant, amount), dates in groups.items():
            if len(dates) < 2:
                continue
            ordered = sorted(dates)
            intervals = [
                (later - earlier).days
                for earlier, later in zip(ordered, ordered[1:])
            ]
            if 25 <= np.mean(intervals) <= 35:
                recurring.append({
                    'merchant': merchant.title(),
                    'amount': amount,
                    'frequency': 'Monthly',
                    'next_due': (ordered[-1] + timedelta(days=30)).strftime('%Y-%m-%d'),
                })
        return recurring

    def analyze_capex(self, threshold=500):
        """List expenses whose amount is strictly above *threshold*.

        Args:
            threshold: Minimum (exclusive) amount for an expense to be listed.

        Returns:
            A list of dicts sorted by date string, newest first.
        """
        capex = []
        for e in self.expenses:
            amt = self._amount(e)
            if amt > threshold:
                capex.append({
                    'date': e['date'].strftime('%Y-%m-%d'),
                    'item': e.get('title', 'Large Purchase'),
                    'amount': amt,
                    'category': e.get('category', 'Uncategorized'),
                })
        return sorted(capex, key=lambda x: x['date'], reverse=True)

    def estimate_tax_liability(self):
        """Estimate tax by applying ``_TAX_BRACKETS`` to total income.

        Total income is used as-is (no deductions, no annualisation).

        Returns:
            A dict with ``estimated_tax``, ``effective_rate`` (percent) and
            ``taxable_income``.  When there are no income records the same
            keys are returned with zero values, plus the legacy
            ``'bracket': '0%'`` entry kept for existing callers.
        """
        if not self.incomes:
            return {
                'estimated_tax': 0,
                'effective_rate': 0,
                'taxable_income': 0,
                'bracket': '0%',
            }

        annual_income = self.total_income

        tax = 0
        previous_limit = 0
        for limit, rate in self._TAX_BRACKETS:
            if annual_income <= previous_limit:
                break
            # Only the slice of income inside this bracket is taxed at `rate`.
            tax += (min(annual_income, limit) - previous_limit) * rate
            previous_limit = limit

        effective_rate = (tax / annual_income * 100) if annual_income > 0 else 0
        return {
            'estimated_tax': round(tax, 2),
            'effective_rate': round(effective_rate, 1),
            'taxable_income': round(annual_income, 2),
        }

    def calculate_runway(self):
        """Return how many months net savings cover at the current burn rate.

        Returns:
            ``"Infinite"`` when the burn rate is not positive, ``"0 Months"``
            when net savings are not positive, otherwise the number of months
            rounded to one decimal.
        """
        burn_rate = self.calculate_executive_overview()['burn_rate']
        # Net savings over the recorded period stand in for available assets.
        assets = self.net_savings

        if burn_rate <= 0:
            return "Infinite"
        if assets <= 0:
            return "0 Months"
        return round(assets / burn_rate, 1)

    def analyze_monthly_trends(self):
        """Return income, expense and net for each of the last 12 months.

        The window ends with the current month according to
        ``datetime.now()`` and is ordered oldest first.
        """
        today = datetime.now()
        # Aggregate once instead of re-scanning every transaction per month.
        income_by_month = self._get_monthly_aggregates(self.incomes)
        expense_by_month = self._get_monthly_aggregates(self.expenses)

        trends = []
        for months_back in range(11, -1, -1):
            month_str = self._month_key_before(today, months_back)
            inc = income_by_month.get(month_str, 0)
            exp = expense_by_month.get(month_str, 0)
            trends.append({
                "month": month_str,
                "income": round(inc, 2),
                "expense": round(exp, 2),
                "net": round(inc - exp, 2),
            })
        return trends

    def analyze_category_details(self):
        """Return expense categories with amount and share, largest first.

        Returns an empty list when total expenses are zero.
        """
        if self.total_expense == 0:
            return []

        cats = defaultdict(float)
        for e in self.expenses:
            cats[e.get('category', 'Uncategorized')] += self._amount(e)

        return [
            {
                "category": name,
                "amount": round(total, 2),
                "percentage": round((total / self.total_expense) * 100, 1),
            }
            for name, total in sorted(cats.items(), key=lambda x: x[1], reverse=True)
        ]

    def generate_recommendations(self):
        """Return a list of advice strings derived from the other analyses."""
        recs = []

        if self.total_income > 0:
            savings_rate = (self.net_savings / self.total_income) * 100
            if savings_rate < 20:
                recs.append(f"Increase savings rate. Current is {round(savings_rate, 1)}%, aim for >20%.")
            else:
                recs.append(f"Great savings rate ({round(savings_rate, 1)}%). Consider investing the surplus.")

        monthly_trends = self.analyze_monthly_trends()
        if monthly_trends:
            # The average spans the whole 12-month window, empty months included.
            avg_exp = np.mean([m['expense'] for m in monthly_trends])
            if monthly_trends[-1]['expense'] > avg_exp * 1.2:
                recs.append("Recent month expenses are 20% higher than average. Review discretionary spending.")

        for c in self.analyze_category_details():
            if c['category'].lower() in ['dining', 'entertainment', 'shopping'] and c['percentage'] > 15:
                recs.append(f"High spending in {c['category']} ({c['percentage']}%). Try to reduce to <10%.")

        burn = self.calculate_executive_overview()['burn_rate']
        if burn > 0 and self.net_savings < burn * 3:
            recs.append("Emergency fund is low. Build a buffer of at least 3 months of expenses.")

        return recs

    def calculate_financial_velocity(self):
        """Project the current month's spend and compare it with recent months.

        "Today" is the latest expense date (falling back to the latest
        transaction date, then to ``datetime.now()``).  The month-to-date
        spend is extrapolated linearly to the full month and compared with
        the mean of the three preceding calendar months that have any
        spending; if none do, the benchmark is 110% of the projection.
        """
        if self.expenses:
            today = max(e['date'] for e in self.expenses)
        elif self.all_txns:
            today = max(t['date'] for t in self.all_txns)
        else:
            today = datetime.now()

        expense_by_month = self._get_monthly_aggregates(self.expenses)

        spent_so_far = expense_by_month.get(today.strftime('%Y-%m'), 0)
        days_passed = max(1, today.day)
        daily_burn = spent_so_far / days_passed

        days_in_month = calendar.monthrange(today.year, today.month)[1]
        projected_total = daily_burn * days_in_month

        # Step back by whole calendar months.  Subtracting multiples of 30
        # days can land in the current month or hit the same month twice.
        past_3_months = []
        for months_back in range(1, 4):
            m_exp = expense_by_month.get(self._month_key_before(today, months_back), 0)
            if m_exp > 0:
                past_3_months.append(m_exp)

        avg_monthly_spend = np.mean(past_3_months) if past_3_months else (projected_total * 1.1)

        velocity_status = "On Track"
        if projected_total > avg_monthly_spend * 1.15:
            velocity_status = "Accelerating (High Spend)"
        elif projected_total < avg_monthly_spend * 0.85:
            velocity_status = "Decelerating (Saving)"

        return {
            "spent_so_far": round(spent_so_far, 2),
            "daily_burn": round(daily_burn, 2),
            "projected_total": round(projected_total, 2),
            "benchmark_avg": round(avg_monthly_spend, 2),
            "status": velocity_status,
            "days_remaining": days_in_month - days_passed,
        }

    def get_recent_transactions(self, limit=5):
        """Return the *limit* most recent transactions, newest first.

        Each entry carries a formatted date, title, amount, category and a
        ``type`` of ``"expense"`` or ``"income"``.
        """
        # Classify by object identity: an income record whose fields happen to
        # equal an expense record must not be reported as an expense.
        expense_ids = {id(e) for e in self.expenses}

        recent = sorted(self.all_txns, key=lambda x: x['date'], reverse=True)[:limit]
        return [
            {
                "date": t['date'].strftime('%Y-%m-%d'),
                "title": t.get('title', 'Unknown'),
                "amount": self._amount(t),
                "type": "expense" if id(t) in expense_ids else "income",
                "category": t.get('category', 'Uncategorized'),
            }
            for t in recent
        ]

    def generate_full_report(self):
        """Run every analysis and return the results keyed by section name."""
        return {
            "executive": self.calculate_executive_overview(),
            "income_analysis": self.analyze_income_structure(),
            "expense_analysis": self.analyze_expense_structure(),
            "health_score": self.calculate_financial_health_score(),
            "temporal": self.analyze_temporal_patterns(),
            "recurring": self.detect_recurring_payments(),
            "capex": self.analyze_capex(),
            "tax": self.estimate_tax_liability(),
            "runway": self.calculate_runway(),
            "monthly_trends": self.analyze_monthly_trends(),
            "category_details": self.analyze_category_details(),
            "recommendations": self.generate_recommendations(),
            "velocity": self.calculate_financial_velocity(),
            "recent_activity": self.get_recent_transactions(),
        }