File size: 5,689 Bytes
472833f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Stateless scheduler – the caller (Next.js) orchestrates storage & quota.
Sole duty: run analytics on a cron schedule and return JSON.
"""
import asyncio
import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.engine.analytics import AnalyticsService
from app.service.industry_svc import (eda, forecast, basket, market_dynamics,
                                      supply_chain, customer_insights,
                                      operational_efficiency, risk_assessment,
                                      sustainability)
from app.utils.detect_industry import detect_industry
from app.utils.email import send_pdf_email
import os
from datetime import datetime
import aiohttp

sched = AsyncIOScheduler()

# ------------------------------------------------------------------
# 1  RUN ONE ANALYTIC – pure logic, no DB
# ------------------------------------------------------------------
async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict:
    """
    1. Canonify last 6 h of raw rows (any column names) via DuckDB
    2. Compute chosen analytic
    3. Log KPIs + purge old raw data
    4. Return shaped payload
    """
    from app.mapper import canonify_df          # NEW: any-shape β†’ canonical
    from app.tasks.kpi_logger import log_kpis_and_purge  # NEW: history & tidy

    df = canonify_df(org_id)
    if df.empty:
        return {"error": "No recent data found"}

    data = df.to_dict("records")
    industry, _ = detect_industry(df)

    match analytic_type:
        case "eda":
            result = await eda(data, industry)
        case "forecast":
            result = await forecast(data, kwargs["date_col"], kwargs["value_col"])
        case "basket":
            result = await basket(data, 0.01, 0.3, 1.0)
        case "market-dynamics":
            result = await market_dynamics(data)
        case "supply-chain":
            result = await supply_chain(data)
        case "customer-insights":
            result = await customer_insights(data)
        case "operational-efficiency":
            result = await operational_efficiency(data)
        case "risk-assessment":
            result = await risk_assessment(data)
        case "sustainability":
            result = await sustainability(data)
        case _:
            return {"error": "Unknown analytic"}

    # ----------  NEW – history + disk tidy  ----------
    log_kpis_and_purge(org_id)          # inserts KPIs & deletes raw > 6 h
    # -------------------------------------------------
    async with aiohttp.ClientSession() as session:
        await session.post(
            f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync",
            json={
                "orgId": org_id,
                "type": analytic_type,
                "results": result,
                "lastRun": datetime.utcnow().isoformat(),
            },
            headers={"x-api-key": os.getenv("ANALYTICS_KEY")},
        )
    # fire-and-forget email (caller decides storage)
    pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf"
    asyncio.create_task(send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result}))

    return {"orgId": org_id, "analytic": analytic_type, "industry": industry, "results": result, "timestamp": datetime.utcnow().isoformat()}

# ------------------------------------------------------------------
# 2  APScheduler glue – unchanged
# ------------------------------------------------------------------
def add_job_to_scheduler(schedule: dict):
    """
    Register one cron job per analytic listed in *schedule*.

    Args:
        schedule: Mapping with the keys ``id`` (schedule id), ``orgId``,
            ``frequency`` ("daily", "weekly" or "monthly") and ``analytics``
            (iterable of analytic type names).

    Each job gets the id ``"<id>_<analytic>"`` and calls
    ``run_analytic_job(org_id, analytic)`` at 06:00.  Registering a schedule
    whose job ids already exist replaces the existing jobs, so an edited
    schedule can simply be added again.  An unsupported frequency registers
    nothing and is reported through a log warning.

    Raises:
        KeyError: If a required key is missing from *schedule*.
    """
    import logging

    # Extra cron fields per frequency; hour=6/minute=0 is shared by all.
    # APScheduler numbers weekdays from Monday, so day_of_week=0 is Monday.
    cron_fields = {
        "daily": {},
        "weekly": {"day_of_week": 0},
        "monthly": {"day": 1},
    }

    org_id = schedule["orgId"]
    freq = schedule["frequency"]
    analytics = schedule["analytics"]

    trigger_fields = cron_fields.get(freq) if isinstance(freq, str) else None
    if trigger_fields is None:
        logging.getLogger(__name__).warning(
            "Unsupported frequency %r for org %s; no jobs scheduled", freq, org_id
        )
        return

    for analytic in analytics:
        sched.add_job(
            run_analytic_job,
            "cron",
            hour=6,
            minute=0,
            args=[org_id, analytic],
            id=f"{schedule['id']}_{analytic}",
            # Without this, re-adding a schedule raises ConflictingIdError.
            replace_existing=True,
            **trigger_fields,
        )

def remove_job_from_scheduler(schedule_id: str):
    """
    Remove every scheduled job that belongs to *schedule_id*.

    Job ids in this module have the form ``"<schedule id>_<analytic>"``.
    Matching on the id followed by the ``_`` separator (or on the exact id)
    prevents removing schedule ``"1"`` from also deleting the jobs of
    schedule ``"10"``, which a bare prefix match would do.

    Args:
        schedule_id: Id of the schedule whose jobs should be dropped.
    """
    owned_prefix = f"{schedule_id}_"
    # get_jobs() hands back a list, so removing while looping is safe.
    for job in sched.get_jobs():
        if job.id == schedule_id or job.id.startswith(owned_prefix):
            sched.remove_job(job.id)

# ------------------------------------------------------------------
# 3  ENV-loader – unchanged
# ------------------------------------------------------------------
async def load_schedules():
    """
    Register cron jobs described by the ``SCHEDULES`` environment variable.

    ``SCHEDULES`` is expected to hold a JSON list of objects.  Each object
    must have ``orgId`` and may have ``frequency`` ("daily" – the default –
    "weekly" or "monthly") and ``analytics`` (list of analytic type names,
    default ``["eda"]``).  Jobs get the id ``"<orgId>_<analytic>"`` and call
    ``run_analytic_job(org_id, analytic)`` at 06:00.

    Loading is best-effort: invalid JSON, a non-list payload, entries without
    ``orgId`` and unsupported frequencies are logged as warnings and skipped
    instead of aborting the remaining schedules.
    """
    import json
    import logging

    log = logging.getLogger(__name__)

    # Extra cron fields per frequency; hour=6/minute=0 is shared by all.
    # APScheduler numbers weekdays from Monday, so day_of_week=0 is Monday.
    cron_fields = {
        "daily": {},
        "weekly": {"day_of_week": 0},
        "monthly": {"day": 1},
    }

    raw = os.getenv("SCHEDULES", "[]")
    try:
        schedules = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        log.warning("SCHEDULES is not valid JSON; no schedules loaded")
        return

    if not isinstance(schedules, list):
        log.warning("SCHEDULES must be a JSON list, got %s; no schedules loaded",
                    type(schedules).__name__)
        return

    for sch in schedules:
        if not isinstance(sch, dict) or "orgId" not in sch:
            log.warning("Skipping schedule entry without orgId: %r", sch)
            continue

        org_id = sch["orgId"]
        freq = sch.get("frequency", "daily")
        analytics = sch.get("analytics", ["eda"])

        trigger_fields = cron_fields.get(freq) if isinstance(freq, str) else None
        if trigger_fields is None:
            log.warning("Unsupported frequency %r for org %s; entry skipped", freq, org_id)
            continue

        for analytic in analytics:
            # NOTE(review): ids here are keyed by orgId while
            # add_job_to_scheduler keys them by schedule id – confirm that
            # this difference is intended.
            sched.add_job(
                run_analytic_job,
                "cron",
                hour=6,
                minute=0,
                args=[org_id, analytic],
                id=f"{org_id}_{analytic}",
                # Two entries naming the same org/analytic would otherwise
                # raise ConflictingIdError and stop the rest from loading.
                replace_existing=True,
                **trigger_fields,
            )

# ------------------------------------------------------------------
# 4  STARTER
# ------------------------------------------------------------------
def start_scheduler():
    """
    Queue the env-based schedule loader on the event loop, then start the
    scheduler.

    ``asyncio.create_task`` needs a running event loop, so this has to be
    called from within one.
    """
    loader = load_schedules()
    asyncio.create_task(loader)
    sched.start()