| | """ |
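| | Stateless scheduler — the caller (Next.js) orchestrates storage & quota. |
| | Duties: run analytics on a cron schedule, push each result to the caller's sync endpoint, trigger the report e-mail, and return the result as JSON. |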
| | """ |
| | import asyncio |
| | import pandas as pd |
| | from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| | from app.engine.analytics import AnalyticsService |
| | from app.service.industry_svc import (eda, forecast, basket, market_dynamics, |
| | supply_chain, customer_insights, |
| | operational_efficiency, risk_assessment, |
| | sustainability) |
| | from app.utils.detect_industry import detect_industry |
| | from app.utils.email import send_pdf_email |
| | import os |
| | from datetime import datetime |
| | import aiohttp |
| |
|
# Module-level AsyncIOScheduler; the job helpers in this module
# (add_job_to_scheduler, remove_job_from_scheduler, load_schedules,
# start_scheduler) all register, remove, or start jobs on this one instance.
sched = AsyncIOScheduler()
| |
|
| | |
| | |
| | |
| | async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict: |
| | """ |
| | 1. Canonify last 6 h of raw rows (any column names) via DuckDB |
| | 2. Compute chosen analytic |
| | 3. Log KPIs + purge old raw data |
| | 4. Return shaped payload |
| | """ |
| | from app.mapper import canonify_df |
| | from app.tasks.kpi_logger import log_kpis_and_purge |
| |
|
| | df = canonify_df(org_id) |
| | if df.empty: |
| | return {"error": "No recent data found"} |
| |
|
| | data = df.to_dict("records") |
| | industry, _ = detect_industry(df) |
| |
|
| | match analytic_type: |
| | case "eda": |
| | result = await eda(data, industry) |
| | case "forecast": |
| | result = await forecast(data, kwargs["date_col"], kwargs["value_col"]) |
| | case "basket": |
| | result = await basket(data, 0.01, 0.3, 1.0) |
| | case "market-dynamics": |
| | result = await market_dynamics(data) |
| | case "supply-chain": |
| | result = await supply_chain(data) |
| | case "customer-insights": |
| | result = await customer_insights(data) |
| | case "operational-efficiency": |
| | result = await operational_efficiency(data) |
| | case "risk-assessment": |
| | result = await risk_assessment(data) |
| | case "sustainability": |
| | result = await sustainability(data) |
| | case _: |
| | return {"error": "Unknown analytic"} |
| |
|
| | |
| | log_kpis_and_purge(org_id) |
| | |
| | async with aiohttp.ClientSession() as session: |
| | await session.post( |
| | f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync", |
| | json={ |
| | "orgId": org_id, |
| | "type": analytic_type, |
| | "results": result, |
| | "lastRun": datetime.utcnow().isoformat(), |
| | }, |
| | headers={"x-api-key": os.getenv("ANALYTICS_KEY")}, |
| | ) |
| | |
| | pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf" |
| | asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result})) |
| |
|
| | return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()} |
| |
|
| | |
| | |
| | |
def add_job_to_scheduler(schedule: dict):
    """Register one cron job per analytic in *schedule* on the module scheduler.

    Args:
        schedule: Mapping with ``id``, ``orgId``, ``frequency`` (``"daily"``,
            ``"weekly"`` or ``"monthly"``) and ``analytics`` (iterable of
            analytic names passed to ``run_analytic_job``).

    Job ids are ``f"{schedule['id']}_{analytic}"``. Re-adding a schedule
    replaces its existing jobs instead of raising a conflicting-id error. An
    unknown frequency registers nothing and logs a warning.
    """
    import logging

    # Every job fires at hour=6, minute=0 (no explicit timezone is passed, so
    # the scheduler's default applies); the frequency only selects the day.
    cron_days = {
        "daily": {},
        # NOTE(review): APScheduler 3.x numbers day_of_week with mon=0
        # (unlike crontab's sun=0); confirm Monday is the intended day.
        "weekly": {"day_of_week": 0},
        "monthly": {"day": 1},
    }

    org_id = schedule["orgId"]
    freq = schedule["frequency"]
    analytics = schedule["analytics"]

    day_fields = cron_days.get(freq) if isinstance(freq, str) else None
    if day_fields is None:
        logging.getLogger(__name__).warning(
            "Unknown schedule frequency %r for schedule %r; no jobs added",
            freq,
            schedule.get("id"),
        )
        return

    for analytic in analytics:
        sched.add_job(
            run_analytic_job,
            "cron",
            hour=6,
            minute=0,
            **day_fields,
            args=[org_id, analytic],
            id=f"{schedule['id']}_{analytic}",
            replace_existing=True,
        )
| |
|
def remove_job_from_scheduler(schedule_id: str):
    """Remove every scheduler job that belongs to *schedule_id*.

    Matches ids of the form ``f"{schedule_id}_{analytic}"`` (the format used
    when jobs are registered), plus an exact ``schedule_id`` match. Requiring
    the ``_`` separator prevents, e.g., removing schedule ``"1"`` from also
    deleting the jobs of schedule ``"12"``.
    """
    prefix = f"{schedule_id}_"
    for job in sched.get_jobs():
        if job.id == schedule_id or job.id.startswith(prefix):
            sched.remove_job(job.id)
| |
|
| | |
| | |
| | |
async def load_schedules():
    """Register cron jobs for the schedules in the ``SCHEDULES`` env var.

    ``SCHEDULES`` should hold a JSON array of objects with ``orgId``
    (required), plus optional ``frequency`` (default ``"daily"``; also
    ``"weekly"`` / ``"monthly"``) and ``analytics`` (default ``["eda"]``).
    Job ids are ``f"{orgId}_{analytic}"``.

    Invalid JSON, a non-array payload, malformed entries and unknown
    frequencies are logged and skipped, so one bad entry does not abort the
    load. Loading the same entries again replaces the existing jobs instead of
    raising a conflicting-id error.
    """
    import json
    import logging

    log = logging.getLogger(__name__)
    raw = os.getenv("SCHEDULES", "[]")
    try:
        schedules = json.loads(raw)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        log.warning("SCHEDULES is not valid JSON; no schedules loaded")
        return
    if not isinstance(schedules, list):
        log.warning("SCHEDULES must be a JSON array, got %s; no schedules loaded",
                    type(schedules).__name__)
        return

    # Every job fires at hour=6, minute=0; the frequency only selects the day.
    cron_days = {
        "daily": {},
        # NOTE(review): APScheduler 3.x numbers day_of_week with mon=0.
        "weekly": {"day_of_week": 0},
        "monthly": {"day": 1},
    }

    for sch in schedules:
        if not isinstance(sch, dict) or "orgId" not in sch:
            log.warning("Skipping malformed schedule entry: %r", sch)
            continue

        org_id = sch["orgId"]
        freq = sch.get("frequency", "daily")
        day_fields = cron_days.get(freq) if isinstance(freq, str) else None
        if day_fields is None:
            log.warning("Skipping schedule for org %r: unknown frequency %r", org_id, freq)
            continue

        for analytic in sch.get("analytics", ["eda"]):
            sched.add_job(
                run_analytic_job,
                "cron",
                hour=6,
                minute=0,
                **day_fields,
                args=[org_id, analytic],
                id=f"{org_id}_{analytic}",
                replace_existing=True,
            )
| |
|
| | |
| | |
| | |
# Strong references to the schedule-loading task started by start_scheduler;
# asyncio itself keeps only weak references to tasks.
_LOADER_TASKS: set[asyncio.Task] = set()


def start_scheduler():
    """Start the module scheduler and load env-configured schedules in the background.

    Must be called while an event loop is running, because
    ``asyncio.create_task`` raises ``RuntimeError`` otherwise.

    Returns:
        The ``asyncio.Task`` running ``load_schedules``. Callers that ignored
        the previous ``None`` return are unaffected. A failure inside the task
        is logged, instead of surfacing only as "Task exception was never
        retrieved".
    """
    import logging

    task = asyncio.create_task(load_schedules())
    _LOADER_TASKS.add(task)

    def _on_done(t: asyncio.Task) -> None:
        _LOADER_TASKS.discard(t)
        if not t.cancelled() and t.exception() is not None:
            logging.getLogger(__name__).error(
                "Loading schedules failed", exc_info=t.exception()
            )

    task.add_done_callback(_on_done)
    sched.start()
    return task