from __future__ import annotations import math import os from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, Iterable, List, Optional, Tuple import numpy as np import pandas as pd import plotly.express as px # ------------------------------ # Constants # ------------------------------ CATEGORIES = [ "Food", "Travel", "Shopping", "Utilities", "Entertainment", "Health", "Subscriptions", "Transport", ] MERCHANTS = [ "SuperMart", "QuickEats", "Urban Cafe", "MegaStore", "Cinema City", "Fit&Fine Gym", "City Utilities", "StreamFlix", "RideNow", "Book Haven", "ElectroWorld", "TravelCo", "PharmaPlus", "HomeNeeds", ] PAYMENT_METHODS = ["Debit Card", "Credit Card", "Digital Wallet"] LOCATIONS = [ "London", "Manchester", "Birmingham", "Leeds", "Glasgow", "Liverpool", "Bristol", "Edinburgh", "Cardiff", "Belfast", ] # ------------------------------ # Data Generation # ------------------------------ def _random_amounts(n: int, rng: np.random.Generator) -> np.ndarray: choices = rng.choice(["small", "medium", "large"], size=n, p=[0.65, 0.28, 0.07]) amounts = np.empty(n) for i, c in enumerate(choices): if c == "small": amounts[i] = max(1, rng.normal(15, 8)) elif c == "medium": amounts[i] = max(5, rng.normal(60, 25)) else: amounts[i] = max(20, rng.normal(180, 60)) spike_idx = rng.choice(np.arange(n), size=max(1, n // 50), replace=False) amounts[spike_idx] *= rng.uniform(2.5, 4.0, size=len(spike_idx)) return np.round(amounts, 2) def generate_synthetic_transactions(n_rows: int = 900, seed: Optional[int] = None) -> pd.DataFrame: rng = np.random.default_rng(seed) end = pd.Timestamp.today().normalize() start = end - pd.Timedelta(days=365) dates = pd.date_range(start, end, freq="D") weights = np.array([1.2 if d.weekday() >= 5 else 1.0 for d in dates]) * \ np.array([1.3 if d.day > 25 else 1.0 for d in dates]) weights = weights / weights.sum() date_choices = rng.choice(len(dates), size=n_rows, replace=True, p=weights) chosen_dates = dates[date_choices] categories = rng.choice(CATEGORIES, size=n_rows) merchants = rng.choice(MERCHANTS, size=n_rows) payment_methods = rng.choice(PAYMENT_METHODS, size=n_rows, p=[0.6, 0.25, 0.15]) locations = rng.choice(LOCATIONS, size=n_rows) amts = _random_amounts(n_rows, rng) df = pd.DataFrame({ "Date": pd.to_datetime(chosen_dates), "Merchant": merchants, "Category": categories, "Amount": amts, "Payment Method": payment_methods, "Location": locations, }) return df.sort_values("Date").reset_index(drop=True) # ------------------------------ # Filtering # ------------------------------ def filter_transactions( df: pd.DataFrame, date_range: Tuple[datetime, datetime], categories: Optional[Iterable[str]] = None, merchant_query: str = "", ) -> pd.DataFrame: start, end = date_range mask = (df["Date"] >= pd.to_datetime(start)) & (df["Date"] <= pd.to_datetime(end)) if categories: mask &= df["Category"].isin(list(categories)) if merchant_query: mask &= df["Merchant"].str.contains(merchant_query, case=False, na=False) return df.loc[mask].copy() # ------------------------------ # Aggregation # ------------------------------ def _month_key(s: pd.Series) -> pd.Series: return pd.to_datetime(s).dt.to_period("M").dt.to_timestamp() def compute_aggregations(df: pd.DataFrame) -> Dict: if df.empty: return { "total_spend": 0.0, "avg_monthly_spend": 0.0, "spend_per_category": pd.Series(dtype=float), "spend_per_payment": pd.Series(dtype=float), "max_transaction": {"Amount": 0.0}, "min_transaction": {"Amount": 0.0}, "monthly": pd.DataFrame(columns=["Month", "Amount"]), "category_share": pd.Series(dtype=float), "rolling_28d": pd.DataFrame(columns=["Date", "Amount", "Rolling28"]), "spikes": pd.DataFrame(columns=["Date", "Amount", "IsSpike"]), } total_spend = float(df["Amount"].sum()) spend_per_category = df.groupby("Category")["Amount"].sum().sort_values(ascending=False) spend_per_payment = df.groupby("Payment Method")["Amount"].sum().sort_values(ascending=False) max_txn = df.loc[df["Amount"].idxmax()].to_dict() min_txn = df.loc[df["Amount"].idxmin()].to_dict() monthly = df.assign(Month=_month_key(df["Date"])).groupby("Month")["Amount"].sum().reset_index() avg_monthly_spend = float(monthly["Amount"].mean()) if not monthly.empty else 0.0 category_share = (spend_per_category / max(total_spend, 1e-9)).round(4) df_daily = df.groupby(pd.to_datetime(df["Date"]).dt.date)["Amount"].sum().reset_index() df_daily["Date"] = pd.to_datetime(df_daily["Date"]) df_daily = df_daily.sort_values("Date") df_daily["Rolling28"] = df_daily["Amount"].rolling(window=28, min_periods=7).mean() mu = df_daily["Amount"].mean() sigma = df_daily["Amount"].std(ddof=0) or 0.0 threshold = mu + 2.5 * sigma df_spikes = df_daily.assign(IsSpike=df_daily["Amount"] > threshold) return { "total_spend": total_spend, "avg_monthly_spend": avg_monthly_spend, "spend_per_category": spend_per_category, "spend_per_payment": spend_per_payment, "max_transaction": max_txn, "min_transaction": min_txn, "monthly": monthly, "category_share": category_share, "rolling_28d": df_daily, "spikes": df_spikes, } # ------------------------------ # Charts # ------------------------------ def build_time_series_chart( df: pd.DataFrame, template: str = "plotly", spike_overlay: Optional[pd.DataFrame] = None, fixed_line_width: float = 2.0, **kwargs, ) -> "px.Figure": if df.empty: fig = px.line() fig.update_layout(template=template) return fig daily = df.groupby(pd.to_datetime(df["Date"]).dt.date)["Amount"].sum().reset_index() daily["Date"] = pd.to_datetime(daily["Date"]) fig = px.line(daily, x="Date", y="Amount", title="Daily Spend Over Time", markers=True) fig.update_traces(line=dict(width=fixed_line_width), hovertemplate="%{x|%b %d, %Y}: £%{y:.2f}") fig.update_layout( margin=dict(l=10, r=10, t=40, b=10), template=template, xaxis=dict(fixedrange=True), yaxis=dict(fixedrange=True), ) if isinstance(spike_overlay, pd.DataFrame) and not spike_overlay.empty: spike_points = spike_overlay[spike_overlay.get("IsSpike", False)] if not spike_points.empty: fig.add_scatter( x=spike_points["Date"], y=spike_points["Amount"], mode="markers", name="Spikes", marker=dict(color="#EF553B", size=9, symbol="diamond"), hovertemplate="Spike %{x|%b %d, %Y}: £%{y:.2f}", ) return fig def build_category_bar_chart( spend_per_category: pd.Series, template: str = "plotly", color_sequence: Optional[list] = None, **kwargs, ): if spend_per_category.empty: fig = px.bar() fig.update_layout(template=template) return fig fig = px.bar( spend_per_category.reset_index().rename(columns={"index": "Category", 0: "Amount"}), x="Category", y="Amount", title="Spend by Category", color="Category", color_discrete_sequence=color_sequence, ) fig.update_traces(hovertemplate="%{x}: £%{y:.2f}") fig.update_layout(showlegend=False, margin=dict(l=10, r=10, t=40, b=10), template=template) return fig def build_payment_method_pie_chart( spend_per_payment: pd.Series, template: str = "plotly", color_sequence: Optional[list] = None, ): if spend_per_payment.empty: fig = px.pie() fig.update_layout(template=template) return fig fig = px.pie( spend_per_payment.reset_index().rename(columns={"index": "Payment Method", 0: "Amount"}), values="Amount", names="Payment Method", title="Payment Methods Distribution", hole=0.45, color_discrete_sequence=color_sequence, ) fig.update_traces(hovertemplate="%{label}: £%{value:.2f} (%{percent})") fig.update_layout(margin=dict(l=10, r=10, t=40, b=10), template=template) return fig # ------------------------------ # Helpers # ------------------------------ def _format_number(n: float) -> str: """Formats numbers with k/M suffix and ensures spacing to prevent cramping.""" if n >= 1_000_000: return f"£{n/1_000_000:.1f}M" if n >= 1_000: return f"£{n/1_000:.1f}k" return f"£{n:,.0f}" def _month_over_month_change(monthly: Optional[pd.DataFrame]) -> float: if monthly is None or monthly.empty or len(monthly) < 2: return 0.0 monthly_sorted = monthly.sort_values("Month") last, prev = monthly_sorted["Amount"].iloc[-1], monthly_sorted["Amount"].iloc[-2] if prev == 0: return 0.0 return float((last - prev) / prev)