Computations / scheduler_loop.py
shaliz-kong
Initial commit: self-hosted Redis analytics engine
98cacb0
import json, time, os, requests
import logging
from datetime import datetime, timedelta
from pathlib import Path
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | scheduler | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
SCHEDULE_FILE = "/data/.schedules.json"
RUN_URL = "http://localhost:8000/analytics/run" # inside container
def tick():
if not Path(SCHEDULE_FILE).exists():
return
with open(SCHEDULE_FILE) as f:
schedules = json.load(f)
now = datetime.utcnow().isoformat()
for s in schedules:
if s["nextRun"] <= now:
for analytic in s["analytics"]:
# call the same endpoint the UI uses
r = requests.post(RUN_URL,
json={"analytic": analytic},
headers={"X-Data-Path": f"/data/{s['orgId']}/sales.parquet"})
logger.info(f"ran {analytic} for {s['orgId']} -> status={r.status_code}")
# bump nextRun
s["nextRun"] = (_next_run(s["frequency"])).isoformat()
with open(SCHEDULE_FILE, "w") as f:
json.dump(schedules, f, indent=2)
def _next_run(frequency: str) -> datetime:
now = datetime.utcnow()
if frequency == "daily": return now + timedelta(days=1)
if frequency == "weekly": return now + timedelta(weeks=1)
if frequency == "monthly": return now + timedelta(days=30)
return now
if __name__ == "__main__":
logger.info("๐Ÿ”„ Scheduler loop started (60s interval)")
while True:
try:
tick()
except Exception as e:
logger.error(f"error: {e}")
time.sleep(60) # 1-minute granularity