Spaces:
Paused
Paused
| """ | |
| Stateless scheduler — the caller (Next.js) orchestrates storage & quota. | |
| Only duty: run analytics on cron, return JSON. | |
| """ | |
| import asyncio | |
| import pandas as pd | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from app.engine.analytics import AnalyticsService | |
| from app.service.industry_svc import (eda, forecast, basket, market_dynamics, | |
| supply_chain, customer_insights, | |
| operational_efficiency, risk_assessment, | |
| sustainability) | |
| from app.utils.detect_industry import detect_industry | |
| from app.utils.email import send_pdf_email | |
| import os | |
| from datetime import datetime | |
| import aiohttp | |
| # Module-wide scheduler instance: jobs are registered on it by add_job_to_scheduler() | |
| # and load_schedules(), and it is started by start_scheduler(). | |
| sched = AsyncIOScheduler() | |
| # ------------------------------------------------------------------ | |
| # 1 RUN ONE ANALYTIC — pure logic, no DB | |
| # ------------------------------------------------------------------ | |
| async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict: | |
| """ | |
| 1. Canonify last 6 h of raw rows (any column names) via DuckDB | |
| 2. Compute chosen analytic | |
| 3. Log KPIs + purge old raw data | |
| 4. Return shaped payload | |
| """ | |
| from app.mapper import canonify_df # NEW: any-shape β canonical | |
| from app.tasks.kpi_logger import log_kpis_and_purge # NEW: history & tidy | |
| df = canonify_df(org_id) | |
| if df.empty: | |
| return {"error": "No recent data found"} | |
| data = df.to_dict("records") | |
| industry, _ = detect_industry(df) | |
| match analytic_type: | |
| case "eda": | |
| result = await eda(data, industry) | |
| case "forecast": | |
| result = await forecast(data, kwargs["date_col"], kwargs["value_col"]) | |
| case "basket": | |
| result = await basket(data, 0.01, 0.3, 1.0) | |
| case "market-dynamics": | |
| result = await market_dynamics(data) | |
| case "supply-chain": | |
| result = await supply_chain(data) | |
| case "customer-insights": | |
| result = await customer_insights(data) | |
| case "operational-efficiency": | |
| result = await operational_efficiency(data) | |
| case "risk-assessment": | |
| result = await risk_assessment(data) | |
| case "sustainability": | |
| result = await sustainability(data) | |
| case _: | |
| return {"error": "Unknown analytic"} | |
| # ---------- NEW β history + disk tidy ---------- | |
| log_kpis_and_purge(org_id) # inserts KPIs & deletes raw > 6 h | |
| # ------------------------------------------------- | |
| async with aiohttp.ClientSession() as session: | |
| await session.post( | |
| f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync", | |
| json={ | |
| "orgId": org_id, | |
| "type": analytic_type, | |
| "results": result, | |
| "lastRun": datetime.utcnow().isoformat(), | |
| }, | |
| headers={"x-api-key": os.getenv("ANALYTICS_KEY")}, | |
| ) | |
| # fire-and-forget email (caller decides storage) | |
| pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf" | |
| asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result})) | |
| return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()} | |
| # ------------------------------------------------------------------ | |
| # 2 APScheduler glue — unchanged | |
| # ------------------------------------------------------------------ | |
| def add_job_to_scheduler(schedule: dict): | |
| org_id = schedule["orgId"] | |
| freq = schedule["frequency"] | |
| analytics = schedule["analytics"] | |
| for analytic in analytics: | |
| job_id = f"{schedule['id']}_{analytic}" | |
| if freq == "daily": | |
| sched.add_job(run_analytic_job, "cron", hour=6, minute=0, | |
| args=[org_id, analytic], id=job_id) | |
| elif freq == "weekly": | |
| sched.add_job(run_analytic_job, "cron", day_of_week=0, hour=6, minute=0, | |
| args=[org_id, analytic], id=job_id) | |
| elif freq == "monthly": | |
| sched.add_job(run_analytic_job, "cron", day=1, hour=6, minute=0, | |
| args=[org_id, analytic], id=job_id) | |
def remove_job_from_scheduler(schedule_id: str):
    """Remove every scheduled job that belongs to *schedule_id*.

    Job ids are built as ``"<schedule id>_<analytic>"``, so a job belongs to
    the schedule when its id starts with ``schedule_id`` followed by ``"_"``.
    A job whose id equals *schedule_id* is removed too, so passing a full job
    id keeps working.

    A bare prefix match would also remove jobs of unrelated schedules whose
    id merely begins with the same characters (``"1"`` matching ``"10_eda"``).
    """
    owned_prefix = f"{schedule_id}_"
    for job in sched.get_jobs():
        if job.id == schedule_id or job.id.startswith(owned_prefix):
            sched.remove_job(job.id)
| # ------------------------------------------------------------------ | |
| # 3 ENV-loader — unchanged | |
| # ------------------------------------------------------------------ | |
async def load_schedules():
    """Register the jobs described by the ``SCHEDULES`` environment variable.

    ``SCHEDULES`` is expected to hold a JSON list of objects with ``orgId``
    (required), ``frequency`` (default ``"daily"``) and ``analytics``
    (default ``["eda"]``). Each analytic becomes a cron job with the id
    ``"<orgId>_<analytic>"`` that runs ``run_analytic_job(orgId, analytic)``
    at 06:00: every day, on ``day_of_week=0``, or on day 1 of the month.

    Bad configuration never raises: invalid JSON, a non-list payload,
    entries without ``orgId`` and unknown frequencies are logged as warnings
    and skipped, so one bad entry cannot block the remaining schedules.
    """
    import json
    import logging

    log = logging.getLogger(__name__)
    cron_by_frequency = {
        "daily": {"hour": 6, "minute": 0},
        "weekly": {"day_of_week": 0, "hour": 6, "minute": 0},
        "monthly": {"day": 1, "hour": 6, "minute": 0},
    }

    raw = os.getenv("SCHEDULES", "[]")
    try:
        schedules = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        log.warning("SCHEDULES is not valid JSON; no jobs loaded")
        return
    if not isinstance(schedules, list):
        log.warning("SCHEDULES must be a JSON list; no jobs loaded")
        return

    for sch in schedules:
        if not isinstance(sch, dict) or "orgId" not in sch:
            log.warning("Skipping malformed schedule entry: %r", sch)
            continue
        org_id = sch["orgId"]
        freq = sch.get("frequency", "daily")
        # Guard the lookup: a non-string frequency may be unhashable.
        cron_fields = cron_by_frequency.get(freq) if isinstance(freq, str) else None
        if cron_fields is None:
            log.warning("Skipping schedule for %r: unknown frequency %r", org_id, freq)
            continue
        for analytic in sch.get("analytics", ["eda"]):
            sched.add_job(
                run_analytic_job,
                "cron",
                args=[org_id, analytic],
                id=f"{org_id}_{analytic}",
                **cron_fields,
            )
| # ------------------------------------------------------------------ | |
| # 4 STARTER | |
| # ------------------------------------------------------------------ | |
def start_scheduler():
    """Queue the ``SCHEDULES`` loader as a task, then start the scheduler.

    ``asyncio.create_task`` requires a running event loop, so this must be
    called from code that is already executing inside one.
    """
    loader = load_schedules()
    asyncio.create_task(loader)
    sched.start()