File size: 1,761 Bytes
98a466d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import json, time, os, requests
import logging
from datetime import datetime, timedelta  
from pathlib import Path

# Logging setup: timestamped records at INFO level and above, each tagged
# with the literal "scheduler" between the time and the level name.
_LOG_FORMAT = "%(asctime)s | scheduler | %(levelname)s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)
logger = logging.getLogger(__name__)

# JSON file holding the list of schedules that tick() reads and rewrites.
SCHEDULE_FILE = "/data/.schedules.json"

# Endpoint that executes a single analytic; addressed via localhost because
# the scheduler runs inside the container.
RUN_URL = "http://localhost:8000/analytics/run"

def tick(request_timeout=(10, 600)):
    """Run every schedule that is due and persist the updated next-run times.

    The schedule file (``SCHEDULE_FILE``) is a JSON list of entries, each read
    here through the keys ``nextRun`` (ISO timestamp string), ``analytics``
    (iterable of analytic names), ``orgId`` and ``frequency``.  For each entry
    whose ``nextRun`` is not later than the current UTC time, one POST per
    analytic is sent to ``RUN_URL`` and ``nextRun`` is then advanced with
    ``_next_run``.

    Args:
        request_timeout: Value passed to ``requests.post(timeout=...)``; a
            ``(connect, read)`` tuple in seconds or a single number.  The read
            timeout should exceed the longest expected analytic run, because a
            timed-out schedule is retried on the next tick.

    Returns:
        None.  Does nothing when the schedule file does not exist.

    Raises:
        json.JSONDecodeError: If the schedule file is not valid JSON.
        OSError: If the schedule file cannot be read or written.
    """
    # EAFP: open directly instead of checking for existence first, so a file
    # removed between the check and the open cannot crash the tick.
    try:
        with open(SCHEDULE_FILE, encoding="utf-8") as f:
            schedules = json.load(f)
    except FileNotFoundError:
        return

    # Naive UTC in ISO format, the same representation _next_run() produces
    # for "nextRun", so plain string comparison orders the timestamps.
    now = datetime.utcnow().isoformat()
    changed = False

    for index, s in enumerate(schedules):
        try:
            if s["nextRun"] > now:
                continue
            for analytic in s["analytics"]:
                # call the same endpoint the UI uses
                r = requests.post(
                    RUN_URL,
                    json={"analytic": analytic},
                    headers={"X-Data-Path": f"/data/{s['orgId']}/sales.parquet"},
                    timeout=request_timeout,  # never block the loop forever
                )
                logger.info(f"ran {analytic} for {s['orgId']} -> status={r.status_code}")
            upcoming = _next_run(s["frequency"]).isoformat()
        except (requests.RequestException, KeyError, TypeError) as exc:
            # Isolate the failure to this entry: its nextRun is left untouched
            # so it is retried on the next tick, while the remaining schedules
            # still run and their progress is still saved below.
            logger.error("schedule #%d failed, will retry next tick: %r", index, exc)
            continue

        # bump nextRun only after every analytic of the schedule was posted
        s["nextRun"] = upcoming
        changed = True

    # Skip the rewrite when nothing ran; rewriting an unchanged file every
    # tick would only risk overwriting edits made by another writer.
    if not changed:
        return

    with open(SCHEDULE_FILE, "w", encoding="utf-8") as f:
        json.dump(schedules, f, indent=2)

def _next_run(frequency: str) -> datetime:
    now = datetime.utcnow()
    if frequency == "daily":   return now + timedelta(days=1)
    if frequency == "weekly":  return now + timedelta(weeks=1)
    if frequency == "monthly": return now + timedelta(days=30)
    return now

if __name__ == "__main__":
    # Script entry point: call tick() once a minute, forever.
    logger.info("🔄 Scheduler loop started (60s interval)")
    while True:
        try:
            tick()
        except Exception as e:
            # Top-level boundary: a failing tick must not stop the loop.
            # logger.exception records the traceback along with the message,
            # which a plain error log of str(e) would lose.
            logger.exception("error: %s", e)
        time.sleep(60)   # 1-minute granularity