Spaces:
Paused
Paused
| # app/routers/analytics_stream.py | |
| from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body, Depends | |
| from typing import List, Dict | |
| from datetime import datetime | |
| import logging | |
| from app.deps import verify_api_key | |
| from app.core.event_hub import event_hub | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"]) | |
class AnalyticsStreamManager:
    """Manage the Redis analytics stream of one organization/source pair.

    Designed for polling-based real-time analytics (no WebSockets). Every
    stream operation is delegated to the module-level ``event_hub``; this
    class only derives the stream key and consumer-group name.
    """

    def __init__(self, org_id: str, source_id: str) -> None:
        """Store the identifiers and derive the stream/group names from them."""
        self.org_id = org_id
        self.source_id = source_id
        self.stream_key = f"stream:analytics:{org_id}:{source_id}"
        self.consumer_group = f"analytics_consumers_{org_id}"

    async def ensure_consumer_group(self) -> None:
        """Create the consumer group for this stream if it does not exist.

        Best-effort: failures are logged and never propagated.
        """
        # NOTE(review): the hub method is called without ``await``; if it is
        # a coroutine function it would never actually run -- confirm.
        try:
            event_hub.ensure_consumer_group(self.stream_key, self.consumer_group)
        except Exception as e:
            # Redis reports BUSYGROUP when the group already exists, which
            # is the expected outcome of an "ensure" call -- stay quiet.
            if "BUSYGROUP" not in str(e):
                logger.warning("[stream] Group creation warning: %s", e)

    async def publish_kpi_update(self, data: Dict) -> None:
        """Emit a KPI update for this org/source through the event hub.

        Args:
            data: KPI payload, handed to the hub unchanged.
        """
        event_hub.emit_kpi_update(self.org_id, self.source_id, data)

    async def publish_insight(self, insight: Dict) -> None:
        """Emit an AI insight for this org/source through the event hub.

        Args:
            insight: Insight payload, handed to the hub unchanged.
        """
        event_hub.emit_insight(self.org_id, self.source_id, insight)

    def read_recent(self, count: int = 10) -> List[Dict]:
        """Return up to ``count`` recent stream messages for polling clients.

        Best-effort: on any read failure the error is logged and an empty
        list is returned, so pollers never see an exception.
        """
        try:
            return event_hub.read_recent_stream(self.stream_key, count)
        except Exception as e:
            logger.error("[stream] Read error: %s", e)
            return []
async def get_recent_analytics(
    count: int = Query(10, ge=1, le=100),
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key)
):
    """Polling endpoint: return recent KPI updates and insights from the hub.

    Fetches up to ``count`` events for ``org_id``/``source_id`` from the
    event hub and forwards only ``kpi_update`` and ``insight`` events,
    reshaped to ``{"type", "timestamp", "data"}``.

    Raises:
        HTTPException: 400 when ``org_id`` is empty.
    """
    if not org_id:
        raise HTTPException(status_code=400, detail="org_id required")

    # Only these event types are exposed to the frontend; anything else the
    # hub returns is dropped.
    forwarded_types = ("kpi_update", "insight")
    recent = event_hub.get_recent_events(org_id, source_id, count)
    messages = [
        {
            "type": entry["event_type"],
            "timestamp": entry["timestamp"],
            "data": entry["data"],
        }
        for entry in recent
        if entry["event_type"] in forwarded_types
    ]

    response = {
        "status": "success",
        "org_id": org_id,
        "source_id": source_id,
        "messages": messages,
        "timestamp": datetime.utcnow().isoformat(),
    }
    return response
| # app/routers/analytics_stream.py | |
| # Add imports | |
async def qstash_kpi_callback(
    background_tasks: BackgroundTasks,  # no default, so it must come first
    payload: Dict = Body(...),
):
    """QStash callback that schedules a KPI computation in the background.

    Args:
        background_tasks: FastAPI background-task queue for this request.
        payload: JSON body; must contain ``org_id`` and ``source_id``.

    Returns:
        ``{"status": "accepted"}`` once the worker has been scheduled; the
        computation itself runs after the response is sent.

    Raises:
        HTTPException: 400 when ``org_id`` or ``source_id`` is missing from
            the payload.
    """
    # Reject malformed callbacks explicitly instead of letting a KeyError
    # surface as an opaque 500.
    missing = [key for key in ("org_id", "source_id") if key not in payload]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"missing required field(s): {', '.join(missing)}",
        )

    # Trigger the computation without blocking the callback response.
    background_tasks.add_task(
        run_analytics_worker, payload["org_id"], payload["source_id"]
    )
    return {"status": "accepted"}
async def qstash_notification(payload: Dict = Body(...)):
    """QStash callback for a finished job; currently only acknowledges it.

    ``payload`` is accepted but not inspected yet.
    """
    # Placeholder: frontend notification belongs here, e.g. pinging a
    # webhook or updating a status key in Redis.
    acknowledgement = {"status": "ok"}
    return acknowledgement
async def run_analytics_worker(org_id: str, source_id: str) -> None:
    """Run the KPI worker for one org/source and publish its results.

    Scheduled as a background task by the QStash KPI callback, so it never
    raises: any failure is logged together with its traceback.

    Args:
        org_id: Organization whose KPIs are computed.
        source_id: Data source the KPIs are computed from.
    """
    try:
        # Imported inside the try so that an import failure is logged like
        # any other worker failure.
        from app.tasks.analytics_worker import AnalyticsWorker

        worker = AnalyticsWorker(org_id, source_id)
        results = await worker.run()
        # Publish via the central hub.
        event_hub.emit_kpi_update(org_id, source_id, results)
    except Exception:
        # logger.exception records the traceback, which the previous
        # message-only print discarded.
        logger.exception(
            "[callback] Worker failed for org_id=%s source_id=%s",
            org_id,
            source_id,
        )