Peter Mutwiri
Clean snapshot
472833f
"""
Analytics engine routes – stateless, DuckDB-backed, any-shape input.
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import pandas as pd
from app.mapper import canonify_df # NEW
from app.engine.analytics import AnalyticsService
from app.utils.detect_industry import detect_industry
from app.service.industry_svc import (
eda, forecast, basket, market_dynamics, supply_chain,
customer_insights, operational_efficiency, risk_assessment, sustainability
)
router = APIRouter(prefix="/analytics", tags=["Analytics"])
class RunAnalyticIn(BaseModel):
analytic: str
dateColumn: str | None = None
valueColumn: str | None = None
minSupport: float = 0.01
minConfidence: float = 0.3
minLift: float = 1.0
@router.post("/run")
async def run_analytic(orgId: str, body: RunAnalyticIn):
"""
1. Pull last 6 h of raw rows (any column names)
2. Map -> canonical DataFrame
3. Run chosen analytic
4. Return shaped result
"""
df = canonify_df(orgId) # ← replaces pd.read_parquet
if df.empty:
raise HTTPException(404, "No recent data found – please ingest or stream first.")
industry, _ = detect_industry(df)
data = df.to_dict("records")
match body.analytic:
case "eda":
result = await eda(data, industry)
case "forecast":
if not body.dateColumn or not body.valueColumn:
raise HTTPException(400, "dateColumn & valueColumn required")
result = await forecast(data, body.dateColumn, body.valueColumn)
case "basket":
result = await basket(data, body.minSupport, body.minConfidence, body.minLift)
case "market-dynamics":
result = await market_dynamics(data)
case "supply-chain":
result = await supply_chain(data)
case "customer-insights":
result = await customer_insights(data)
case "operational-efficiency":
result = await operational_efficiency(data)
case "risk-assessment":
result = await risk_assessment(data)
case "sustainability":
result = await sustainability(data)
case _:
raise HTTPException(400, "Unknown analytic")
return {"industry": industry, "data": result}