# DUPLICATE_TRANSACTION_DETECTION/src/duplicate_detector.py
# Page-header residue from the hosting site (author: LogicGoInfotechSpaces;
# commit 76af018, "Improve detector schema + add API tester").
"""Core duplicate-detection logic."""
from __future__ import annotations
import logging
from collections import defaultdict, deque
from datetime import datetime
from decimal import Decimal
from typing import Dict, Iterable, List, Sequence, Set, Tuple
from .config import settings
from .merchant_alias import MerchantAliasResolver, normalize_merchant
from .models import DuplicateCluster, Expense, MergeSuggestion
from .repositories import MergeSuggestionRepository
# Module-level logger, registered under the fixed name "DuplicateDetector"
# (not the module's dotted path).
logger = logging.getLogger("DuplicateDetector")
def _pct_delta(a: Decimal, b: Decimal) -> float:
if a == 0:
return float("inf")
return abs(float((a - b) / a * Decimal(100)))
def _minutes_delta(a: datetime, b: datetime) -> float:
return abs((a - b).total_seconds() / 60)
class DuplicateDetector:
    """Group near-identical expenses into clusters and record merge suggestions.

    Two expenses are linked when they are not attributed to different users,
    their timestamps and amounts fall within the configured tolerances, and
    the alias resolver reports their merchants as aliases. Linked expenses
    are grouped into connected components, each reported as a
    ``DuplicateCluster``.
    """

    def __init__(
        self,
        *,
        alias_resolver: MerchantAliasResolver,
        suggestions_repo: MergeSuggestionRepository,
        amount_tolerance_pct: float | None = None,
        time_tolerance_minutes: int | None = None,
    ) -> None:
        """Store collaborators and resolve the matching tolerances.

        Args:
            alias_resolver: Used to decide whether two merchant names match.
            suggestions_repo: Receives the merge suggestions that
                ``persist_suggestions`` creates.
            amount_tolerance_pct: Largest allowed amount difference, in
                percent. ``None`` falls back to ``settings.amount_tolerance_pct``.
            time_tolerance_minutes: Largest allowed gap between expense
                times, in minutes. ``None`` falls back to
                ``settings.time_tolerance_minutes``.
        """
        self.alias_resolver = alias_resolver
        self.suggestions_repo = suggestions_repo
        # Test against None explicitly: a tolerance of 0 is a legitimate
        # request for exact matching and must not be replaced by the default.
        self.amount_tolerance_pct = (
            float(settings.amount_tolerance_pct)
            if amount_tolerance_pct is None
            else amount_tolerance_pct
        )
        self.time_tolerance_minutes = (
            settings.time_tolerance_minutes
            if time_tolerance_minutes is None
            else time_tolerance_minutes
        )

    def _build_graph(self, expenses: Sequence[Expense]) -> Dict[int, Set[int]]:
        """Build an undirected adjacency map linking likely-duplicate expenses.

        Args:
            expenses: Expenses sorted ascending by ``expense_time``. The
                ordering matters because the inner scan stops at the first
                pair whose time gap exceeds the tolerance.

        Returns:
            A mapping from an index in *expenses* to the indices it is linked
            to. Indices without any link do not appear as keys.
        """
        adjacency: Dict[int, Set[int]] = defaultdict(set)
        for i, exp_a in enumerate(expenses):
            for j in range(i + 1, len(expenses)):
                exp_b = expenses[j]
                # Only skip the pair when both user ids are set and differ;
                # a missing user id on either side does not block a match.
                if exp_a.user_id and exp_b.user_id and exp_a.user_id != exp_b.user_id:
                    continue
                delta_minutes = _minutes_delta(exp_a.expense_time, exp_b.expense_time)
                if delta_minutes > self.time_tolerance_minutes:
                    # With time-sorted input every later expense is at least
                    # this far away, so the rest of the scan can be skipped.
                    break
                amount_delta_pct = _pct_delta(exp_a.amount, exp_b.amount)
                if amount_delta_pct > self.amount_tolerance_pct:
                    continue
                # The second element of the result is not needed here.
                alias_match, _ = self.alias_resolver.are_aliases(
                    exp_a.merchant,
                    exp_b.merchant,
                )
                if alias_match:
                    adjacency[i].add(j)
                    adjacency[j].add(i)
        return adjacency

    def _clusters_from_graph(
        self,
        adjacency: Dict[int, Set[int]],
        expenses: Sequence[Expense],
    ) -> List[DuplicateCluster]:
        """Turn each connected component of *adjacency* into a cluster.

        Args:
            adjacency: Index-based adjacency map as produced by
                ``_build_graph``.
            expenses: The sequence the indices in *adjacency* refer to.

        Returns:
            One ``DuplicateCluster`` per component with more than one member.
            Members are ordered by their index in *expenses*, and each
            cluster carries the spread between its smallest and largest
            amount and between its earliest and latest time.
        """
        visited: Set[int] = set()
        clusters: List[DuplicateCluster] = []
        for node in range(len(expenses)):
            if node in visited or node not in adjacency:
                continue
            # Breadth-first walk collecting everything reachable from `node`.
            component_nodes: List[int] = []
            queue: deque[int] = deque([node])
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                component_nodes.append(current)
                for neighbor in adjacency[current]:
                    if neighbor not in visited:
                        queue.append(neighbor)
            if len(component_nodes) <= 1:
                continue
            component_nodes.sort()
            component_expenses = [expenses[idx] for idx in component_nodes]
            amounts = [exp.amount for exp in component_expenses]
            times = [exp.expense_time for exp in component_expenses]
            clusters.append(
                DuplicateCluster(
                    expenses=component_expenses,
                    amount_delta_pct=_pct_delta(min(amounts), max(amounts)),
                    time_delta_minutes=_minutes_delta(min(times), max(times)),
                    merchant_rule=self._merchant_rule(component_expenses),
                ),
            )
        return clusters

    def _merchant_rule(self, expenses: Sequence[Expense]) -> str:
        """Return ``"exact"`` if all merchants normalize to one name, else ``"alias"``."""
        normalized = {normalize_merchant(exp.merchant) for exp in expenses}
        return "exact" if len(normalized) == 1 else "alias"

    def find_clusters(self, expenses: Sequence[Expense]) -> List[DuplicateCluster]:
        """Detect clusters of likely-duplicate expenses.

        Args:
            expenses: Expenses to examine, in any order. The input is not
                modified; a time-sorted copy is used internally.

        Returns:
            The detected clusters, or an empty list when *expenses* is empty.
        """
        if not expenses:
            return []
        sorted_expenses = sorted(expenses, key=lambda e: e.expense_time)
        graph = self._build_graph(sorted_expenses)
        clusters = self._clusters_from_graph(graph, sorted_expenses)
        logger.info("Evaluated %d expenses, found %d clusters", len(expenses), len(clusters))
        return clusters

    def persist_suggestions(self, clusters: Iterable[DuplicateCluster]) -> List[str]:
        """Create one merge suggestion per cluster and store it.

        Each suggestion is passed to ``suggestions_repo.insert_soft_merge``.

        Args:
            clusters: Clusters to turn into suggestions.

        Returns:
            The values returned by ``insert_soft_merge``, in cluster order.
        """
        suggestion_ids: List[str] = []
        for cluster in clusters:
            candidate_ids = [expense.expense_id for expense in cluster.expenses]
            details = cluster.to_details()
            # Clusters of three or more get an extra prompt in the details.
            if len(candidate_ids) > 2:
                details["tie_breaker"] = "same purchase?"
            suggestion = MergeSuggestion(
                candidate_ids=candidate_ids,
                message="These seem similar. Would you like to merge them?",
                details=details,
                audit={
                    "generated_by": settings.service_name,
                    # NOTE(review): utcnow() yields a naive datetime and is
                    # deprecated since Python 3.12; kept so the persisted
                    # value stays the same. Confirm before switching to an
                    # aware timestamp.
                    "generated_at": datetime.utcnow(),
                    "rule_version": "v1.0",
                },
            )
            suggestion_id = self.suggestions_repo.insert_soft_merge(suggestion)
            suggestion_ids.append(suggestion_id)
            logger.info(
                "Recorded merge suggestion %s for candidates %s",
                suggestion_id,
                candidate_ids,
            )
        return suggestion_ids