"""
State-less scheduler — the caller (Next.js) orchestrates storage & quota.
Only duty: run analytics on cron, return JSON.
"""
import asyncio
import logging
import os
from datetime import datetime, timezone

import aiohttp
import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from app.engine.analytics import AnalyticsService
from app.service.industry_svc import (eda, forecast, basket, market_dynamics,
                                      supply_chain, customer_insights,
                                      operational_efficiency, risk_assessment,
                                      sustainability)
from app.utils.detect_industry import detect_industry
from app.utils.email import send_pdf_email
sched = AsyncIOScheduler()
# ------------------------------------------------------------------
# 1 RUN ONE ANALYTIC — pure logic, no DB
# ------------------------------------------------------------------
async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict:
"""
1. Canonify last 6 h of raw rows (any column names) via DuckDB
2. Compute chosen analytic
3. Log KPIs + purge old raw data
4. Return shaped payload
"""
from app.mapper import canonify_df # NEW: any-shape β canonical
from app.tasks.kpi_logger import log_kpis_and_purge # NEW: history & tidy
df = canonify_df(org_id)
if df.empty:
return {"error": "No recent data found"}
data = df.to_dict("records")
industry, _ = detect_industry(df)
match analytic_type:
case "eda":
result = await eda(data, industry)
case "forecast":
result = await forecast(data, kwargs["date_col"], kwargs["value_col"])
case "basket":
result = await basket(data, 0.01, 0.3, 1.0)
case "market-dynamics":
result = await market_dynamics(data)
case "supply-chain":
result = await supply_chain(data)
case "customer-insights":
result = await customer_insights(data)
case "operational-efficiency":
result = await operational_efficiency(data)
case "risk-assessment":
result = await risk_assessment(data)
case "sustainability":
result = await sustainability(data)
case _:
return {"error": "Unknown analytic"}
# ---------- NEW β history + disk tidy ----------
log_kpis_and_purge(org_id) # inserts KPIs & deletes raw > 6 h
# -------------------------------------------------
async with aiohttp.ClientSession() as session:
await session.post(
f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync",
json={
"orgId": org_id,
"type": analytic_type,
"results": result,
"lastRun": datetime.utcnow().isoformat(),
},
headers={"x-api-key": os.getenv("ANALYTICS_KEY")},
)
# fire-and-forget email (caller decides storage)
pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf"
asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result}))
return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()}
# ------------------------------------------------------------------
# 2 APScheduler glue — unchanged
# ------------------------------------------------------------------
def add_job_to_scheduler(schedule: dict):
    """
    Register one cron job per analytic listed in *schedule*.

    Args:
        schedule: dict with keys "id", "orgId",
            "frequency" ("daily" | "weekly" | "monthly") and
            "analytics" (list of analytic identifiers).

    Unknown frequencies are silently ignored (matches previous behaviour).
    """
    # Cron kwargs per supported frequency; every job fires at 06:00
    # scheduler-local time.
    cron_by_freq = {
        "daily": {"hour": 6, "minute": 0},
        "weekly": {"day_of_week": 0, "hour": 6, "minute": 0},
        "monthly": {"day": 1, "hour": 6, "minute": 0},
    }
    cron = cron_by_freq.get(schedule["frequency"])
    if cron is None:
        return  # unsupported frequency -> no-op, as before

    org_id = schedule["orgId"]
    for analytic in schedule["analytics"]:
        # replace_existing avoids ConflictingIdError when a schedule is
        # re-registered (e.g. updated) under the same id.
        sched.add_job(
            run_analytic_job,
            "cron",
            args=[org_id, analytic],
            id=f"{schedule['id']}_{analytic}",
            replace_existing=True,
            **cron,
        )
def remove_job_from_scheduler(schedule_id: str):
    """
    Remove every scheduled job belonging to *schedule_id*.

    Job ids have the form "<schedule_id>_<analytic>". Matching on the
    full prefix including the underscore fixes a bug where removing
    schedule "1" also removed every job of schedule "10", "11", ...
    """
    prefix = f"{schedule_id}_"
    for job in sched.get_jobs():
        if job.id.startswith(prefix):
            sched.remove_job(job.id)
# ------------------------------------------------------------------
# 3 ENV-loader — unchanged
# ------------------------------------------------------------------
async def load_schedules():
    """
    Read the SCHEDULES env var (a JSON array) and register cron jobs
    for every organisation/analytic pair it describes.

    Malformed JSON degrades to an empty schedule list: a broken env var
    must not prevent the service from starting (best effort, as before).
    """
    import json

    try:
        schedules = json.loads(os.getenv("SCHEDULES", "[]"))
    except ValueError:
        # json.JSONDecodeError subclasses ValueError — narrower than the
        # previous bare `except Exception`, identical outcome for bad JSON.
        schedules = []

    cron_by_freq = {
        "daily": {"hour": 6, "minute": 0},
        "weekly": {"day_of_week": 0, "hour": 6, "minute": 0},
        "monthly": {"day": 1, "hour": 6, "minute": 0},
    }
    for sch in schedules:
        org_id = sch["orgId"]
        cron = cron_by_freq.get(sch.get("frequency", "daily"))
        if cron is None:
            continue  # unknown frequency: skip, as the original did
        for analytic in sch.get("analytics", ["eda"]):
            # NOTE(review): ids here use org_id while add_job_to_scheduler
            # uses schedule["id"] — confirm the divergence is intentional.
            # replace_existing prevents ConflictingIdError on re-load.
            sched.add_job(
                run_analytic_job,
                "cron",
                args=[org_id, analytic],
                id=f"{org_id}_{analytic}",
                replace_existing=True,
                **cron,
            )
# ------------------------------------------------------------------
# 4 STARTER
# ------------------------------------------------------------------
def start_scheduler():
    """
    Kick off the env-schedule loader and start the APScheduler loop.

    Must be called from within a running asyncio event loop
    (``asyncio.create_task`` raises RuntimeError otherwise).
    """
    # Keep a strong reference: asyncio holds only weak refs to tasks, so
    # an unreferenced fire-and-forget task can be garbage-collected
    # before load_schedules() finishes.
    start_scheduler._load_task = asyncio.create_task(load_schedules())
    sched.start()