# DUPLICATE_TRANSACTION_DETECTION — src/duplicate_scheduler.py
# Converted the API to a scheduler-based pattern for duplicate detection (commit d702071).
"""Scheduler for periodic duplicate detection."""
from __future__ import annotations

import logging
from datetime import datetime, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from .config import settings
from .duplicate_detector import DuplicateDetector
from .merchant_alias import MerchantAliasResolver
from .repositories import (
    ExpenseRepository,
    MerchantAliasRepository,
    MergeSuggestionRepository,
)
# Module-level logger; deliberately named "DuplicateScheduler" rather than
# __name__ — presumably so log config can target it by a stable name.
logger = logging.getLogger("DuplicateScheduler")
class DuplicateScheduler:
    """Owns the asyncio-based scheduler that drives periodic duplicate scans."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()

    def start(self, db):
        """Register and start the recurring duplicate-detection job.

        Args:
            db: Database client handed through to each cron invocation.
        """
        # coalesce + max_instances=1 keep overlapping/backed-up ticks from
        # running the detection pass concurrently.
        interval = IntervalTrigger(seconds=settings.scheduler_interval_seconds)
        self.scheduler.add_job(
            run_duplicate_cron,
            interval,
            args=[db],
            id="duplicate-cron",
            replace_existing=True,
            max_instances=1,
            coalesce=True,
        )
        self.scheduler.start()

    def shutdown(self):
        """Stop the scheduler without waiting for in-flight jobs, if running."""
        if not self.scheduler.running:
            return
        self.scheduler.shutdown(wait=False)
async def run_duplicate_cron(db):
    """Run one duplicate-detection pass over recently ingested expenses.

    Loads merchant aliases, fetches expenses from the configured lookback
    window, clusters likely duplicates, and persists merge suggestions.

    Args:
        db: Database client; passed to each repository's ``from_client``.
    """
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime; use an aware UTC timestamp. The aware isoformat() already
    # carries the "+00:00" offset, so the old hard-coded "Z" suffix is gone.
    now = datetime.now(timezone.utc)
    logger.info("Duplicate detection cron tick at %s", now.isoformat())

    expense_repository = ExpenseRepository.from_client(db)
    alias_repository = MerchantAliasRepository.from_client(db)
    suggestion_repository = MergeSuggestionRepository.from_client(db)

    # Detection knobs all come from settings so ops can tune without a deploy.
    lookback_hours = settings.default_lookback_hours
    limit = settings.max_batch_size
    amount_pct = float(settings.amount_tolerance_pct)
    minutes = settings.time_tolerance_minutes

    alias_resolver = MerchantAliasResolver()
    alias_resolver.load_from_cursor(alias_repository.fetch_all())

    detector = DuplicateDetector(
        alias_resolver=alias_resolver,
        suggestions_repo=suggestion_repository,
        amount_tolerance_pct=amount_pct,
        time_tolerance_minutes=minutes,
    )

    expenses = expense_repository.fetch_recent(lookback_hours, limit)
    if not expenses:
        logger.info("No expenses found for the requested window.")
        return

    clusters = detector.find_clusters(expenses)
    if clusters:
        suggestion_ids = detector.persist_suggestions(clusters)
        logger.info(
            "Scanned %d expenses, found %d clusters, created %d suggestions",
            len(expenses),
            len(clusters),
            len(suggestion_ids),
        )
    else:
        logger.info("Scanned %d expenses, no duplicate clusters detected", len(expenses))