"""Analytics aggregations over saved receipts. Phase 3: pure groupby helpers used by the Analytics dashboard. No network, no models — they take a list of stored receipt records (as returned by core.storage.load_records) and return plain Python aggregates the UI turns into cards / charts / tables. Everything degrades gracefully on empty/sparse data. Time grouping uses the receipt `date` (YYYY-MM-DD); the stored `total` already includes service charge / tax / tip / discount. """ from __future__ import annotations import re from collections import Counter, defaultdict from datetime import date, datetime from typing import Any # --------------------------------------------------------------------------- # # Small parsing helpers # --------------------------------------------------------------------------- # def _num(value: Any) -> float: try: return float(value) except (TypeError, ValueError): return 0.0 def _safe_date(year: int, month: int, day: int) -> date | None: try: return date(year, month, day) except ValueError: return None _ISO_RE = re.compile(r"^(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})$") _NUMERIC_RE = re.compile(r"^(\d{1,2})[-/.](\d{1,2})[-/.](\d{2,4})$") _TEXT_FORMATS = ( "%d %b %Y", "%d %B %Y", "%b %d %Y", "%B %d %Y", "%b %d, %Y", "%B %d, %Y", "%d %b %y", "%d %B %y", ) def parse_date(value: Any) -> date | None: """Parse a receipt date into a date, tolerant of common formats. The schema asks the model for 'YYYY-MM-DD', but receipts (and the OCR model) frequently produce DD/MM/YY, DD-MM-YYYY, US MM/DD/YYYY, '14 Jun 2026', etc. Without this, those records silently vanish from all time-based views. Strategy: year-first ISO if it starts with a 4-digit year; otherwise a day/month/year numeric form (day-first preferred, month-first as fallback, 2-digit years mapped to 2000s/1900s); then a few textual-month formats. Returns None if nothing parses. """ if value is None: return None s = str(value).strip() if not s: return None m = _ISO_RE.match(s) if m: y, mo, d = (int(x) for x in m.groups()) return _safe_date(y, mo, d) m = _NUMERIC_RE.match(s) if m: a, b, y = (int(x) for x in m.groups()) if y < 100: y += 2000 if y < 70 else 1900 # Prefer day-first (DD/MM); fall back to month-first (MM/DD) if the # day-first reading is an invalid calendar date. for day, month in ((a, b), (b, a)): parsed = _safe_date(y, month, day) if parsed: return parsed return None for fmt in _TEXT_FORMATS: try: return datetime.strptime(s, fmt).date() except ValueError: continue return None def _category(record: dict[str, Any]) -> str: return str(record.get("category") or record.get("receipt_category") or "Other") def _prev_month(y: int, m: int) -> tuple[int, int]: return (y, m - 1) if m > 1 else (y - 1, 12) def dominant_currency(records: list[dict[str, Any]]) -> str: """Most common non-empty currency among records ('' if none).""" counts = Counter( str(r.get("currency", "")).strip() for r in records if str(r.get("currency", "")).strip() ) return counts.most_common(1)[0][0] if counts else "" # --------------------------------------------------------------------------- # # Filtering # --------------------------------------------------------------------------- # def filter_records( records: list[dict[str, Any]], start: date | None = None, end: date | None = None, category: str | None = None, ) -> list[dict[str, Any]]: """Filter by inclusive date range and/or category ('All'/None = any). Records without a parseable date are excluded only when a date bound is set. """ has_range = start is not None or end is not None cat = None if (category in (None, "", "All")) else category out: list[dict[str, Any]] = [] for r in records: if has_range: d = parse_date(r.get("date")) if d is None: continue if start is not None and d < start: continue if end is not None and d > end: continue if cat is not None and _category(r) != cat: continue out.append(r) return out # --------------------------------------------------------------------------- # # Aggregations # --------------------------------------------------------------------------- # def summary(records: list[dict[str, Any]], today: date | None = None) -> dict[str, Any]: """Headline numbers for the summary cards. this_month_total, prev_month_total, pct_change (None if no prior-month baseline), top_category (this month), receipts_this_month, total_receipts, and the dominant currency. """ today = today or date.today() cur = (today.year, today.month) prev = _prev_month(today.year, today.month) cur_total = prev_total = 0.0 cur_count = 0 cur_cat_totals: dict[str, float] = defaultdict(float) for r in records: d = parse_date(r.get("date")) if d is None: continue amt = _num(r.get("total")) mk = (d.year, d.month) if mk == cur: cur_total += amt cur_count += 1 cur_cat_totals[_category(r)] += amt elif mk == prev: prev_total += amt pct_change = None if prev_total == 0 else (cur_total - prev_total) / prev_total * 100 top_category = ( max(cur_cat_totals, key=cur_cat_totals.get) if cur_cat_totals else None ) return { "this_month_total": round(cur_total, 2), "prev_month_total": round(prev_total, 2), "pct_change": pct_change, "top_category": top_category, "receipts_this_month": cur_count, "total_receipts": len(records), "currency": dominant_currency(records), } def spend_by_category(records: list[dict[str, Any]]) -> list[dict[str, Any]]: """Spend per category, descending — allocated at the LINE-ITEM level so a mixed bill is split across categories. Charges go to the bill's overall category; item-less transactions (payments/manual) use their own category. """ totals: dict[str, float] = defaultdict(float) for r in records: items = r.get("line_items") or [] if items: for it in items: totals[str(it.get("category") or _category(r))] += _num(it.get("amount")) charges_sum = sum(_num(c.get("amount")) for c in (r.get("charges") or [])) if charges_sum: totals[_category(r)] += charges_sum else: totals[_category(r)] += _num(r.get("total")) return [ {"category": k, "amount": round(v, 2)} for k, v in sorted(totals.items(), key=lambda kv: kv[1], reverse=True) if round(v, 2) != 0 ] def category_comparison( records: list[dict[str, Any]], today: date | None = None ) -> list[dict[str, Any]]: """Per-category spend this month vs last month -> [{category, this, last}].""" today = today or date.today() cur = (today.year, today.month) prev = _prev_month(today.year, today.month) def _month(ym): out = [] for r in records: d = parse_date(r.get("date")) if d and (d.year, d.month) == ym: out.append(r) return out this = {d["category"]: d["amount"] for d in spend_by_category(_month(cur))} last = {d["category"]: d["amount"] for d in spend_by_category(_month(prev))} cats = sorted(set(this) | set(last), key=lambda c: this.get(c, 0) + last.get(c, 0), reverse=True) return [{"category": c, "this": round(this.get(c, 0), 2), "last": round(last.get(c, 0), 2)} for c in cats] def calendar_data(records: list[dict[str, Any]], year: int, month: int) -> dict[str, float]: """{"day": total spend} for the given month (string keys — JSON-safe).""" days: dict[str, float] = defaultdict(float) for r in records: d = parse_date(r.get("date")) if d and d.year == year and d.month == month: days[str(d.day)] += _num(r.get("total")) return {k: round(v, 2) for k, v in days.items()} def _period_key(d: date, granularity: str) -> str: g = (granularity or "Monthly").lower() if g.startswith("dai"): return d.isoformat() if g.startswith("week"): iso = d.isocalendar() return f"{iso[0]}-W{iso[1]:02d}" return f"{d.year:04d}-{d.month:02d}" # monthly def spend_over_time( records: list[dict[str, Any]], granularity: str = "Monthly" ) -> list[dict[str, Any]]: """Total spend per period bucket, chronological. -> [{period, amount}, ...]. Records without a parseable date are skipped. """ totals: dict[str, float] = defaultdict(float) for r in records: d = parse_date(r.get("date")) if d is None: continue totals[_period_key(d, granularity)] += _num(r.get("total")) return [ {"period": k, "amount": round(v, 2)} for k, v in sorted(totals.items()) ] def transactions_table(records: list[dict[str, Any]]) -> list[list[Any]]: """Rows [date, vendor, total, category], most recent first. Undated records sort to the bottom. """ def sort_key(r: dict[str, Any]): d = parse_date(r.get("date")) return (d or date.min, str(r.get("saved_at", ""))) rows: list[list[Any]] = [] for r in sorted(records, key=sort_key, reverse=True): rows.append( [ r.get("date", ""), r.get("vendor", ""), _num(r.get("total")), _category(r), ] ) return rows