| import math | |
| from datetime import datetime | |
| from typing import Iterable | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from apscheduler.triggers.interval import IntervalTrigger | |
| from app.services.overspend import check_overspending | |
| PERIOD_BUCKETS = {"YEARLY", "MONTHLY", "WEEKLY"} | |
| class BudgetScheduler: | |
| def __init__(self): | |
| self.scheduler = AsyncIOScheduler() | |
| def start(self, db): | |
| """Start a 10s interval job for budget monitoring.""" | |
| self.scheduler.add_job( | |
| run_budget_cron, | |
| IntervalTrigger(seconds=10), | |
| args=[db], | |
| max_instances=1, | |
| coalesce=True, | |
| id="budget-cron", | |
| replace_existing=True, | |
| ) | |
| self.scheduler.start() | |
| def shutdown(self): | |
| if self.scheduler.running: | |
| self.scheduler.shutdown(wait=False) | |
| async def run_budget_cron(db): | |
| now = datetime.utcnow() | |
| print(f"Budget cron tick at {now.isoformat()}Z") | |
| budgets_cursor = db["budgets"].find({"status": "OPEN"}) | |
| budgets: Iterable[dict] = await budgets_cursor.to_list(length=None) | |
| current_week = math.ceil(now.day / 7) | |
| for budget in budgets: | |
| current_spend = budget.get("spendAmount") or 0 | |
| alerts = await check_overspending(budget) | |
| if not alerts: | |
| continue | |
| budget_id = str(budget.get("_id")) | |
| last_log = await db["alert_logs"].find_one( | |
| {"budgetId": budget_id}, sort=[("triggeredAt", -1)] | |
| ) | |
| last_alert_date = last_log.get("triggeredAt") if last_log else None | |
| budget_updated_at = budget.get("updatedAt") | |
| should_send = False | |
| if not last_alert_date: | |
| should_send = True | |
| else: | |
| if budget.get("period") == "YEARLY" and last_alert_date.year != now.year: | |
| should_send = True | |
| if budget.get("period") == "MONTHLY" and ( | |
| last_alert_date.year != now.year or last_alert_date.month != now.month | |
| ): | |
| should_send = True | |
| if budget.get("period") == "WEEKLY": | |
| diff_days = (now - last_alert_date).days | |
| if diff_days >= 7: | |
| should_send = True | |
| period = budget.get("period") | |
| uses_period_buckets = period in PERIOD_BUCKETS | |
| month_bucket = (now.month - 1) if period == "MONTHLY" else None | |
| week_bucket = current_week if period == "WEEKLY" else None | |
| year_bucket = now.year if uses_period_buckets else None | |
| existing_log = await db["alert_logs"].find_one( | |
| { | |
| "budgetId": budget_id, | |
| "period": period, | |
| "year": year_bucket, | |
| "month": month_bucket, | |
| "week": week_bucket, | |
| } | |
| ) | |
| if existing_log: | |
| last_trigger_at = existing_log.get("triggeredAt") | |
| updated_after_last_alert = ( | |
| budget_updated_at and last_trigger_at and budget_updated_at > last_trigger_at | |
| ) | |
| last_spend_snapshot = existing_log.get("budgetSpendSnapshot") | |
| if last_spend_snapshot is None: | |
| last_spend_snapshot = float("-inf") | |
| spend_spike_detected = current_spend > last_spend_snapshot | |
| if updated_after_last_alert and spend_spike_detected: | |
| await db["alert_logs"].update_one( | |
| {"_id": existing_log["_id"]}, | |
| { | |
| "$set": { | |
| "alerts": alerts, | |
| "triggeredAt": now, | |
| "budgetName": budget.get("name"), | |
| "budgetSpendSnapshot": current_spend, | |
| } | |
| }, | |
| ) | |
| print(f"Overspend alert updated for: {budget.get('name')}") | |
| should_send = False | |
| if not should_send: | |
| continue | |
| await db["alert_logs"].insert_one( | |
| { | |
| "budgetId": budget_id, | |
| "budgetName": budget.get("name"), | |
| "alerts": alerts, | |
| "budgetSpendSnapshot": current_spend, | |
| "period": period, | |
| "year": year_bucket, | |
| "month": month_bucket, | |
| "week": week_bucket, | |
| "triggeredAt": now, | |
| } | |
| ) | |
| print(f"Overspend alert logged for: {budget.get('name')}") | |