# Mutsynchub/app/routers/analytics_stream.py
# Author: shaliz-kong -- initial commit: self-hosted Redis, DuckDB, Analytics Engine (98a466d)
# app/routers/analytics_stream.py
import logging
from datetime import datetime, timezone
from typing import Dict, List

from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Query

from app.core.event_hub import event_hub
from app.deps import verify_api_key
# Module-level logger; handlers/levels are configured by the application entry point.
logger = logging.getLogger(__name__)
# All routes below are mounted under /api/v1/analytics/stream with the "analytics" tag.
router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"])
class AnalyticsStreamManager:
    """Manages Redis streams for real-time analytics without WebSockets.

    Thin wrapper around the central ``event_hub``: derives the per-(org, source)
    stream key and per-org consumer-group name, then delegates all publishing
    and reading to the hub.
    """

    def __init__(self, org_id: str, source_id: str):
        self.org_id = org_id
        self.source_id = source_id
        # One stream per (org, source) pair; one consumer group per org.
        self.stream_key = f"stream:analytics:{org_id}:{source_id}"
        self.consumer_group = f"analytics_consumers_{org_id}"

    async def ensure_consumer_group(self) -> None:
        """Create the Redis consumer group for this stream if it does not exist."""
        try:
            event_hub.ensure_consumer_group(self.stream_key, self.consumer_group)
        except Exception as e:
            # BUSYGROUP means the group already exists -- that is not an error.
            if "BUSYGROUP" not in str(e):
                logger.warning("[stream] Group creation warning: %s", e)

    async def publish_kpi_update(self, data: Dict) -> None:
        """Publish a KPI update for this (org, source) via the event hub."""
        # The hub builds the wire message (type/timestamp envelope) itself;
        # the previous local ``message`` dict here was dead code and was removed.
        event_hub.emit_kpi_update(self.org_id, self.source_id, data)

    async def publish_insight(self, insight: Dict) -> None:
        """Publish an AI insight for this (org, source) via the event hub."""
        event_hub.emit_insight(self.org_id, self.source_id, insight)

    def read_recent(self, count: int = 10) -> List[Dict]:
        """Read up to ``count`` recent messages for polling.

        Best-effort: returns an empty list (and logs) on any hub/Redis error.
        """
        try:
            return event_hub.read_recent_stream(self.stream_key, count)
        except Exception as e:
            logger.error("[stream] Read error: %s", e)
            return []
@router.get("/recent")
async def get_recent_analytics(
    count: int = Query(10, ge=1, le=100),
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key),
):
    """Poll recent analytics events from the event hub.

    Returns the most recent ``count`` events for the (org, source) pair,
    filtered to the event types the frontend understands, plus a response
    timestamp (UTC, timezone-aware ISO-8601).
    """
    # Query(...) rejects a *missing* org_id, but an explicitly empty string
    # (?org_id=) still passes validation -- guard against it here.
    if not org_id:
        raise HTTPException(status_code=400, detail="org_id required")

    events = event_hub.get_recent_events(org_id, source_id, count)

    # Both supported types share the exact same output shape, so a single
    # filtered comprehension replaces the duplicated if/elif branches.
    messages = [
        {
            "type": event["event_type"],
            "timestamp": event["timestamp"],
            "data": event["data"],
        }
        for event in events
        if event["event_type"] in ("kpi_update", "insight")
    ]

    return {
        "status": "success",
        "org_id": org_id,
        "source_id": source_id,
        "messages": messages,
        # timezone-aware replacement for the deprecated datetime.utcnow()
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# --- QStash callback endpoints -------------------------------------------
@router.post("/callback")
async def qstash_kpi_callback(
    background_tasks: BackgroundTasks,  # no default, so it must precede payload
    payload: Dict = Body(...),
):
    """QStash calls this to trigger KPI computation.

    Expects ``payload`` to contain non-empty ``org_id`` and ``source_id``.
    Queues the worker as a background task and acknowledges immediately.

    Raises:
        HTTPException(400): if either required field is missing or empty
            (previously an unguarded ``payload["org_id"]`` turned a bad
            request into a 500 via KeyError).
    """
    org_id = payload.get("org_id")
    source_id = payload.get("source_id")
    if not org_id or not source_id:
        raise HTTPException(status_code=400, detail="org_id and source_id required")

    # Fire-and-forget: the worker publishes its own results via the event hub.
    background_tasks.add_task(run_analytics_worker, org_id, source_id)
    return {"status": "accepted"}
@router.post("/notify")
async def qstash_notification(payload: Dict = Body(...)):
    """Acknowledge a QStash job-completion notification.

    Placeholder endpoint: frontend notification (e.g. pinging a webhook or
    writing a status key to Redis) is intentionally not implemented yet --
    the handler only confirms receipt.
    """
    return {"status": "ok"}
async def run_analytics_worker(org_id: str, source_id: str) -> None:
    """Run the KPI worker for (org_id, source_id) and publish its results.

    Runs as a fire-and-forget background task, so failures are logged with a
    full traceback rather than raised. The worker import is deferred to call
    time -- presumably to avoid a circular import at module load; TODO confirm.
    """
    try:
        from app.tasks.analytics_worker import AnalyticsWorker

        worker = AnalyticsWorker(org_id, source_id)
        results = await worker.run()
        # Publish through the central hub so pollers of /recent see fresh KPIs.
        event_hub.emit_kpi_update(org_id, source_id, results)
    except Exception:
        # logger.exception keeps the traceback; print() discarded it.
        logger.exception("[callback] Worker failed for %s/%s", org_id, source_id)