LogicGoInfotechSpaces committed
Commit e28a7b2 · 0 parents

Initial duplicate detector
Dockerfile ADDED
@@ -0,0 +1,16 @@
+ FROM python:3.11-slim
+
+ ENV PYTHONDONTWRITEBYTECODE=1 \
+     PYTHONUNBUFFERED=1
+
+ WORKDIR /app
+
+ COPY requirements.txt .
+ RUN python3 -m pip install --no-cache-dir -r requirements.txt
+
+ COPY . .
+
+ EXPOSE 7860
+
+ CMD ["python3", "-m", "uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "7860"]
+
README.md ADDED
@@ -0,0 +1,100 @@
+ Auto Expense Categorization – Duplicate Detection
+ =================================================
+
+ This mini-service connects to the `expense` MongoDB database and surfaces *soft* merge suggestions whenever two or more expense entries look like the same purchase. The rules currently implemented are:
+
+ * Amount difference of no more than ±1%
+ * Timestamp difference within a configurable ±N-minute window (default: 10 min)
+ * Merchant names that are either identical once normalised or mapped through a merchant-alias table
+
+ Rather than deleting or editing any expense rows, the service writes a merge suggestion into the `merge_suggestions` collection so that an operator (or another automation) can perform the actual merge later.
+
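Taken together, the three rules form a pairwise predicate. A standalone sketch (the `normalize` helper below is a simplified stand-in for the real normalisation in `src/merchant_alias.py`, and the amounts and merchants are illustrative):

```python
from datetime import datetime

def normalize(merchant: str) -> str:
    # Simplified stand-in for the merchant normalisation in src/merchant_alias.py.
    return "".join(ch for ch in merchant.lower() if ch.isalnum())

def looks_duplicate(a, b, amount_pct=1.0, minutes=10):
    # a, b: (amount, merchant, timestamp) tuples.
    amt_a, merch_a, t_a = a
    amt_b, merch_b, t_b = b
    amount_ok = abs(amt_a - amt_b) / amt_a * 100 <= amount_pct
    time_ok = abs((t_a - t_b).total_seconds()) / 60 <= minutes
    merchant_ok = normalize(merch_a) == normalize(merch_b)
    return amount_ok and time_ok and merchant_ok

x = (499.00, "Amazon Pay", datetime(2024, 6, 1, 12, 0))
y = (500.00, "AMAZON-PAY", datetime(2024, 6, 1, 12, 4))
print(looks_duplicate(x, y))  # True: within 1% and 4 minutes, same normalised merchant
```

All three checks must pass; failing any one (for example a 50% amount gap) rejects the pair.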
+ Quick Start
+ -----------
+
+ 1. Create a virtual environment and install dependencies:
+
+ ```
+ python3 -m venv .venv
+ source .venv/bin/activate   # Windows: .venv\Scripts\activate
+ python3 -m pip install -r requirements.txt
+ ```
+
+ 2. Copy `.env.example` to `.env` and set the Mongo connection string if you do not want to rely on the baked-in default.
+
+ 3. Run the detector (by default it scans the last 48 h of data and only writes suggestions; the flags below override the time window and lookback):
+
+ ```
+ python3 -m src.main --minutes 12 --lookback-hours 72
+ ```
+
+ You will see log lines such as:
+
+ ```
+ INFO DuplicateDetector Identified 2 duplicates, suggestion 673a...
+ ```
+
+ API Server
+ ----------
+
+ Run the HTTP service with FastAPI/uvicorn:
+
+ ```
+ python3 -m uvicorn src.api:app --reload
+ ```
+
+ Endpoints:
+
+ * `GET /health` – readiness probe.
+ * `POST /duplicates/detect` – kicks off a scan (the body can override `lookback_hours`, `limit`, `amount_pct`, `minutes`).
+ * `GET /suggestions?limit=50` – lists recent merge suggestions so the UI can ask “These seem similar. Would you like to merge them?”.
+
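For reference, the override body accepted by `POST /duplicates/detect` and the fields of its response can be sketched as follows (field names come from `src/api.py`; the values are illustrative):

```python
import json

# Illustrative override body for POST /duplicates/detect -- every field is
# optional and falls back to the configured defaults when omitted.
payload = {
    "lookback_hours": 72,  # hours of history to scan
    "limit": 1000,         # maximum expenses to evaluate
    "amount_pct": 1.0,     # amount tolerance, percent
    "minutes": 12,         # time tolerance, minutes
}
body = json.dumps(payload)

# Fields returned by the endpoint (see DetectResponse in src/api.py):
response_fields = ("expenses_scanned", "cluster_count", "suggestion_ids", "message")
print(body)
```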
+ Collections
+ -----------
+
+ * `expenses`: source data. The detector expects fields such as `_id`, `amount`, `currency`, `merchant`, `expense_time`.
+ * `merchant_aliases`: optional alias definitions (`name`, `aliases`).
+ * `merge_suggestions`: the service writes documents shaped as:
+
+ ```
+ {
+   "_id": ObjectId(...),
+   "candidate_ids": [...],
+   "message": "These seem similar. Would you like to merge them?",
+   "details": {
+     "amount_delta_pct": 0.53,
+     "time_delta_minutes": 4.2,
+     "merchant_match_rule": "alias"
+   },
+   "audit": {
+     "generated_by": "duplicate-detector",
+     "generated_at": ISODate(...)
+   },
+   "status": "pending"
+ }
+ ```
+
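Writes are idempotent per candidate set: `insert_soft_merge` (see `src/repositories.py`) keys suggestions on the sorted `candidate_ids` list, so re-running a scan does not create duplicate suggestions. A minimal sketch of that key, with illustrative ids:

```python
# Suggestions are keyed on the sorted candidate id list, so the same pair
# detected twice (in either order) maps to a single suggestion document.
def suggestion_key(candidate_ids):
    return sorted(candidate_ids)

a = suggestion_key(["665f01", "665f02"])
b = suggestion_key(["665f02", "665f01"])
print(a == b)  # True: both orderings collapse to one key
```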
+ Configuration
+ -------------
+
+ All tunables live in `src/config.py`. Environment variables take precedence, so you can tune tolerances per deployment without editing code.
+
+ | Variable | Description | Default |
+ | --- | --- | --- |
+ | `MONGO_URI` | Mongo connection string | Built-in URI |
+ | `MONGO_DB` | Database name | `expense` |
+ | `MONGO_EXPENSE_COLLECTION` | Expenses collection | `expenses` |
+ | `MONGO_ALIAS_COLLECTION` | Merchant alias collection | `merchant_aliases` |
+ | `MONGO_SUGGESTION_COLLECTION` | Merge-suggestion collection | `merge_suggestions` |
+ | `AMOUNT_TOLERANCE_PCT` | Amount tolerance (%) | `1.0` |
+ | `TIME_TOLERANCE_MINUTES` | Time tolerance (minutes) | `10` |
+ | `DEFAULT_LOOKBACK_HOURS` | How far back to scan | `48` |
+
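The precedence rule is simple: an environment variable, when set, always beats the baked-in default. A standalone sketch mirroring `_get_int` from `src/config.py`:

```python
import os

def get_int(env_key: str, default: str) -> int:
    # Environment value wins; the baked-in default is only a fallback.
    raw = os.getenv(env_key, default)
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"Invalid int for {env_key}: {raw}") from exc

os.environ["TIME_TOLERANCE_MINUTES"] = "15"
print(get_int("TIME_TOLERANCE_MINUTES", "10"))  # 15 -- env overrides the default
print(get_int("DEFAULT_LOOKBACK_HOURS", "48"))  # 48 -- unset, so the default applies
```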
+ Next Steps
+ ----------
+
+ * Wire this module into your ingestion pipeline so suggestions are generated immediately after a new expense is stored.
+ * Surface the `merge_suggestions` collection in your UI to show prompts such as “These seem similar. Would you like to merge them?”
+ * Extend `MerchantAliasResolver` to sync aliases from your upstream ERP or ML model.
+
+
requirements.txt ADDED
@@ -0,0 +1,5 @@
+ pymongo>=4.8.0
+ python-dotenv>=1.0.1
+ fastapi>=0.115.0
+ uvicorn>=0.30.6
+
src/__init__.py ADDED
@@ -0,0 +1,10 @@
+ """Duplicate detection service package."""
+
+ __all__ = [
+     "config",
+     "models",
+     "merchant_alias",
+     "repositories",
+     "duplicate_detector",
+ ]
+
src/api.py ADDED
@@ -0,0 +1,124 @@
+ from __future__ import annotations
+
+ import logging
+ from typing import Any, List
+
+ from fastapi import FastAPI, HTTPException, Query
+ from pydantic import BaseModel, ConfigDict, Field
+
+ from .config import settings
+ from .duplicate_detector import DuplicateDetector
+ from .merchant_alias import MerchantAliasResolver
+ from .repositories import (
+     ExpenseRepository,
+     MerchantAliasRepository,
+     MergeSuggestionRepository,
+     build_client,
+ )
+
+ logger = logging.getLogger("DuplicateAPI")
+
+ app = FastAPI(
+     title="Expense Duplicate Service",
+     version="1.0.0",
+     description="Detects near-duplicate expense entries and proposes merge suggestions.",
+ )
+
+ mongo_client = build_client()
+ expense_repository = ExpenseRepository.from_client(mongo_client)
+ alias_repository = MerchantAliasRepository.from_client(mongo_client)
+ suggestion_repository = MergeSuggestionRepository.from_client(mongo_client)
+
+
+ class DetectRequest(BaseModel):
+     lookback_hours: int | None = Field(
+         default=None,
+         ge=1,
+         description="Hours to look back when scanning expenses.",
+     )
+     limit: int | None = Field(
+         default=None,
+         ge=1,
+         le=settings.max_batch_size,
+         description="Maximum number of expenses to evaluate.",
+     )
+     amount_pct: float | None = Field(
+         default=None,
+         gt=0,
+         description="Override amount tolerance percentage (default 1%).",
+     )
+     minutes: int | None = Field(
+         default=None,
+         gt=0,
+         description="Override time tolerance minutes (default 10).",
+     )
+
+
+ class DetectResponse(BaseModel):
+     expenses_scanned: int
+     cluster_count: int
+     suggestion_ids: List[str]
+     message: str
+
+
+ class Suggestion(BaseModel):
+     model_config = ConfigDict(populate_by_name=True)
+
+     id: str = Field(alias="_id")
+     candidate_ids: List[str]
+     message: str
+     details: dict[str, Any]
+     audit: dict[str, Any]
+     status: str
+
+
+ @app.get("/health")
+ async def health() -> dict[str, str]:
+     return {"status": "ok"}
+
+
+ @app.post("/duplicates/detect", response_model=DetectResponse)
+ async def detect_duplicates(payload: DetectRequest) -> DetectResponse:
+     lookback_hours = payload.lookback_hours or settings.default_lookback_hours
+     limit = payload.limit or settings.max_batch_size
+     amount_pct = payload.amount_pct or float(settings.amount_tolerance_pct)
+     minutes = payload.minutes or settings.time_tolerance_minutes
+
+     alias_resolver = MerchantAliasResolver()
+     alias_resolver.load_from_cursor(alias_repository.fetch_all())
+     detector = DuplicateDetector(
+         alias_resolver=alias_resolver,
+         suggestions_repo=suggestion_repository,
+         amount_tolerance_pct=amount_pct,
+         time_tolerance_minutes=minutes,
+     )
+
+     expenses = expense_repository.fetch_recent(lookback_hours, limit)
+     if not expenses:
+         return DetectResponse(
+             expenses_scanned=0,
+             cluster_count=0,
+             suggestion_ids=[],
+             message="No expenses found for the requested window.",
+         )
+
+     clusters = detector.find_clusters(expenses)
+     suggestion_ids = detector.persist_suggestions(clusters) if clusters else []
+     message = (
+         "These seem similar. Would you like to merge them?"
+         if suggestion_ids
+         else "No duplicate clusters detected."
+     )
+     return DetectResponse(
+         expenses_scanned=len(expenses),
+         cluster_count=len(clusters),
+         suggestion_ids=suggestion_ids,
+         message=message,
+     )
+
+
+ @app.get("/suggestions", response_model=List[Suggestion])
+ async def list_suggestions(limit: int = Query(50, ge=1, le=500)) -> List[Suggestion]:
+     docs = suggestion_repository.fetch_recent(limit)
+     return [Suggestion.model_validate(doc) for doc in docs]
+
src/config.py ADDED
@@ -0,0 +1,49 @@
+ """Centralised configuration for the duplicate detector."""
+
+ from __future__ import annotations
+
+ import os
+ from dataclasses import dataclass
+ from decimal import Decimal
+
+ from dotenv import load_dotenv
+
+ load_dotenv()
+
+ DEFAULT_MONGO_URI = (
+     "mongodb://expenseuser:Kem_6o%3F%3F@165.227.69.221:27017/expense?authSource=admin"
+ )
+
+
+ def _get_decimal(env_key: str, default: str) -> Decimal:
+     raw_value = os.getenv(env_key, default)
+     try:
+         return Decimal(raw_value)
+     except Exception as exc:  # pragma: no cover - defensive logging
+         raise ValueError(f"Invalid decimal for {env_key}: {raw_value}") from exc
+
+
+ def _get_int(env_key: str, default: str) -> int:
+     raw_value = os.getenv(env_key, default)
+     try:
+         return int(raw_value)
+     except Exception as exc:  # pragma: no cover - defensive logging
+         raise ValueError(f"Invalid int for {env_key}: {raw_value}") from exc
+
+
+ @dataclass(frozen=True)
+ class Settings:
+     mongo_uri: str = os.getenv("MONGO_URI", DEFAULT_MONGO_URI)
+     mongo_db: str = os.getenv("MONGO_DB", "expense")
+     expense_collection: str = os.getenv("MONGO_EXPENSE_COLLECTION", "expenses")
+     alias_collection: str = os.getenv("MONGO_ALIAS_COLLECTION", "merchant_aliases")
+     suggestion_collection: str = os.getenv("MONGO_SUGGESTION_COLLECTION", "merge_suggestions")
+     amount_tolerance_pct: Decimal = _get_decimal("AMOUNT_TOLERANCE_PCT", "1.0")
+     time_tolerance_minutes: int = _get_int("TIME_TOLERANCE_MINUTES", "10")
+     default_lookback_hours: int = _get_int("DEFAULT_LOOKBACK_HOURS", "48")
+     service_name: str = os.getenv("SERVICE_NAME", "duplicate-detector")
+     max_batch_size: int = _get_int("MAX_BATCH_SIZE", "5000")
+
+
+ settings = Settings()
+
src/duplicate_detector.py ADDED
@@ -0,0 +1,144 @@
+ """Core duplicate-detection logic."""
+
+ from __future__ import annotations
+
+ import logging
+ from collections import defaultdict, deque
+ from datetime import datetime, timezone
+ from decimal import Decimal
+ from typing import Dict, Iterable, List, Sequence, Set
+
+ from .config import settings
+ from .merchant_alias import MerchantAliasResolver, normalize_merchant
+ from .models import DuplicateCluster, Expense, MergeSuggestion
+ from .repositories import MergeSuggestionRepository
+
+ logger = logging.getLogger("DuplicateDetector")
+
+
+ def _pct_delta(a: Decimal, b: Decimal) -> float:
+     if a == 0:
+         return float("inf")
+     return abs(float((a - b) / a * Decimal(100)))
+
+
+ def _minutes_delta(a: datetime, b: datetime) -> float:
+     return abs((a - b).total_seconds() / 60)
+
+
+ class DuplicateDetector:
+     def __init__(
+         self,
+         *,
+         alias_resolver: MerchantAliasResolver,
+         suggestions_repo: MergeSuggestionRepository,
+         amount_tolerance_pct: float | None = None,
+         time_tolerance_minutes: int | None = None,
+     ) -> None:
+         self.alias_resolver = alias_resolver
+         self.suggestions_repo = suggestions_repo
+         self.amount_tolerance_pct = amount_tolerance_pct or float(settings.amount_tolerance_pct)
+         self.time_tolerance_minutes = time_tolerance_minutes or settings.time_tolerance_minutes
+
+     def _build_graph(self, expenses: Sequence[Expense]) -> Dict[int, Set[int]]:
+         adjacency: Dict[int, Set[int]] = defaultdict(set)
+         for i, exp_a in enumerate(expenses):
+             for j in range(i + 1, len(expenses)):
+                 exp_b = expenses[j]
+                 delta_minutes = _minutes_delta(exp_a.expense_time, exp_b.expense_time)
+                 if delta_minutes > self.time_tolerance_minutes:
+                     break  # expenses are time-sorted; later entries only drift further
+                 amount_delta_pct = _pct_delta(exp_a.amount, exp_b.amount)
+                 if amount_delta_pct > self.amount_tolerance_pct:
+                     continue
+                 alias_match, _rule = self.alias_resolver.are_aliases(
+                     exp_a.merchant,
+                     exp_b.merchant,
+                 )
+                 if alias_match:
+                     adjacency[i].add(j)
+                     adjacency[j].add(i)
+         return adjacency
+
+     def _clusters_from_graph(
+         self,
+         adjacency: Dict[int, Set[int]],
+         expenses: Sequence[Expense],
+     ) -> List[DuplicateCluster]:
+         visited: Set[int] = set()
+         clusters: List[DuplicateCluster] = []
+         for node in range(len(expenses)):
+             if node in visited or node not in adjacency:
+                 continue
+             component_nodes: List[int] = []
+             queue: deque[int] = deque([node])
+             while queue:
+                 current = queue.popleft()
+                 if current in visited:
+                     continue
+                 visited.add(current)
+                 component_nodes.append(current)
+                 for neighbor in adjacency[current]:
+                     if neighbor not in visited:
+                         queue.append(neighbor)
+             if len(component_nodes) <= 1:
+                 continue
+             component_nodes.sort()
+             component_expenses = [expenses[idx] for idx in component_nodes]
+             amounts = [exp.amount for exp in component_expenses]
+             times = [exp.expense_time for exp in component_expenses]
+             amount_delta_pct = _pct_delta(min(amounts), max(amounts))
+             time_delta_minutes = _minutes_delta(min(times), max(times))
+             merchant_rule = self._merchant_rule(component_expenses)
+             clusters.append(
+                 DuplicateCluster(
+                     expenses=component_expenses,
+                     amount_delta_pct=amount_delta_pct,
+                     time_delta_minutes=time_delta_minutes,
+                     merchant_rule=merchant_rule,
+                 ),
+             )
+         return clusters
+
+     def _merchant_rule(self, expenses: Sequence[Expense]) -> str:
+         normalized = {normalize_merchant(exp.merchant) for exp in expenses}
+         if len(normalized) == 1:
+             return "exact"
+         return "alias"
+
+     def find_clusters(self, expenses: Sequence[Expense]) -> List[DuplicateCluster]:
+         if not expenses:
+             return []
+         sorted_expenses = sorted(expenses, key=lambda e: e.expense_time)
+         graph = self._build_graph(sorted_expenses)
+         clusters = self._clusters_from_graph(graph, sorted_expenses)
+         logger.info("Evaluated %d expenses, found %d clusters", len(expenses), len(clusters))
+         return clusters
+
+     def persist_suggestions(self, clusters: Iterable[DuplicateCluster]) -> List[str]:
+         suggestion_ids: List[str] = []
+         for cluster in clusters:
+             candidate_ids = [expense.expense_id for expense in cluster.expenses]
+             tie_breaker = "same purchase?" if len(candidate_ids) > 2 else None
+             details = cluster.to_details()
+             if tie_breaker:
+                 details["tie_breaker"] = tie_breaker
+             suggestion = MergeSuggestion(
+                 candidate_ids=candidate_ids,
+                 message="These seem similar. Would you like to merge them?",
+                 details=details,
+                 audit={
+                     "generated_by": settings.service_name,
+                     "generated_at": datetime.now(timezone.utc),
+                     "rule_version": "v1.0",
+                 },
+             )
+             suggestion_id = self.suggestions_repo.insert_soft_merge(suggestion)
+             suggestion_ids.append(suggestion_id)
+             logger.info(
+                 "Recorded merge suggestion %s for candidates %s",
+                 suggestion_id,
+                 candidate_ids,
+             )
+         return suggestion_ids
+
src/main.py ADDED
@@ -0,0 +1,106 @@
+ from __future__ import annotations
+
+ import argparse
+ import logging
+ import sys
+
+ from .config import settings
+ from .duplicate_detector import DuplicateDetector
+ from .merchant_alias import MerchantAliasResolver
+ from .repositories import (
+     ExpenseRepository,
+     MerchantAliasRepository,
+     MergeSuggestionRepository,
+     build_client,
+ )
+
+
+ def configure_logging(verbose: bool) -> None:
+     level = logging.DEBUG if verbose else logging.INFO
+     logging.basicConfig(
+         level=level,
+         format="%(asctime)s %(levelname)s %(message)s",
+     )
+
+
+ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+     parser = argparse.ArgumentParser(
+         description="Detect near-duplicate expenses and write merge suggestions.",
+     )
+     parser.add_argument(
+         "--minutes",
+         dest="minutes",
+         type=int,
+         default=settings.time_tolerance_minutes,
+         help="Time tolerance in minutes for comparing expenses (default: %(default)s).",
+     )
+     parser.add_argument(
+         "--amount-pct",
+         dest="amount_pct",
+         type=float,
+         default=float(settings.amount_tolerance_pct),
+         help="Amount tolerance percentage (default: %(default)s).",
+     )
+     parser.add_argument(
+         "--lookback-hours",
+         dest="lookback_hours",
+         type=int,
+         default=settings.default_lookback_hours,
+         help="How far back to fetch expenses (default: %(default)s).",
+     )
+     parser.add_argument(
+         "--limit",
+         dest="limit",
+         type=int,
+         default=settings.max_batch_size,
+         help="Maximum number of expenses to scan (default: %(default)s).",
+     )
+     parser.add_argument(
+         "--verbose",
+         action="store_true",
+         help="Enable debug logging.",
+     )
+     return parser.parse_args(argv)
+
+
+ def main(argv: list[str] | None = None) -> int:
+     args = parse_args(argv)
+     configure_logging(args.verbose)
+
+     client = build_client()
+     alias_repo = MerchantAliasRepository.from_client(client)
+     alias_resolver = MerchantAliasResolver()
+     alias_resolver.load_from_cursor(alias_repo.fetch_all())
+
+     expense_repo = ExpenseRepository.from_client(client)
+     expenses = expense_repo.fetch_recent(args.lookback_hours, args.limit)
+
+     if not expenses:
+         logging.info("No expenses found for lookback window")
+         return 0
+
+     suggestion_repo = MergeSuggestionRepository.from_client(client)
+     detector = DuplicateDetector(
+         alias_resolver=alias_resolver,
+         suggestions_repo=suggestion_repo,
+         amount_tolerance_pct=args.amount_pct,
+         time_tolerance_minutes=args.minutes,
+     )
+
+     clusters = detector.find_clusters(expenses)
+     if not clusters:
+         logging.info("No duplicate clusters detected")
+         return 0
+
+     suggestion_ids = detector.persist_suggestions(clusters)
+     logging.info(
+         "Finished writing %d suggestions. Example message: %s",
+         len(suggestion_ids),
+         "These seem similar. Would you like to merge them?",
+     )
+     return 0
+
+
+ if __name__ == "__main__":
+     sys.exit(main())
+
src/merchant_alias.py ADDED
@@ -0,0 +1,47 @@
+ """Utilities to resolve merchants that are aliases of each other."""
+
+ from __future__ import annotations
+
+ import re
+ from dataclasses import dataclass, field
+ from typing import Dict, Iterable, Set
+
+
+ MERCHANT_CLEAN_RE = re.compile(r"[^a-z0-9]+")
+
+
+ def normalize_merchant(name: str) -> str:
+     """Lowercase, strip and remove punctuation for comparisons."""
+     cleaned = MERCHANT_CLEAN_RE.sub("", name.strip().lower())
+     return cleaned
+
+
+ @dataclass
+ class MerchantAliasResolver:
+     """A simple in-memory alias graph.
+
+     The resolver can hydrate itself from Mongo (through a repository) or
+     fall back to a small bootstrapped dictionary.
+     """
+
+     alias_sets: Dict[str, Set[str]] = field(default_factory=dict)
+
+     def load_from_cursor(self, alias_documents: Iterable[dict]) -> None:
+         for doc in alias_documents:
+             canonical = normalize_merchant(doc.get("name", ""))
+             aliases = {normalize_merchant(alias) for alias in doc.get("aliases", [])}
+             aliases.add(canonical)
+             self.alias_sets[canonical] = aliases
+
+     def are_aliases(self, a: str, b: str) -> tuple[bool, str]:
+         norm_a = normalize_merchant(a)
+         norm_b = normalize_merchant(b)
+         if not norm_a or not norm_b:
+             return False, "blank"
+         if norm_a == norm_b:
+             return True, "exact"
+         for alias_group in self.alias_sets.values():
+             if norm_a in alias_group and norm_b in alias_group:
+                 return True, "alias"
+         return False, "none"
+
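A quick standalone demonstration of the resolution rules above (the helpers are inlined copies of `normalize_merchant` and the alias lookup so the snippet runs on its own; the alias-table contents are illustrative):

```python
import re

# Inlined from src/merchant_alias.py so this snippet is self-contained.
MERCHANT_CLEAN_RE = re.compile(r"[^a-z0-9]+")

def normalize_merchant(name: str) -> str:
    return MERCHANT_CLEAN_RE.sub("", name.strip().lower())

# Illustrative alias table, keyed by canonical merchant.
alias_sets = {
    "amazon": {"amazon", "amazonpay", "amzn"},
}

def are_aliases(a: str, b: str) -> tuple[bool, str]:
    na, nb = normalize_merchant(a), normalize_merchant(b)
    if na == nb:
        return True, "exact"
    for group in alias_sets.values():
        if na in group and nb in group:
            return True, "alias"
    return False, "none"

print(are_aliases("AMAZON PAY", "Amzn"))      # (True, 'alias')
print(are_aliases("Starbucks", "STARBUCKS"))  # (True, 'exact')
```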
src/models.py ADDED
@@ -0,0 +1,70 @@
+ """Domain models for expenses and merge suggestions."""
+
+ from __future__ import annotations
+
+ from dataclasses import dataclass, field
+ from datetime import datetime
+ from decimal import Decimal
+ from typing import List, Mapping, Sequence
+
+
+ @dataclass(frozen=True)
+ class Expense:
+     expense_id: str
+     amount: Decimal
+     currency: str
+     merchant: str
+     expense_time: datetime
+     source: str | None = None
+     metadata: Mapping[str, object] | None = None
+
+     @staticmethod
+     def from_document(doc: Mapping[str, object]) -> "Expense":
+         try:
+             amount_value = Decimal(str(doc["amount"]))
+         except KeyError as exc:
+             raise ValueError("Expense document missing 'amount'") from exc
+         return Expense(
+             expense_id=str(doc.get("_id")),
+             amount=amount_value,
+             currency=str(doc.get("currency", "INR")),
+             merchant=str(doc.get("merchant", "")).strip(),
+             expense_time=doc["expense_time"],
+             source=doc.get("source"),
+             metadata=doc.get("metadata") or {},
+         )
+
+
+ @dataclass
+ class MergeSuggestion:
+     candidate_ids: Sequence[str]
+     message: str
+     details: Mapping[str, object]
+     audit: Mapping[str, object]
+     status: str = "pending"
+     _id: str | None = None
+
+     def to_document(self) -> Mapping[str, object]:
+         return {
+             "candidate_ids": list(self.candidate_ids),
+             "message": self.message,
+             "details": dict(self.details),
+             "audit": dict(self.audit),
+             "status": self.status,
+         }
+
+
+ @dataclass
+ class DuplicateCluster:
+     expenses: List[Expense] = field(default_factory=list)
+     amount_delta_pct: float = 0.0
+     time_delta_minutes: float = 0.0
+     merchant_rule: str = "exact"
+
+     def to_details(self) -> Mapping[str, object]:
+         return {
+             "amount_delta_pct": self.amount_delta_pct,
+             "time_delta_minutes": self.time_delta_minutes,
+             "merchant_match_rule": self.merchant_rule,
+         }
+
src/repositories.py ADDED
@@ -0,0 +1,81 @@
+ """Mongo repositories used by the duplicate detector."""
+
+ from __future__ import annotations
+
+ from datetime import datetime, timedelta, timezone
+ from typing import Iterable, List, Mapping
+
+ from pymongo import MongoClient
+ from pymongo.collection import Collection
+
+ from . import config
+ from .models import Expense, MergeSuggestion
+
+
+ def build_client() -> MongoClient:
+     return MongoClient(config.settings.mongo_uri)
+
+
+ class ExpenseRepository:
+     def __init__(self, collection: Collection):
+         self._collection = collection
+
+     @classmethod
+     def from_client(cls, client: MongoClient) -> "ExpenseRepository":
+         return cls(
+             client[config.settings.mongo_db][config.settings.expense_collection],
+         )
+
+     def fetch_recent(self, lookback_hours: int, limit: int) -> List[Expense]:
+         since = datetime.now(tz=timezone.utc) - timedelta(hours=lookback_hours)
+         cursor = self._collection.find(
+             {"expense_time": {"$gte": since}},
+             sort=[("expense_time", 1)],
+             limit=limit,
+         )
+         return [Expense.from_document(doc) for doc in cursor]
+
+
+ class MerchantAliasRepository:
+     def __init__(self, collection: Collection):
+         self._collection = collection
+
+     @classmethod
+     def from_client(cls, client: MongoClient) -> "MerchantAliasRepository":
+         return cls(
+             client[config.settings.mongo_db][config.settings.alias_collection],
+         )
+
+     def fetch_all(self) -> Iterable[dict]:
+         return self._collection.find({}, projection={"name": 1, "aliases": 1})
+
+
+ class MergeSuggestionRepository:
+     def __init__(self, collection: Collection):
+         self._collection = collection
+
+     @classmethod
+     def from_client(cls, client: MongoClient) -> "MergeSuggestionRepository":
+         return cls(
+             client[config.settings.mongo_db][config.settings.suggestion_collection],
+         )
+
+     def insert_soft_merge(self, suggestion: MergeSuggestion) -> str:
+         # Deduplicate on the sorted candidate list so repeated scans stay idempotent.
+         candidate_ids = sorted(suggestion.candidate_ids)
+         existing = self._collection.find_one({"candidate_ids": candidate_ids}, {"_id": 1})
+         if existing:
+             return str(existing["_id"])
+         doc = suggestion.to_document()
+         doc["candidate_ids"] = candidate_ids
+         result = self._collection.insert_one(doc)
+         return str(result.inserted_id)
+
+     def fetch_recent(self, limit: int = 50) -> List[Mapping[str, object]]:
+         cursor = self._collection.find({}, sort=[("audit.generated_at", -1)], limit=limit)
+         suggestions: List[Mapping[str, object]] = []
+         for doc in cursor:
+             doc["_id"] = str(doc.get("_id"))
+             suggestions.append(doc)
+         return suggestions
+