| """Scheduler for periodic duplicate detection.""" | |
from __future__ import annotations

import logging
from datetime import datetime, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from .config import settings
from .duplicate_detector import DuplicateDetector
from .merchant_alias import MerchantAliasResolver
from .repositories import (
    ExpenseRepository,
    MerchantAliasRepository,
    MergeSuggestionRepository,
)
| logger = logging.getLogger("DuplicateScheduler") | |
class DuplicateScheduler:
    """Owns the AsyncIO scheduler that drives periodic duplicate detection."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()

    def start(self, db):
        """Register the recurring duplicate-detection job and start the scheduler.

        Args:
            db: Database client handed through to ``run_duplicate_cron``.
        """
        # Fire at a fixed interval; coalesce + max_instances=1 prevent
        # overlapping or piled-up runs if a tick is delayed.
        trigger = IntervalTrigger(seconds=settings.scheduler_interval_seconds)
        self.scheduler.add_job(
            run_duplicate_cron,
            trigger,
            args=[db],
            id="duplicate-cron",
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.scheduler.start()

    def shutdown(self):
        """Stop the scheduler without waiting for in-flight jobs."""
        if not self.scheduler.running:
            return
        self.scheduler.shutdown(wait=False)
async def run_duplicate_cron(db):
    """Run one duplicate-detection pass over recently created expenses.

    Fetches up to ``settings.max_batch_size`` expenses from the last
    ``settings.default_lookback_hours`` hours, clusters likely duplicates
    with ``DuplicateDetector``, and persists merge suggestions for any
    clusters found. Results are reported via the module logger only.

    Args:
        db: Database client/handle passed through to the repository
            ``from_client`` constructors.
    """
    # datetime.utcnow() is deprecated (3.12+) and naive; use an aware
    # UTC timestamp whose isoformat() already carries the offset.
    now = datetime.now(timezone.utc)
    logger.info("Duplicate detection cron tick at %s", now.isoformat())

    expense_repository = ExpenseRepository.from_client(db)
    alias_repository = MerchantAliasRepository.from_client(db)
    suggestion_repository = MergeSuggestionRepository.from_client(db)

    # Detection knobs all come from config so operators can tune them
    # without a code change.
    lookback_hours = settings.default_lookback_hours
    limit = settings.max_batch_size
    amount_pct = float(settings.amount_tolerance_pct)
    minutes = settings.time_tolerance_minutes

    # Aliases normalize merchant names so renamed merchants still match.
    alias_resolver = MerchantAliasResolver()
    alias_resolver.load_from_cursor(alias_repository.fetch_all())

    detector = DuplicateDetector(
        alias_resolver=alias_resolver,
        suggestions_repo=suggestion_repository,
        amount_tolerance_pct=amount_pct,
        time_tolerance_minutes=minutes,
    )

    expenses = expense_repository.fetch_recent(lookback_hours, limit)
    if not expenses:
        logger.info("No expenses found for the requested window.")
        return

    clusters = detector.find_clusters(expenses)
    if not clusters:
        logger.info("Scanned %d expenses, no duplicate clusters detected", len(expenses))
        return

    suggestion_ids = detector.persist_suggestions(clusters)
    logger.info(
        "Scanned %d expenses, found %d clusters, created %d suggestions",
        len(expenses),
        len(clusters),
        len(suggestion_ids),
    )