HariLogicgo's picture
deployment ready
b8fc47f
import math
from datetime import datetime, timezone
from typing import Iterable

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from app.services.overspend import check_overspending
# Budget periods whose alert logs carry a calendar-year bucket; any other
# period is logged with year/month/week all set to None (see run_budget_cron).
PERIOD_BUCKETS = {"YEARLY", "MONTHLY", "WEEKLY"}
class BudgetScheduler:
    """Thin wrapper around an ``AsyncIOScheduler`` that runs the budget cron job."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()

    def start(self, db):
        """Register the budget-monitoring job on a 10-second interval and start the scheduler.

        ``db`` is passed through unchanged as the single positional argument
        of ``run_budget_cron`` on every run.
        """
        every_ten_seconds = IntervalTrigger(seconds=10)
        job_options = {
            "id": "budget-cron",
            "replace_existing": True,
            "max_instances": 1,
            "coalesce": True,
        }
        self.scheduler.add_job(
            run_budget_cron,
            every_ten_seconds,
            args=[db],
            **job_options,
        )
        self.scheduler.start()

    def shutdown(self):
        """Shut the scheduler down with ``wait=False``; do nothing if it is not running."""
        if not self.scheduler.running:
            return
        self.scheduler.shutdown(wait=False)
def _alert_is_due(period, last_alert_date, now):
    """Return True when a new alert log should be written for ``period``.

    A budget with no previous alert time is always due. Otherwise:
    YEARLY is due once the calendar year differs from the last alert's,
    MONTHLY once the calendar year or month differs, and WEEKLY once the
    ``timedelta.days`` between the last alert and ``now`` is at least 7.
    Any other period is never due again after its first alert.
    """
    if not last_alert_date:
        return True
    if period == "YEARLY":
        return last_alert_date.year != now.year
    if period == "MONTHLY":
        return (last_alert_date.year, last_alert_date.month) != (now.year, now.month)
    if period == "WEEKLY":
        return (now - last_alert_date).days >= 7
    return False


def _period_bucket(period, now, current_week):
    """Build the period/year/month/week fields identifying an alert-log bucket."""
    # NOTE(review): for WEEKLY the key is (year, week-of-month) with month left
    # as None, so the same week number in different months of one year maps to
    # the same bucket -- confirm this is intended before changing the schema.
    return {
        "period": period,
        "year": now.year if period in PERIOD_BUCKETS else None,
        # Zero-based month index (January is stored as 0).
        "month": (now.month - 1) if period == "MONTHLY" else None,
        "week": current_week if period == "WEEKLY" else None,
    }


async def run_budget_cron(db):
    """Run one budget-monitoring tick over every budget whose status is OPEN.

    For each budget for which ``check_overspending`` returns a truthy result:

    * If an alert log already exists for the budget's current period bucket,
      no new log is inserted. The existing log is refreshed in place
      (alerts, triggeredAt, budgetName, budgetSpendSnapshot) only when the
      budget's ``updatedAt`` is later than the log's ``triggeredAt`` and the
      current spend is greater than the stored snapshot.
    * Otherwise a new alert log is inserted when ``_alert_is_due`` says the
      period has rolled over since the budget's most recent alert (or the
      budget has never alerted).

    Args:
        db: Database handle indexed by collection name ("budgets",
            "alert_logs"). Presumably a Motor async database, given the
            awaited ``find_one`` / ``to_list`` calls -- confirm with caller.
    """
    # Naive UTC "now". ``datetime.utcnow()`` is deprecated since Python 3.12;
    # this expression yields the same value.
    # NOTE(review): kept naive because it is subtracted from / compared with
    # stored ``triggeredAt`` values -- presumably naive UTC; confirm.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    print(f"Budget cron tick at {now.isoformat()}Z")

    budgets: Iterable[dict] = await db["budgets"].find({"status": "OPEN"}).to_list(length=None)
    # Week-of-month index: days 1-7 -> 1, 8-14 -> 2, ..., 29-31 -> 5.
    current_week = math.ceil(now.day / 7)

    for budget in budgets:
        alerts = await check_overspending(budget)
        if not alerts:
            continue

        alert_logs = db["alert_logs"]
        budget_id = str(budget.get("_id"))
        current_spend = budget.get("spendAmount") or 0
        bucket = _period_bucket(budget.get("period"), now, current_week)

        # Look up the bucket log first: when one exists a new log is never
        # inserted, so the "most recent alert" query below is only needed on
        # the insert path.
        existing_log = await alert_logs.find_one({"budgetId": budget_id, **bucket})
        if existing_log:
            budget_updated_at = budget.get("updatedAt")
            last_trigger_at = existing_log.get("triggeredAt")
            updated_after_last_alert = (
                budget_updated_at and last_trigger_at and budget_updated_at > last_trigger_at
            )
            last_spend_snapshot = existing_log.get("budgetSpendSnapshot")
            if last_spend_snapshot is None:
                # A log without a snapshot counts as lower than any spend.
                last_spend_snapshot = float("-inf")
            spend_spike_detected = current_spend > last_spend_snapshot
            if updated_after_last_alert and spend_spike_detected:
                await alert_logs.update_one(
                    {"_id": existing_log["_id"]},
                    {
                        "$set": {
                            "alerts": alerts,
                            "triggeredAt": now,
                            "budgetName": budget.get("name"),
                            "budgetSpendSnapshot": current_spend,
                        }
                    },
                )
                print(f"Overspend alert updated for: {budget.get('name')}")
            continue

        last_log = await alert_logs.find_one(
            {"budgetId": budget_id}, sort=[("triggeredAt", -1)]
        )
        last_alert_date = last_log.get("triggeredAt") if last_log else None
        if not _alert_is_due(bucket["period"], last_alert_date, now):
            continue

        await alert_logs.insert_one(
            {
                "budgetId": budget_id,
                "budgetName": budget.get("name"),
                "alerts": alerts,
                "budgetSpendSnapshot": current_spend,
                **bucket,
                "triggeredAt": now,
            }
        )
        print(f"Overspend alert logged for: {budget.get('name')}")