"""Core duplicate-detection logic.""" from __future__ import annotations import logging from collections import defaultdict, deque from datetime import datetime from decimal import Decimal from typing import Dict, Iterable, List, Sequence, Set, Tuple from .config import settings from .merchant_alias import MerchantAliasResolver, normalize_merchant from .models import DuplicateCluster, Expense, MergeSuggestion from .repositories import MergeSuggestionRepository logger = logging.getLogger("DuplicateDetector") def _pct_delta(a: Decimal, b: Decimal) -> float: if a == 0: return float("inf") return abs(float((a - b) / a * Decimal(100))) def _minutes_delta(a: datetime, b: datetime) -> float: return abs((a - b).total_seconds() / 60) class DuplicateDetector: def __init__( self, *, alias_resolver: MerchantAliasResolver, suggestions_repo: MergeSuggestionRepository, amount_tolerance_pct: float | None = None, time_tolerance_minutes: int | None = None, ) -> None: self.alias_resolver = alias_resolver self.suggestions_repo = suggestions_repo self.amount_tolerance_pct = amount_tolerance_pct or float(settings.amount_tolerance_pct) self.time_tolerance_minutes = time_tolerance_minutes or settings.time_tolerance_minutes def _build_graph(self, expenses: Sequence[Expense]) -> Dict[int, Set[int]]: adjacency: Dict[int, Set[int]] = defaultdict(set) for i, exp_a in enumerate(expenses): for j in range(i + 1, len(expenses)): exp_b = expenses[j] if exp_a.user_id and exp_b.user_id and exp_a.user_id != exp_b.user_id: continue delta_minutes = _minutes_delta(exp_a.expense_time, exp_b.expense_time) if delta_minutes > self.time_tolerance_minutes: break amount_delta_pct = _pct_delta(exp_a.amount, exp_b.amount) if amount_delta_pct > self.amount_tolerance_pct: continue alias_match, alias_rule = self.alias_resolver.are_aliases( exp_a.merchant, exp_b.merchant, ) if alias_match: adjacency[i].add(j) adjacency[j].add(i) return adjacency def _clusters_from_graph( self, adjacency: Dict[int, Set[int]], expenses: Sequence[Expense], ) -> List[DuplicateCluster]: visited: Set[int] = set() clusters: List[DuplicateCluster] = [] for node in range(len(expenses)): if node in visited or node not in adjacency: continue component_nodes: List[int] = [] queue: deque[int] = deque([node]) while queue: current = queue.popleft() if current in visited: continue visited.add(current) component_nodes.append(current) for neighbor in adjacency[current]: if neighbor not in visited: queue.append(neighbor) if len(component_nodes) <= 1: continue component_nodes.sort() component_expenses = [expenses[idx] for idx in component_nodes] amounts = [exp.amount for exp in component_expenses] times = [exp.expense_time for exp in component_expenses] amount_delta_pct = _pct_delta(min(amounts), max(amounts)) time_delta_minutes = _minutes_delta(min(times), max(times)) merchant_rule = self._merchant_rule(component_expenses) clusters.append( DuplicateCluster( expenses=component_expenses, amount_delta_pct=amount_delta_pct, time_delta_minutes=time_delta_minutes, merchant_rule=merchant_rule, ), ) return clusters def _merchant_rule(self, expenses: Sequence[Expense]) -> str: normalized = {normalize_merchant(exp.merchant) for exp in expenses} if len(normalized) == 1: return "exact" return "alias" def find_clusters(self, expenses: Sequence[Expense]) -> List[DuplicateCluster]: if not expenses: return [] sorted_expenses = sorted(expenses, key=lambda e: e.expense_time) graph = self._build_graph(sorted_expenses) clusters = self._clusters_from_graph(graph, sorted_expenses) logger.info("Evaluated %d expenses, found %d clusters", len(expenses), len(clusters)) return clusters def persist_suggestions(self, clusters: Iterable[DuplicateCluster]) -> List[str]: suggestion_ids: List[str] = [] for cluster in clusters: candidate_ids = [expense.expense_id for expense in cluster.expenses] tie_breaker = "same purchase?" if len(candidate_ids) > 2 else None details = cluster.to_details() if tie_breaker: details["tie_breaker"] = tie_breaker suggestion = MergeSuggestion( candidate_ids=candidate_ids, message="These seem similar. Would you like to merge them?", details=details, audit={ "generated_by": settings.service_name, "generated_at": datetime.utcnow(), "rule_version": "v1.0", }, ) suggestion_id = self.suggestions_repo.insert_soft_merge(suggestion) suggestion_ids.append(suggestion_id) logger.info( "Recorded merge suggestion %s for candidates %s", suggestion_id, candidate_ids, ) return suggestion_ids