# Mutsynchub / app / tasks / scheduler.py
# shaliz-kong
# Initial commit: self-hosted Redis, DuckDB, Analytics Engine
# 98a466d
"""
Stateless scheduler – the caller (Next.js) orchestrates storage & quota.
Only duty: run analytics on cron, return JSON.
"""
import asyncio
import logging
import os
from datetime import datetime

import aiohttp
import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from app.engine.analytics import AnalyticsService
from app.service.industry_svc import (eda, forecast, basket, market_dynamics,
                                      supply_chain, customer_insights,
                                      operational_efficiency, risk_assessment,
                                      sustainability)
from app.utils.detect_industry import detect_industry
from app.utils.email import send_pdf_email
# Module-level scheduler instance shared by the job add/remove helpers,
# the env loader and start_scheduler() in this file.
sched = AsyncIOScheduler()
# ------------------------------------------------------------------
# 1 RUN ONE ANALYTIC – pure logic, no DB
# ------------------------------------------------------------------
async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict:
"""
1. Canonify last 6 h of raw rows (any column names) via DuckDB
2. Compute chosen analytic
3. Log KPIs + purge old raw data
4. Return shaped payload
"""
from app.mapper import canonify_df # NEW: any-shape β†’ canonical
from app.tasks.kpi_logger import log_kpis_and_purge # NEW: history & tidy
df = canonify_df(org_id)
if df.empty:
return {"error": "No recent data found"}
data = df.to_dict("records")
industry, _ = detect_industry(df)
match analytic_type:
case "eda":
result = await eda(data, industry)
case "forecast":
result = await forecast(data, kwargs["date_col"], kwargs["value_col"])
case "basket":
result = await basket(data, 0.01, 0.3, 1.0)
case "market-dynamics":
result = await market_dynamics(data)
case "supply-chain":
result = await supply_chain(data)
case "customer-insights":
result = await customer_insights(data)
case "operational-efficiency":
result = await operational_efficiency(data)
case "risk-assessment":
result = await risk_assessment(data)
case "sustainability":
result = await sustainability(data)
case _:
return {"error": "Unknown analytic"}
# ---------- NEW – history + disk tidy ----------
log_kpis_and_purge(org_id) # inserts KPIs & deletes raw > 6 h
# -------------------------------------------------
async with aiohttp.ClientSession() as session:
await session.post(
f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync",
json={
"orgId": org_id,
"type": analytic_type,
"results": result,
"lastRun": datetime.utcnow().isoformat(),
},
headers={"x-api-key": os.getenv("ANALYTICS_KEY")},
)
# fire-and-forget email (caller decides storage)
pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf"
asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result}))
return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()}
# ------------------------------------------------------------------
# 2 APScheduler glue – unchanged
# ------------------------------------------------------------------
def add_job_to_scheduler(schedule: dict):
    """Register one cron job per analytic listed in *schedule*.

    Every job runs ``run_analytic_job(org_id, analytic)`` at 06:00:
    daily, weekly (``day_of_week=0``, which APScheduler treats as Monday)
    or monthly (day 1), depending on the schedule's frequency.

    Args:
        schedule: Mapping with the keys "id", "orgId", "frequency"
            ("daily", "weekly" or "monthly") and "analytics" (an iterable
            of analytic names).

    Job ids have the form ``"<schedule id>_<analytic>"``. Registering a
    schedule again replaces its existing jobs instead of raising a
    conflicting-id error. An unsupported frequency adds no jobs and is
    logged as a warning.

    Raises:
        KeyError: If a required key is missing from *schedule*.
    """
    org_id = schedule["orgId"]
    freq = schedule["frequency"]
    analytics = schedule["analytics"]

    # Cron fields that differ per frequency; hour/minute are shared below.
    cron_fields_by_frequency = {
        "daily": {},
        "weekly": {"day_of_week": 0},
        "monthly": {"day": 1},
    }
    cron_fields = cron_fields_by_frequency.get(freq) if isinstance(freq, str) else None
    if cron_fields is None:
        logging.getLogger(__name__).warning(
            "Schedule %r has unsupported frequency %r; no jobs added",
            schedule.get("id"), freq,
        )
        return

    for analytic in analytics:
        sched.add_job(
            run_analytic_job,
            "cron",
            hour=6,
            minute=0,
            args=[org_id, analytic],
            id=f"{schedule['id']}_{analytic}",
            # Updating an already-registered schedule must not fail on the
            # duplicate job id.
            replace_existing=True,
            **cron_fields,
        )
def remove_job_from_scheduler(schedule_id: str):
    """Remove every scheduled job that belongs to *schedule_id*.

    Job ids are built as ``"<schedule id>_<analytic>"``, so the match is
    made on the id followed by the underscore separator. Matching on the
    bare id would also remove jobs of other schedules whose id merely
    starts with the same characters (e.g. "1" vs "10"), and an empty id
    would remove every job.

    Args:
        schedule_id: Id of the schedule whose jobs should be removed.
    """
    job_id_prefix = f"{schedule_id}_"
    for job in sched.get_jobs():
        if job.id.startswith(job_id_prefix):
            sched.remove_job(job.id)
# ------------------------------------------------------------------
# 3 ENV-loader – unchanged
# ------------------------------------------------------------------
async def load_schedules():
    """Register the schedules listed in the ``SCHEDULES`` environment variable.

    ``SCHEDULES`` is expected to hold a JSON array of objects with the key
    "orgId" and the optional keys "frequency" (default "daily") and
    "analytics" (default ["eda"]). Each entry is registered through
    ``add_job_to_scheduler`` using the organisation id as the schedule id,
    so job ids have the form ``"<orgId>_<analytic>"``.

    Loading is best-effort: invalid JSON, a non-list payload, malformed
    entries and entries that fail to register are logged and skipped
    rather than raised, so one bad entry does not block the others.
    """
    import json

    log = logging.getLogger(__name__)

    raw = os.getenv("SCHEDULES", "[]")
    if not raw.strip():
        # Variable set but empty: nothing to load.
        return

    try:
        schedules = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        log.warning("SCHEDULES is not valid JSON; no schedules loaded")
        return

    if not isinstance(schedules, list):
        log.warning("SCHEDULES must be a JSON array; no schedules loaded")
        return

    for entry in schedules:
        if not isinstance(entry, dict) or "orgId" not in entry:
            log.warning("Skipping malformed schedule entry: %r", entry)
            continue

        org_id = entry["orgId"]
        try:
            add_job_to_scheduler({
                "id": org_id,
                "orgId": org_id,
                "frequency": entry.get("frequency", "daily"),
                "analytics": entry.get("analytics", ["eda"]),
            })
        except Exception:
            # Loader boundary: record the failure and keep loading the
            # remaining schedules.
            log.exception("Could not register schedule for org %r", org_id)
# ------------------------------------------------------------------
# 4 STARTER
# ------------------------------------------------------------------
# Holds the schedule-loading task until it completes; the event loop keeps
# only weak references to tasks.
_startup_tasks: set[asyncio.Task] = set()


def _on_schedules_loaded(task: asyncio.Task) -> None:
    """Release the finished loader task and log the error it ended with, if any."""
    _startup_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logging.getLogger(__name__).error("Loading schedules failed", exc_info=error)


def start_scheduler():
    """Queue the env-configured schedules for loading and start the scheduler.

    Must be called while an asyncio event loop is running, because
    ``asyncio.create_task`` requires one. Schedule loading runs as a
    background task; a failure in it is logged instead of being lost.
    """
    load_task = asyncio.create_task(load_schedules())
    _startup_tasks.add(load_task)
    load_task.add_done_callback(_on_schedules_loaded)
    sched.start()