"""Scheduler for periodic duplicate detection.""" from __future__ import annotations import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from .config import settings from .duplicate_detector import DuplicateDetector from .merchant_alias import MerchantAliasResolver from .repositories import ( ExpenseRepository, MerchantAliasRepository, MergeSuggestionRepository, ) logger = logging.getLogger("DuplicateScheduler") class DuplicateScheduler: def __init__(self): self.scheduler = AsyncIOScheduler() def start(self, db): """Start a periodic job for duplicate detection.""" self.scheduler.add_job( run_duplicate_cron, IntervalTrigger(seconds=settings.scheduler_interval_seconds), args=[db], max_instances=1, coalesce=True, id="duplicate-cron", replace_existing=True, ) self.scheduler.start() def shutdown(self): if self.scheduler.running: self.scheduler.shutdown(wait=False) async def run_duplicate_cron(db): now = datetime.utcnow() logger.info("Duplicate detection cron tick at %sZ", now.isoformat()) expense_repository = ExpenseRepository.from_client(db) alias_repository = MerchantAliasRepository.from_client(db) suggestion_repository = MergeSuggestionRepository.from_client(db) lookback_hours = settings.default_lookback_hours limit = settings.max_batch_size amount_pct = float(settings.amount_tolerance_pct) minutes = settings.time_tolerance_minutes alias_resolver = MerchantAliasResolver() alias_resolver.load_from_cursor(alias_repository.fetch_all()) detector = DuplicateDetector( alias_resolver=alias_resolver, suggestions_repo=suggestion_repository, amount_tolerance_pct=amount_pct, time_tolerance_minutes=minutes, ) expenses = expense_repository.fetch_recent(lookback_hours, limit) if not expenses: logger.info("No expenses found for the requested window.") return clusters = detector.find_clusters(expenses) if clusters: suggestion_ids = detector.persist_suggestions(clusters) logger.info( "Scanned %d expenses, found %d clusters, created %d suggestions", len(expenses), len(clusters), len(suggestion_ids), ) else: logger.info("Scanned %d expenses, no duplicate clusters detected", len(expenses))