# analytics-engine/app/routers/reports.py
# Peter Mutwiri
# Clean snapshot
# 472833f
"""
Analytics engine routes – DuckDB-backed, any-shape input.
Also exposes Neon-bridge endpoints so Next.js (Prisma) can store history.
"""
from fastapi import APIRouter, Query, HTTPException
from pydantic import BaseModel
from datetime import datetime
import json
from app.mapper import canonify_df
from app.engine.analytics import AnalyticsService
from app.utils.detect_industry import detect_industry
from app.service.industry_svc import (
eda, forecast, basket, market_dynamics, supply_chain,
customer_insights, operational_efficiency, risk_assessment, sustainability
)
# Router carrying every endpoint declared in this module; all paths are
# mounted under the "/analytics" prefix and tagged "Analytics".
router = APIRouter(prefix="/analytics", tags=["Analytics"])
# Module-level AnalyticsService instance, constructed once at import time.
# NOTE(review): no route visible in this section references it — confirm
# whether it is still needed.
analytics = AnalyticsService()
# --------------------------------------------------
# 1 RUN ANALYTIC – real-time, any column names
# --------------------------------------------------
class RunAnalyticIn(BaseModel):
    """Request body accepted by ``POST /analytics/run``.

    Only ``analytic`` is mandatory. The other fields are read solely by the
    analytic named in the comment next to them and are ignored otherwise.
    """

    # Name of the analytic to execute, e.g. "eda", "forecast" or "basket".
    analytic: str

    # Column names consumed by the "forecast" analytic; both must be
    # provided when that analytic is requested.
    dateColumn: str | None = None
    valueColumn: str | None = None

    # Thresholds forwarded unchanged to the "basket" analytic.
    minSupport: float = 0.01
    minConfidence: float = 0.3
    minLift: float = 1.0
@router.post("/run")
async def run_analytic(orgId: str, body: RunAnalyticIn):
    """
    Execute one analytic against the organisation's canonified data.

    Steps:
      1. Load the canonical frame via ``canonify_df(orgId)``. The time window
         (described upstream as the last 6 h of raw rows) is decided by that
         helper, not by this route.
      2. Detect the industry, then dispatch on ``body.analytic``.
      3. Return ``{"industry": ..., "data": ...}``.

    Raises:
        HTTPException(404): the canonified frame is empty.
        HTTPException(400): the analytic name is unknown, or "forecast" was
            requested without both ``dateColumn`` and ``valueColumn``.
    """
    frame = canonify_df(orgId)
    if frame.empty:
        raise HTTPException(404, "No recent data found – please ingest or stream first.")

    records = frame.to_dict("records")
    industry, _ = detect_industry(frame)

    # Analytics whose only argument is the list of row records.
    records_only = {
        "market-dynamics": market_dynamics,
        "supply-chain": supply_chain,
        "customer-insights": customer_insights,
        "operational-efficiency": operational_efficiency,
        "risk-assessment": risk_assessment,
        "sustainability": sustainability,
    }

    requested = body.analytic
    if requested == "eda":
        # EDA is the one analytic that also receives the detected industry.
        outcome = await eda(records, industry)
    elif requested == "forecast":
        if not (body.dateColumn and body.valueColumn):
            raise HTTPException(400, "dateColumn & valueColumn required")
        outcome = await forecast(records, body.dateColumn, body.valueColumn)
    elif requested == "basket":
        outcome = await basket(
            records, body.minSupport, body.minConfidence, body.minLift
        )
    elif requested in records_only:
        handler = records_only[requested]
        outcome = await handler(records)
    else:
        raise HTTPException(400, "Unknown analytic")

    return {"industry": industry, "data": outcome}
# --------------------------------------------------
# 2 NEON BRIDGE – latest report for UI + push endpoint
# --------------------------------------------------
class PushReportIn(BaseModel):
    """Payload accepted by ``POST /analytics/report/push``.

    The field names match the keys returned by ``GET /analytics/report/latest``
    (``orgId``, ``type``, ``results``, ``lastRun``).
    """

    # Organisation the report belongs to.
    orgId: str
    # Analytic type label. The name shadows the ``type`` builtin but is part
    # of the request format, so it must stay as is.
    type: str
    # Report payload as a JSON object.
    results: dict
    # Timestamp of the run that produced ``results``.
    lastRun: datetime
@router.get("/report/latest")
def latest_report(orgId: str = Query(...)):
    """
    Return the newest ``kpi_log`` row stored for this org.

    The response keys (``orgId``, ``type``, ``results``, ``lastRun``) are
    shaped to match the Neon schema so Next.js can forward them 1-to-1.

    Args:
        orgId: Organisation identifier (required query parameter). It is used
            both to obtain the connection and to filter ``kpi_log.org_id``.

    Returns:
        dict with ``orgId``, ``type`` (the row's ``analytic_type``),
        ``results`` (JSON-decoded when stored as a string) and ``lastRun``
        (``ts`` rendered with ``isoformat()``).

    Raises:
        HTTPException(404): no ``kpi_log`` row exists for ``orgId``.
    """
    from app.db import get_conn

    conn = get_conn(orgId)
    try:
        row = conn.execute(
            """
            SELECT analytic_type, results, ts
            FROM kpi_log
            WHERE org_id = ?
            ORDER BY ts DESC
            LIMIT 1
            """,
            [orgId],
        ).fetchone()
    finally:
        # Always release the connection, including when the query raises;
        # previously a failing execute()/fetchone() skipped close() and
        # leaked the connection.
        conn.close()

    if not row:
        raise HTTPException(404, "No report yet")

    analytic_type, results, ts = row
    # ``results`` may come back as a JSON string or as an already-decoded
    # object depending on how the column is stored; only strings are decoded.
    if isinstance(results, str):
        results = json.loads(results)

    return {
        "orgId": orgId,
        "type": analytic_type,
        "results": results,
        # NOTE(review): assumes ``ts`` is a datetime-like value exposing
        # isoformat() — confirm against the kpi_log column type.
        "lastRun": ts.isoformat(),
    }
@router.post("/report/push")
async def push_report(body: PushReportIn):
    """
    Internal endpoint that Next.js (Prisma) calls when storing history in Neon.

    This handler only acknowledges the validated payload: it echoes ``orgId``
    and ``type`` with status "accepted" and persists nothing itself. The
    analytics container does **not** touch Prisma.
    """
    # optional: validate signature / api-key here if you want
    acknowledgement = {"status": "accepted"}
    acknowledgement["orgId"] = body.orgId
    acknowledgement["type"] = body.type
    return acknowledgement