| """Core duplicate-detection logic.""" | |
| from __future__ import annotations | |
| import logging | |
| from collections import defaultdict, deque | |
| from datetime import datetime | |
| from decimal import Decimal | |
| from typing import Dict, Iterable, List, Sequence, Set, Tuple | |
| from .config import settings | |
| from .merchant_alias import MerchantAliasResolver, normalize_merchant | |
| from .models import DuplicateCluster, Expense, MergeSuggestion | |
| from .repositories import MergeSuggestionRepository | |
| logger = logging.getLogger("DuplicateDetector") | |
| def _pct_delta(a: Decimal, b: Decimal) -> float: | |
| if a == 0: | |
| return float("inf") | |
| return abs(float((a - b) / a * Decimal(100))) | |
| def _minutes_delta(a: datetime, b: datetime) -> float: | |
| return abs((a - b).total_seconds() / 60) | |
class DuplicateDetector:
    """Finds likely-duplicate expenses and records merge suggestions.

    Two expenses are treated as duplicate candidates when they belong to
    the same user (or either lacks a user id), fall within the configured
    time window, differ in amount by at most the configured percentage,
    and their merchants resolve as aliases of each other. Candidate pairs
    are grouped into connected components; each component of two or more
    expenses is reported as a :class:`DuplicateCluster`.
    """

    def __init__(
        self,
        *,
        alias_resolver: MerchantAliasResolver,
        suggestions_repo: MergeSuggestionRepository,
        amount_tolerance_pct: float | None = None,
        time_tolerance_minutes: int | None = None,
    ) -> None:
        """Store collaborators and resolve tolerances.

        Tolerances fall back to ``settings`` only when the argument is
        ``None``. (The previous ``x or settings...`` form silently
        discarded an explicit tolerance of ``0``.)
        """
        self.alias_resolver = alias_resolver
        self.suggestions_repo = suggestions_repo
        if amount_tolerance_pct is None:
            amount_tolerance_pct = float(settings.amount_tolerance_pct)
        if time_tolerance_minutes is None:
            time_tolerance_minutes = settings.time_tolerance_minutes
        self.amount_tolerance_pct = amount_tolerance_pct
        self.time_tolerance_minutes = time_tolerance_minutes

    def _build_graph(self, expenses: Sequence[Expense]) -> Dict[int, Set[int]]:
        """Build an adjacency map (index -> matching indices) of duplicate pairs.

        Precondition: ``expenses`` is sorted ascending by ``expense_time``
        (``find_clusters`` guarantees this). The inner loop breaks as soon
        as the time gap exceeds the tolerance, which is only valid because
        every later element is at least as far away in time.
        """
        adjacency: Dict[int, Set[int]] = defaultdict(set)
        for i, exp_a in enumerate(expenses):
            for j in range(i + 1, len(expenses)):
                exp_b = expenses[j]
                # Two *known* distinct users can never share a duplicate;
                # expenses with a missing user id stay eligible.
                if exp_a.user_id and exp_b.user_id and exp_a.user_id != exp_b.user_id:
                    continue
                delta_minutes = _minutes_delta(exp_a.expense_time, exp_b.expense_time)
                if delta_minutes > self.time_tolerance_minutes:
                    break  # sorted input: every later j is even further away
                amount_delta_pct = _pct_delta(exp_a.amount, exp_b.amount)
                if amount_delta_pct > self.amount_tolerance_pct:
                    continue
                # Second tuple item is the matched alias rule; unused here.
                alias_match, _ = self.alias_resolver.are_aliases(
                    exp_a.merchant,
                    exp_b.merchant,
                )
                if alias_match:
                    adjacency[i].add(j)
                    adjacency[j].add(i)
        return adjacency

    def _clusters_from_graph(
        self,
        adjacency: Dict[int, Set[int]],
        expenses: Sequence[Expense],
    ) -> List[DuplicateCluster]:
        """Extract connected components of size >= 2 via BFS, one cluster each."""
        visited: Set[int] = set()
        clusters: List[DuplicateCluster] = []
        for node in range(len(expenses)):
            # Skip already-clustered nodes and nodes with no matches at all.
            if node in visited or node not in adjacency:
                continue
            component_nodes: List[int] = []
            queue: deque[int] = deque([node])
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                component_nodes.append(current)
                for neighbor in adjacency[current]:
                    if neighbor not in visited:
                        queue.append(neighbor)
            if len(component_nodes) <= 1:
                continue
            component_nodes.sort()
            component_expenses = [expenses[idx] for idx in component_nodes]
            amounts = [exp.amount for exp in component_expenses]
            times = [exp.expense_time for exp in component_expenses]
            # Cluster-level deltas describe the widest spread in the component.
            amount_delta_pct = _pct_delta(min(amounts), max(amounts))
            time_delta_minutes = _minutes_delta(min(times), max(times))
            merchant_rule = self._merchant_rule(component_expenses)
            clusters.append(
                DuplicateCluster(
                    expenses=component_expenses,
                    amount_delta_pct=amount_delta_pct,
                    time_delta_minutes=time_delta_minutes,
                    merchant_rule=merchant_rule,
                ),
            )
        return clusters

    def _merchant_rule(self, expenses: Sequence[Expense]) -> str:
        """Classify how the cluster's merchants matched: "exact" or "alias"."""
        normalized = {normalize_merchant(exp.merchant) for exp in expenses}
        if len(normalized) == 1:
            return "exact"
        return "alias"

    def find_clusters(self, expenses: Sequence[Expense]) -> List[DuplicateCluster]:
        """Return duplicate clusters found among ``expenses`` (empty input -> [])."""
        if not expenses:
            return []
        # Sort by time so _build_graph's early break is valid.
        sorted_expenses = sorted(expenses, key=lambda e: e.expense_time)
        graph = self._build_graph(sorted_expenses)
        clusters = self._clusters_from_graph(graph, sorted_expenses)
        logger.info("Evaluated %d expenses, found %d clusters", len(expenses), len(clusters))
        return clusters

    def persist_suggestions(self, clusters: Iterable[DuplicateCluster]) -> List[str]:
        """Persist one soft-merge suggestion per cluster; return the new ids."""
        suggestion_ids: List[str] = []
        for cluster in clusters:
            candidate_ids = [expense.expense_id for expense in cluster.expenses]
            # Larger clusters get an extra prompt for the reviewer.
            tie_breaker = "same purchase?" if len(candidate_ids) > 2 else None
            details = cluster.to_details()
            if tie_breaker:
                details["tie_breaker"] = tie_breaker
            suggestion = MergeSuggestion(
                candidate_ids=candidate_ids,
                message="These seem similar. Would you like to merge them?",
                details=details,
                audit={
                    "generated_by": settings.service_name,
                    # NOTE(review): utcnow() is naive and deprecated since
                    # Python 3.12; consider datetime.now(timezone.utc) —
                    # confirm downstream serialization first.
                    "generated_at": datetime.utcnow(),
                    "rule_version": "v1.0",
                },
            )
            suggestion_id = self.suggestions_repo.insert_soft_merge(suggestion)
            suggestion_ids.append(suggestion_id)
            logger.info(
                "Recorded merge suggestion %s for candidates %s",
                suggestion_id,
                candidate_ids,
            )
        return suggestion_ids