| | import json, time, os, requests |
| | import logging |
| | from datetime import datetime, timedelta |
| | from pathlib import Path |
| |
|
| | |
# Process-wide logging setup: INFO level, with a fixed "scheduler" tag in
# every record so these lines can be told apart from other services' output.
_LOG_FORMAT = "%(asctime)s | scheduler | %(levelname)s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
    level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT
)
logger = logging.getLogger(__name__)

# JSON file holding the schedule entries that tick() reads and writes back.
SCHEDULE_FILE = "/data/.schedules.json"
# Endpoint that receives one POST per analytic of a due schedule.
RUN_URL = "http://localhost:8000/analytics/run"
| |
|
def tick(request_timeout=(10.0, 300.0)) -> None:
    """Run every schedule that is currently due and persist the new run times.

    Loads the schedule list from ``SCHEDULE_FILE``. For each entry whose
    ``nextRun`` is not later than the current UTC time, one request per
    analytic is POSTed to ``RUN_URL`` and the entry's ``nextRun`` is moved
    forward according to its ``frequency``.

    Args:
        request_timeout: Timeout passed to ``requests.post`` -- either a
            number of seconds or a ``(connect, read)`` pair. Without a
            timeout a single stalled connection would block the scheduler
            indefinitely.

    Notes:
        * If the schedule file does not exist the call is a no-op.
        * A transport failure (or a missing key) on one schedule is logged
          and that schedule is left due, so it is retried on the next tick;
          the remaining schedules are still processed and saved.
        * An HTTP error status is only logged; the schedule still advances.
        * The file is rewritten only when at least one schedule advanced.
        * ``nextRun`` is compared as a string, which assumes it is stored in
          the same naive-UTC ISO format this module writes -- TODO confirm
          for entries created by other writers.
    """
    # EAFP instead of exists()+open(): the file may vanish between the calls.
    try:
        with open(SCHEDULE_FILE, encoding="utf-8") as f:
            schedules = json.load(f)
    except FileNotFoundError:
        return

    now = datetime.utcnow().isoformat()
    advanced = False
    for schedule in schedules:
        try:
            if schedule["nextRun"] > now:
                continue
            _run_schedule(schedule, request_timeout)
        except (requests.RequestException, KeyError) as exc:
            # Isolate the failure so one bad schedule cannot prevent the
            # others from running or from having their progress saved.
            logger.error(
                "schedule for %s not completed, will retry next tick: %r",
                schedule.get("orgId"),
                exc,
            )
            continue
        advanced = True

    if advanced:
        _save_schedules(schedules)


def _run_schedule(schedule: dict, request_timeout) -> None:
    """POST every analytic of one due schedule, then advance its ``nextRun``."""
    org_id = schedule["orgId"]
    # Read up front so a missing key fails before any request is sent.
    frequency = schedule["frequency"]
    headers = {"X-Data-Path": f"/data/{org_id}/sales.parquet"}
    for analytic in schedule["analytics"]:
        response = requests.post(
            RUN_URL,
            json={"analytic": analytic},
            headers=headers,
            timeout=request_timeout,
        )
        logger.info(
            "ran %s for %s -> status=%s",
            analytic,
            org_id,
            response.status_code,
        )
    schedule["nextRun"] = _next_run(frequency).isoformat()


def _save_schedules(schedules: list) -> None:
    """Write the schedule list to ``SCHEDULE_FILE`` via a temp file and rename."""
    # Writing next to the target and renaming means a crash mid-write cannot
    # leave a truncated schedule file behind.
    tmp_path = f"{SCHEDULE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(schedules, f, indent=2)
    os.replace(tmp_path, SCHEDULE_FILE)
| |
|
| | def _next_run(frequency: str) -> datetime: |
| | now = datetime.utcnow() |
| | if frequency == "daily": return now + timedelta(days=1) |
| | if frequency == "weekly": return now + timedelta(weeks=1) |
| | if frequency == "monthly": return now + timedelta(days=30) |
| | return now |
| |
|
if __name__ == "__main__":
    # Script entry point: call tick() forever, pausing 60 seconds between
    # calls.
    # NOTE(review): the first character of the startup message looks like a
    # mis-decoded emoji -- confirm the intended text before changing it.
    logger.info("๐ Scheduler loop started (60s interval)")
    while True:
        try:
            tick()
        except Exception as e:
            # Top-level boundary: keep the loop alive, but log the traceback
            # so the failure can actually be diagnosed.
            logger.exception("error: %s", e)
        time.sleep(60)