BudgetBuddy / core /analytics.py
KrishnaGarg's picture
Deploy BudgetBuddy update
4caaec6 verified
Raw
History Blame Contribute Delete
9.9 kB
"""Analytics aggregations over saved receipts.
Phase 3: pure groupby helpers used by the Analytics dashboard. No network, no
models — they take a list of stored receipt records (as returned by
core.storage.load_records) and return plain Python aggregates the UI turns into
cards / charts / tables. Everything degrades gracefully on empty/sparse data.
Time grouping uses the receipt `date` (YYYY-MM-DD); the stored `total` already
includes service charge / tax / tip / discount.
"""
from __future__ import annotations

import math
import re
from collections import Counter, defaultdict
from datetime import date, datetime
from typing import Any
# --------------------------------------------------------------------------- #
# Small parsing helpers
# --------------------------------------------------------------------------- #
def _num(value: Any) -> float:
    """Coerce *value* to a finite float, falling back to 0.0.

    Returns 0.0 when ``float(value)`` raises (``None``, non-numeric text,
    unsupported types, or an integer too large to fit in a float) and also
    when the conversion yields NaN or an infinity: a single non-finite
    amount would otherwise propagate into every total it is added to.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float(10**400) and similar oversized integers.
        return 0.0
    return number if math.isfinite(number) else 0.0
def _safe_date(year: int, month: int, day: int) -> date | None:
    """Build a ``date``, returning None instead of raising on an invalid one."""
    try:
        return date(year, month, day)
    except ValueError:
        return None


# Year-first numeric date: 2026-06-14, 2026/6/14, 2026.06.14.
_ISO_RE = re.compile(r"^(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})$")
# Year-last numeric date. The year must be exactly 2 or 4 digits; a 3-digit
# year is rejected rather than read as e.g. the year 123.
_NUMERIC_RE = re.compile(r"^(\d{1,2})[-/.](\d{1,2})[-/.](\d{4}|\d{2})$")
# Trailing time of day that follows a digit of the date part:
# "...14T10:30:00", "...2026 18:45", "...2026, 6:45 PM".
_TIME_SUFFIX_RE = re.compile(r"(?<=\d),?(?:T|\s+)\d{1,2}:\d{2}.*$")
_TEXT_FORMATS = (
    "%d %b %Y", "%d %B %Y", "%b %d %Y", "%B %d %Y",
    "%b %d, %Y", "%B %d, %Y", "%d %b %y", "%d %B %y",
)


def parse_date(value: Any) -> date | None:
    """Parse a receipt date into a ``date``, tolerant of common formats.

    The schema asks the model for 'YYYY-MM-DD', but receipts (and the OCR
    model) frequently produce DD/MM/YY, DD-MM-YYYY, US MM/DD/YYYY,
    '14 Jun 2026', etc. Without this, those records silently vanish from all
    time-based views.

    Strategy:
      1. ``datetime`` / ``date`` instances are returned as a plain date.
      2. Otherwise the value is stringified and stripped, and a trailing
         time of day ('T10:30:00', ' 18:45', ', 6:45 PM') is dropped.
      3. Year-first numeric form if it starts with a 4-digit year.
      4. Day/month/year numeric form: day-first preferred, month-first as
         fallback; 2-digit years 00-69 map to 2000s and 70-99 to 1900s.
      5. A few textual-month formats (see ``_TEXT_FORMATS``).

    Args:
        value: Raw date value from a record; any type is accepted.

    Returns:
        The parsed ``date``, or None if the value is empty or nothing parses.
    """
    if value is None:
        return None
    # datetime must be tested first: it is a subclass of date.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value

    s = str(value).strip()
    if not s:
        return None
    # Only the calendar date matters here; discard any time-of-day suffix.
    s = _TIME_SUFFIX_RE.sub("", s)

    m = _ISO_RE.match(s)
    if m:
        y, mo, d = (int(x) for x in m.groups())
        return _safe_date(y, mo, d)

    m = _NUMERIC_RE.match(s)
    if m:
        a, b, y = (int(x) for x in m.groups())
        if y < 100:
            y += 2000 if y < 70 else 1900
        # Prefer day-first (DD/MM); fall back to month-first (MM/DD) if the
        # day-first reading is an invalid calendar date.
        for day, month in ((a, b), (b, a)):
            parsed = _safe_date(y, month, day)
            if parsed:
                return parsed
        return None

    for fmt in _TEXT_FORMATS:
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None
def _category(record: dict[str, Any]) -> str:
    """Return the record's category label as a string.

    ``category`` is consulted first, then ``receipt_category``; when both are
    missing or falsy the label is ``"Other"``.
    """
    for key in ("category", "receipt_category"):
        label = record.get(key)
        if label:
            return str(label)
    return "Other"
def _prev_month(y: int, m: int) -> tuple[int, int]:
    """Return the (year, month) pair just before month *m* of year *y*."""
    if m > 1:
        return (y, m - 1)
    # January (or anything not above 1) rolls back to December of last year.
    return (y - 1, 12)
def dominant_currency(records: list[dict[str, Any]]) -> str:
    """Return the most common non-empty currency among *records*.

    Each record's ``currency`` is stringified and stripped of surrounding
    whitespace before counting. Missing, empty, or explicit ``None`` values
    are ignored: ``str(None)`` must not be counted as a currency called
    "None".

    Returns:
        The most frequent currency string, or ``""`` when no record has one.
    """
    codes = (str(r.get("currency") or "").strip() for r in records)
    counts = Counter(code for code in codes if code)
    return counts.most_common(1)[0][0] if counts else ""
# --------------------------------------------------------------------------- #
# Filtering
# --------------------------------------------------------------------------- #
def filter_records(
    records: list[dict[str, Any]],
    start: date | None = None,
    end: date | None = None,
    category: str | None = None,
) -> list[dict[str, Any]]:
    """Return the records inside an inclusive date range and/or category.

    A category of ``None``, ``""`` or ``"All"`` matches every record. Records
    whose date cannot be parsed are dropped only when ``start`` or ``end`` is
    given. The input list is not modified; a new list is returned.
    """
    wanted = category if category not in (None, "", "All") else None
    date_bounded = not (start is None and end is None)

    def _keep(rec: dict[str, Any]) -> bool:
        if date_bounded:
            when = parse_date(rec.get("date"))
            if when is None:
                return False
            if start is not None and when < start:
                return False
            if end is not None and when > end:
                return False
        return wanted is None or _category(rec) == wanted

    return [rec for rec in records if _keep(rec)]
# --------------------------------------------------------------------------- #
# Aggregations
# --------------------------------------------------------------------------- #
def summary(records: list[dict[str, Any]], today: date | None = None) -> dict[str, Any]:
    """Compute the headline figures shown on the summary cards.

    ``today`` (default: the current date) fixes which calendar month counts
    as "this month"; the month before it is the comparison baseline. Records
    without a parseable date are left out of the monthly figures but still
    count towards ``total_receipts`` and the dominant currency.

    Returns a dict with:
        this_month_total / prev_month_total: summed ``total``, rounded to 2 dp.
        pct_change: percentage change vs the previous month, or None when the
            previous month's total is zero.
        top_category: highest-spend category this month, or None.
        receipts_this_month: number of dated records in the current month.
        total_receipts: ``len(records)``.
        currency: result of ``dominant_currency(records)``.
    """
    reference = today or date.today()
    this_key = (reference.year, reference.month)
    last_key = _prev_month(reference.year, reference.month)

    this_total = 0.0
    last_total = 0.0
    this_count = 0
    this_by_category: dict[str, float] = defaultdict(float)

    for rec in records:
        when = parse_date(rec.get("date"))
        if when is None:
            continue
        amount = _num(rec.get("total"))
        bucket = (when.year, when.month)
        if bucket == this_key:
            this_total += amount
            this_count += 1
            this_by_category[_category(rec)] += amount
        elif bucket == last_key:
            last_total += amount

    # No baseline means no meaningful percentage.
    if last_total == 0:
        change = None
    else:
        change = (this_total - last_total) / last_total * 100

    leader = None
    if this_by_category:
        leader = max(this_by_category, key=this_by_category.get)

    return {
        "this_month_total": round(this_total, 2),
        "prev_month_total": round(last_total, 2),
        "pct_change": change,
        "top_category": leader,
        "receipts_this_month": this_count,
        "total_receipts": len(records),
        "currency": dominant_currency(records),
    }
def spend_by_category(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return spend per category as ``[{category, amount}, ...]``, largest first.

    Spend is allocated per line item, so a mixed bill is split across the
    categories of its items (an item without a category falls back to the
    bill's category). The sum of a bill's ``charges`` is added to the bill's
    own category. A record with no line items contributes its ``total`` to
    its own category. Categories whose rounded amount is zero are omitted.
    """
    buckets: dict[str, float] = defaultdict(float)

    for rec in records:
        line_items = rec.get("line_items") or []
        if not line_items:
            # Item-less transaction (payment / manual entry): use the total.
            buckets[_category(rec)] += _num(rec.get("total"))
            continue

        for item in line_items:
            label = str(item.get("category") or _category(rec))
            buckets[label] += _num(item.get("amount"))

        extra_charges = sum(
            _num(charge.get("amount")) for charge in (rec.get("charges") or [])
        )
        # Skip a zero sum so no empty bucket is created for the bill category.
        if extra_charges:
            buckets[_category(rec)] += extra_charges

    ranked = sorted(buckets.items(), key=lambda pair: pair[1], reverse=True)
    return [
        {"category": name, "amount": round(value, 2)}
        for name, value in ranked
        if round(value, 2) != 0
    ]
def category_comparison(
    records: list[dict[str, Any]], today: date | None = None
) -> list[dict[str, Any]]:
    """Per-category spend this month vs last month.

    ``today`` (default: the current date) selects the current month; the
    preceding calendar month is the comparison. Spend for each month comes
    from ``spend_by_category`` over the records dated in that month.

    Returns:
        ``[{"category", "this", "last"}, ...]`` ordered by combined spend,
        largest first. Categories with equal combined spend keep a
        deterministic order (this month's categories first, then last
        month's, each in ``spend_by_category`` order) instead of depending
        on set iteration order.
    """
    today = today or date.today()
    cur = (today.year, today.month)
    prev = _prev_month(today.year, today.month)

    def _month_spend(ym: tuple[int, int]) -> dict[str, float]:
        # Category -> amount for the records dated in the given (year, month).
        in_month = []
        for r in records:
            d = parse_date(r.get("date"))
            if d is not None and (d.year, d.month) == ym:
                in_month.append(r)
        return {row["category"]: row["amount"] for row in spend_by_category(in_month)}

    this = _month_spend(cur)
    last = _month_spend(prev)

    # dict.fromkeys de-duplicates while keeping first-seen order; sorted() is
    # stable, so ties stay in that order rather than in hash order.
    cats = sorted(
        dict.fromkeys([*this, *last]),
        key=lambda c: this.get(c, 0) + last.get(c, 0),
        reverse=True,
    )
    return [
        {"category": c, "this": round(this.get(c, 0), 2), "last": round(last.get(c, 0), 2)}
        for c in cats
    ]
def calendar_data(records: list[dict[str, Any]], year: int, month: int) -> dict[str, float]:
    """Return ``{"<day of month>": total spend}`` for one calendar month.

    Keys are the day number as a string (JSON-safe); values are the summed
    ``total`` of records dated on that day, rounded to 2 dp. Days without
    any record are absent, and undated records are ignored.
    """
    per_day: dict[str, float] = defaultdict(float)
    for rec in records:
        when = parse_date(rec.get("date"))
        if not when:
            continue
        if when.year != year or when.month != month:
            continue
        per_day[str(when.day)] += _num(rec.get("total"))
    return {day: round(total, 2) for day, total in per_day.items()}
def _period_key(d: date, granularity: str) -> str:
    """Return the bucket label for date *d* at the given granularity.

    The granularity is matched case-insensitively by prefix: ``"dai..."``
    gives the ISO date (``YYYY-MM-DD``), ``"week..."`` gives the ISO
    year/week (``YYYY-Www``), and anything else, including an empty or None
    granularity, gives the month (``YYYY-MM``).
    """
    mode = (granularity or "Monthly").lower()
    if mode.startswith("week"):
        iso_year, iso_week = d.isocalendar()[:2]
        return f"{iso_year}-W{iso_week:02d}"
    if mode.startswith("dai"):
        return d.isoformat()
    return f"{d.year:04d}-{d.month:02d}"  # monthly
def spend_over_time(
    records: list[dict[str, Any]], granularity: str = "Monthly"
) -> list[dict[str, Any]]:
    """Return total spend per period as ``[{period, amount}, ...]``.

    Each dated record's ``total`` is added to the bucket named by
    ``_period_key`` for the chosen granularity. Buckets are returned sorted
    by their period label, with amounts rounded to 2 dp. Records without a
    parseable date are skipped.
    """
    buckets: dict[str, float] = defaultdict(float)
    for rec in records:
        when = parse_date(rec.get("date"))
        if when is not None:
            buckets[_period_key(when, granularity)] += _num(rec.get("total"))
    return [
        {"period": label, "amount": round(buckets[label], 2)}
        for label in sorted(buckets)
    ]
def transactions_table(records: list[dict[str, Any]]) -> list[list[Any]]:
    """Return table rows ``[date, vendor, total, category]``, newest first.

    Rows are ordered by parsed date, then by the ``saved_at`` string, both
    descending. Records without a parseable date are treated as
    ``date.min`` and therefore end up at the bottom. The ``date`` and
    ``vendor`` cells are the stored values as-is; ``total`` is coerced to a
    float.
    """
    def _recency(rec: dict[str, Any]):
        when = parse_date(rec.get("date"))
        stamp = str(rec.get("saved_at", ""))
        return (when if when else date.min, stamp)

    newest_first = sorted(records, key=_recency, reverse=True)
    return [
        [
            rec.get("date", ""),
            rec.get("vendor", ""),
            _num(rec.get("total")),
            _category(rec),
        ]
        for rec in newest_first
    ]