# FinMK / backend / finance / analytics.py
# Kumar
# Refactor: Exclude PDF and CSV files from Git to fix HF push error
# 24e6f5b
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
class FinancialAnalytics:
    """Compute summary analytics over lists of income and expense transactions.

    Each transaction is accessed as a mapping: ``txn['date']`` is required and
    must support ``strftime``, ``weekday`` and subtraction (e.g. ``datetime``);
    ``amount``, ``category`` and ``title`` are read with ``.get`` and default
    to ``0``, ``'Uncategorized'`` and a per-method placeholder respectively.
    """

    # Category-name fragments treated as a proxy for fixed (non-discretionary) costs.
    _FIXED_COST_KEYWORDS = ('rent', 'mortgage', 'insurance', 'subscription', 'utilities', 'internet')

    # (upper income limit, marginal rate) pairs, simplified single filer.
    # NOTE: these thresholds are the 2024 US federal single-filer values;
    # no standard deduction is applied.
    _TAX_BRACKETS = (
        (11600, 0.10),
        (47150, 0.12),
        (100525, 0.22),
        (191950, 0.24),
        (243725, 0.32),
        (609350, 0.35),
        (float('inf'), 0.37),
    )

    # Expenses strictly above this amount are reported by analyze_capex().
    _CAPEX_THRESHOLD = 500

    def __init__(self, incomes, expenses):
        """Store date-sorted copies of both transaction lists and basic totals.

        Args:
            incomes: iterable of income transaction mappings.
            expenses: iterable of expense transaction mappings.
        """
        self.incomes = sorted(incomes, key=lambda x: x['date'])
        self.expenses = sorted(expenses, key=lambda x: x['date'])
        # Concatenating the sorted copies (instead of the raw arguments) lets
        # callers pass any iterable; because the sort is stable the resulting
        # order is the same as sorting ``incomes + expenses`` directly.
        self.all_txns = sorted(self.incomes + self.expenses, key=lambda x: x['date'])

        # Basic aggregates
        self.total_income = sum(self._amount(i) for i in self.incomes)
        self.total_expense = sum(self._amount(e) for e in self.expenses)
        self.net_savings = self.total_income - self.total_expense

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _amount(txn):
        """Return the transaction amount as a float (0 when the key is absent)."""
        return float(txn.get('amount', 0))

    @staticmethod
    def _shift_month(year, month, delta):
        """Return the (year, month) that lies ``delta`` calendar months away."""
        index = year * 12 + (month - 1) + delta
        return index // 12, index % 12 + 1

    @staticmethod
    def _coefficient_of_variation(values):
        """Return std/mean of ``values``, or 0 when the mean is not positive."""
        mean = np.mean(values)
        return float(np.std(values) / mean) if mean > 0 else 0.0

    def _get_monthly_aggregates(self, txns):
        """Sum transaction amounts per ``'YYYY-MM'`` month key.

        Returns:
            defaultdict(float) mapping month key to the summed amount.
        """
        monthly_map = defaultdict(float)
        for t in txns:
            monthly_map[t['date'].strftime('%Y-%m')] += self._amount(t)
        return monthly_map

    # ---------------------------------------------------------------- analytics

    def calculate_executive_overview(self):
        """Return headline totals, savings rate, burn rate and a stability score.

        ``burn_rate`` is the mean monthly expense. ``stability_score`` is
        ``100 - CV * 100`` of the monthly expenses clamped to [0, 100], where CV
        is the coefficient of variation; it is 0 when there are no expenses.
        """
        savings_rate = (self.net_savings / self.total_income * 100) if self.total_income > 0 else 0

        monthly_totals = list(self._get_monthly_aggregates(self.expenses).values())
        if monthly_totals:
            burn_rate = float(np.mean(monthly_totals))
            # Lower CV is better; a CV above 1 scores 0.
            cv = self._coefficient_of_variation(monthly_totals)
            stability_score = float(max(0, min(100, 100 - (cv * 100))))
        else:
            burn_rate = 0
            stability_score = 0

        return {
            "total_income": self.total_income,
            "total_expenses": self.total_expense,
            "net_cash_flow": self.net_savings,
            "savings_rate": savings_rate,
            "burn_rate": burn_rate,
            "stability_score": stability_score,
            "status": "Surplus" if self.net_savings >= 0 else "Deficit"
        }

    def analyze_income_structure(self):
        """Return income per category, a concentration score and consistency metrics.

        ``concentration_score`` is the Herfindahl-Hirschman Index of the category
        shares scaled to 0-100 (100 = a single source). ``consistency_score`` is
        ``100 - CV * 50`` of the monthly income clamped to [0, 100].
        """
        sources = defaultdict(float)
        for i in self.incomes:
            sources[i.get('category', 'Uncategorized')] += self._amount(i)

        # HHI (Herfindahl-Hirschman Index) for concentration
        total_sq = sum(v ** 2 for v in sources.values())
        hhi = (total_sq / (self.total_income ** 2)) if self.total_income > 0 else 0

        monthly_totals = list(self._get_monthly_aggregates(self.incomes).values())
        consistency = 0
        monthly_variance = 0
        if monthly_totals:
            cv = self._coefficient_of_variation(monthly_totals)
            consistency = float(max(0, min(100, 100 - (cv * 50))))  # Penalty for volatility
            monthly_variance = float(np.var(monthly_totals))

        return {
            "sources": dict(sources),
            "concentration_score": hhi * 100,  # 100 = Single source (High Risk), 0 = Diverse
            "consistency_score": consistency,
            "monthly_variance": monthly_variance
        }

    def analyze_expense_structure(self):
        """Return expenses per category and the fixed/discretionary split in percent.

        A category counts as fixed when its lower-cased name contains one of the
        keywords in ``_FIXED_COST_KEYWORDS``. With no expenses ``fixed_ratio`` is
        0 and ``discretionary_ratio`` is therefore 100.
        """
        categories = defaultdict(float)
        for e in self.expenses:
            categories[e.get('category', 'Uncategorized')] += self._amount(e)

        fixed_total = sum(
            total for name, total in categories.items()
            if any(keyword in name.lower() for keyword in self._FIXED_COST_KEYWORDS)
        )
        fixed_ratio = (fixed_total / self.total_expense * 100) if self.total_expense > 0 else 0

        return {
            "breakdown": dict(categories),
            "fixed_ratio": fixed_ratio,
            "discretionary_ratio": 100 - fixed_ratio
        }

    def calculate_financial_health_score(self):
        """Return a 0-100 composite score.

        Components: savings rate (max 40, 20% savings earns all 40), expense
        stability (max 20), income consistency (max 20) and positive net cash
        flow (20). The sum is clamped to [0, 100].
        """
        exec_data = self.calculate_executive_overview()
        inc_data = self.analyze_income_structure()

        score = 0
        # 1. Savings rate (max 40): 20% savings = 40 pts
        score += min(40, exec_data['savings_rate'] * 2)
        # 2. Stability (max 20)
        score += exec_data['stability_score'] * 0.2
        # 3. Income consistency (max 20)
        score += inc_data['consistency_score'] * 0.2
        # 4. Solvency/liquidity (max 20) - simplified as positive cash flow
        if exec_data['net_cash_flow'] > 0:
            score += 20

        return min(100, max(0, score))

    def analyze_temporal_patterns(self):
        """Return total expense per weekday as a Mon..Sun list of dicts."""
        # weekday(): 0 = Mon, 6 = Sun
        dow_map = defaultdict(float)
        for e in self.expenses:
            dow_map[e['date'].weekday()] += self._amount(e)

        days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        return [
            {'day': name, 'amount': round(dow_map[index], 2)}
            for index, name in enumerate(days)
        ]

    def detect_recurring_payments(self):
        """Return expenses that repeat roughly monthly.

        Expenses are grouped by (lower-cased stripped title, amount rounded to
        2 decimals). A group with at least two entries whose mean gap between
        consecutive dates is 25-35 days is reported, with ``next_due`` set to
        30 days after the latest occurrence.
        """
        groups = defaultdict(list)
        for e in self.expenses:
            key = (e.get('title', '').strip().lower(), round(self._amount(e), 2))
            groups[key].append(e['date'])

        recurring = []
        for (merchant, amount), dates in groups.items():
            if len(dates) < 2:
                continue
            dates.sort()
            intervals = [(later - earlier).days for earlier, later in zip(dates, dates[1:])]
            avg_interval = np.mean(intervals)
            # Roughly 28-31 days apart, with some tolerance
            if 25 <= avg_interval <= 35:
                recurring.append({
                    'merchant': merchant.title(),
                    'amount': amount,
                    'frequency': 'Monthly',
                    'next_due': (dates[-1] + timedelta(days=30)).strftime('%Y-%m-%d')
                })
        return recurring

    def analyze_capex(self):
        """Return expenses above ``_CAPEX_THRESHOLD``, newest first."""
        capex = []
        for e in self.expenses:
            amt = self._amount(e)
            if amt > self._CAPEX_THRESHOLD:
                capex.append({
                    'date': e['date'].strftime('%Y-%m-%d'),
                    'item': e.get('title', 'Large Purchase'),
                    'amount': amt,
                    'category': e.get('category', 'Uncategorized')
                })
        return sorted(capex, key=lambda x: x['date'], reverse=True)

    def estimate_tax_liability(self):
        """Estimate progressive tax on total income using ``_TAX_BRACKETS``.

        Total income is used as-is as the annual figure (no annualisation).
        The same keys are returned whether or not any income exists:
        ``estimated_tax``, ``effective_rate`` (percent), ``taxable_income`` and
        ``bracket`` (the highest marginal rate reached, e.g. ``'22%'``; ``'0%'``
        when income is not positive).
        """
        annual_income = self.total_income

        tax = 0.0
        marginal_rate = 0.0
        previous_limit = 0
        for limit, rate in self._TAX_BRACKETS:
            if annual_income <= previous_limit:
                break
            tax += (min(annual_income, limit) - previous_limit) * rate
            marginal_rate = rate
            previous_limit = limit

        effective_rate = (tax / annual_income * 100) if annual_income > 0 else 0

        return {
            'estimated_tax': round(tax, 2),
            'effective_rate': round(effective_rate, 1),
            'taxable_income': round(annual_income, 2),
            'bracket': f"{marginal_rate:.0%}"
        }

    def calculate_runway(self):
        """Return how many months net savings would cover the monthly burn rate.

        Net savings stand in for cash on hand because only transaction history
        is available. Returns ``"Infinite"`` when the burn rate is not positive,
        ``"0 Months"`` when net savings are not positive, otherwise the number
        of months rounded to one decimal.
        """
        burn_rate = self.calculate_executive_overview()['burn_rate']
        assets = self.net_savings

        if burn_rate <= 0:
            return "Infinite"
        if assets <= 0:
            return "0 Months"

        return float(round(assets / burn_rate, 1))

    def analyze_monthly_trends(self, reference_date=None):
        """Return income, expense and net per month for a 12-month window.

        Args:
            reference_date: object with ``year`` and ``month`` attributes that
                marks the last month of the window; defaults to
                ``datetime.now()``.

        Returns:
            List of 12 dicts, oldest month first.
        """
        today = reference_date if reference_date is not None else datetime.now()

        # Aggregate once rather than rescanning every transaction per month.
        income_by_month = self._get_monthly_aggregates(self.incomes)
        expense_by_month = self._get_monthly_aggregates(self.expenses)

        trends = []
        # From 11 months ago up to the reference month
        for months_back in range(11, -1, -1):
            year, month = self._shift_month(today.year, today.month, -months_back)
            month_str = f"{year}-{month:02d}"
            inc = income_by_month.get(month_str, 0.0)
            exp = expense_by_month.get(month_str, 0.0)
            trends.append({
                "month": month_str,
                "income": round(inc, 2),
                "expense": round(exp, 2),
                "net": round(inc - exp, 2)
            })
        return trends

    def analyze_category_details(self):
        """Return expense categories sorted by amount (descending) with their share.

        Returns an empty list when total expenses are zero.
        """
        if self.total_expense == 0:
            return []

        cats = defaultdict(float)
        for e in self.expenses:
            cats[e.get('category', 'Uncategorized')] += self._amount(e)

        sorted_cats = sorted(cats.items(), key=lambda x: x[1], reverse=True)
        return [
            {
                "category": k,
                "amount": round(v, 2),
                "percentage": round((v / self.total_expense) * 100, 1)
            }
            for k, v in sorted_cats
        ]

    def generate_recommendations(self):
        """Return a list of rule-based advice strings."""
        recs = []

        # 1. Savings rule
        if self.total_income > 0:
            savings_rate = (self.net_savings / self.total_income) * 100
            if savings_rate < 20:
                recs.append(f"Increase savings rate. Current is {round(savings_rate, 1)}%, aim for >20%.")
            else:
                recs.append(f"Great savings rate ({round(savings_rate, 1)}%). Consider investing the surplus.")

        # 2. Expense rule: latest month versus the 12-month average
        monthly_trends = self.analyze_monthly_trends()
        if monthly_trends:
            avg_exp = np.mean([m['expense'] for m in monthly_trends])
            if monthly_trends[-1]['expense'] > avg_exp * 1.2:
                recs.append("Recent month expenses are 20% higher than average. Review discretionary spending.")

        # 3. Category rules
        for c in self.analyze_category_details():
            if c['category'].lower() in ['dining', 'entertainment', 'shopping'] and c['percentage'] > 15:
                recs.append(f"High spending in {c['category']} ({c['percentage']}%). Try to reduce to <10%.")

        # 4. Burn rate: net savings should cover at least three months
        burn = self.calculate_executive_overview()['burn_rate']
        if burn > 0 and self.net_savings < burn * 3:
            recs.append("Emergency fund is low. Build a buffer of at least 3 months of expenses.")

        return recs

    def calculate_financial_velocity(self):
        """Project the current month's spend and compare it with recent months.

        "Today" is anchored to the latest expense date (else the latest
        transaction date, else ``datetime.now()``). The benchmark is the mean
        expense of the three preceding calendar months that have any spend; when
        none do, it falls back to 110% of the projection.
        """
        # self.expenses / self.all_txns are date-sorted, so the last entry is the latest.
        if self.expenses:
            today = self.expenses[-1]['date']
        elif self.all_txns:
            today = self.all_txns[-1]['date']
        else:
            today = datetime.now()

        monthly_expenses = self._get_monthly_aggregates(self.expenses)
        spent_so_far = monthly_expenses.get(today.strftime('%Y-%m'), 0.0)

        days_passed = max(1, today.day)
        daily_burn = spent_so_far / days_passed

        # Last day of the anchor month = day before the 1st of the next month.
        next_year, next_month_no = self._shift_month(today.year, today.month, 1)
        first_of_next = today.replace(year=next_year, month=next_month_no, day=1)
        days_in_month = (first_of_next - timedelta(days=1)).day

        projected_total = daily_burn * days_in_month

        # Trailing 3 calendar months. Stepping back by whole months (not by
        # multiples of 30 days) guarantees three distinct months that never
        # include the current one, even when the anchor is a month-end date.
        past_3_months = []
        for months_back in range(1, 4):
            year, month = self._shift_month(today.year, today.month, -months_back)
            m_exp = monthly_expenses.get(f"{year}-{month:02d}", 0.0)
            if m_exp > 0:
                past_3_months.append(m_exp)

        avg_monthly_spend = float(np.mean(past_3_months)) if past_3_months else (projected_total * 1.1)

        velocity_status = "On Track"
        if projected_total > avg_monthly_spend * 1.15:
            velocity_status = "Accelerating (High Spend)"
        elif projected_total < avg_monthly_spend * 0.85:
            velocity_status = "Decelerating (Saving)"

        return {
            "spent_so_far": round(spent_so_far, 2),
            "daily_burn": round(daily_burn, 2),
            "projected_total": round(projected_total, 2),
            "benchmark_avg": round(avg_monthly_spend, 2),
            "status": velocity_status,
            "days_remaining": days_in_month - days_passed
        }

    def get_recent_transactions(self, limit=5):
        """Return the ``limit`` most recent transactions, newest first.

        Each entry is labelled ``"expense"`` or ``"income"`` according to which
        list the transaction object came from.
        """
        recent = sorted(self.all_txns, key=lambda x: x['date'], reverse=True)[:limit]

        # Classify by object identity: comparing dicts by value would label an
        # income as an expense whenever an equal-looking expense exists.
        expense_ids = {id(e) for e in self.expenses}

        return [
            {
                "date": t['date'].strftime('%Y-%m-%d'),
                "title": t.get('title', 'Unknown'),
                "amount": self._amount(t),
                "type": "expense" if id(t) in expense_ids else "income",
                "category": t.get('category', 'Uncategorized')
            }
            for t in recent
        ]

    def generate_full_report(self):
        """Run every analysis and return the results keyed by section name."""
        return {
            "executive": self.calculate_executive_overview(),
            "income_analysis": self.analyze_income_structure(),
            "expense_analysis": self.analyze_expense_structure(),
            "health_score": self.calculate_financial_health_score(),
            "temporal": self.analyze_temporal_patterns(),
            "recurring": self.detect_recurring_payments(),
            "capex": self.analyze_capex(),
            "tax": self.estimate_tax_liability(),
            "runway": self.calculate_runway(),
            "monthly_trends": self.analyze_monthly_trends(),
            "category_details": self.analyze_category_details(),
            "recommendations": self.generate_recommendations(),
            "velocity": self.calculate_financial_velocity(),
            "recent_activity": self.get_recent_transactions()
        }