| | |
| | from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body, Depends |
| | from typing import List, Dict |
| | from datetime import datetime |
| | import logging |
| | from app.deps import verify_api_key |
| | from app.core.event_hub import event_hub |
| | logger = logging.getLogger(__name__) |
| | router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"]) |
| |
|
| | class AnalyticsStreamManager: |
| | """Manages Redis streams for real-time analytics without WebSockets""" |
| | |
| | def __init__(self, org_id: str, source_id: str): |
| | self.org_id = org_id |
| | self.source_id = source_id |
| | self.stream_key = f"stream:analytics:{org_id}:{source_id}" |
| | self.consumer_group = f"analytics_consumers_{org_id}" |
| | |
| | async def ensure_consumer_group(self): |
| | """Create Redis consumer group if not exists""" |
| | try: |
| | event_hub.ensure_consumer_group(self.stream_key, self.consumer_group) |
| | except Exception as e: |
| | if "BUSYGROUP" not in str(e): |
| | print(f"[stream] β οΈ Group creation warning: {e}") |
| | |
| | async def publish_kpi_update(self, data: Dict): |
| | """Publish KPI update to Redis stream""" |
| | message = { |
| | "type": "kpi_update", |
| | "timestamp": datetime.utcnow().isoformat(), |
| | "data": data |
| | } |
| | event_hub.emit_kpi_update(self.org_id, self.source_id, data) |
| | |
| | async def publish_insight(self, insight: Dict): |
| | """Publish AI insight to stream""" |
| | message = { |
| | "type": "insight", |
| | "timestamp": datetime.utcnow().isoformat(), |
| | "data": insight |
| | } |
| | event_hub.emit_insight(self.org_id, self.source_id, insight) |
| | |
| | def read_recent(self, count: int = 10) -> List[Dict]: |
| | """Read recent messages for polling""" |
| | try: |
| | return event_hub.read_recent_stream(self.stream_key, count) |
| | except Exception as e: |
| | print(f"[stream] β Read error: {e}") |
| | return [] |
| |
|
| | @router.get("/recent") |
| | async def get_recent_analytics( |
| | count: int = Query(10, ge=1, le=100), |
| | org_id: str = Query(..., description="Organization ID"), |
| | source_id: str = Query(..., description="Data source ID"), |
| | api_key: str = Depends(verify_api_key) |
| | ): |
| | """poll recent analytics from the event hub""" |
| | if not org_id: |
| | raise HTTPException(status_code=400, detail="org_id required") |
| | |
| | |
| | events = event_hub.get_recent_events(org_id, source_id, count) |
| | |
| | |
| | messages = [] |
| | for event in events: |
| | if event["event_type"] == "kpi_update": |
| | messages.append({ |
| | "type": "kpi_update", |
| | "timestamp": event["timestamp"], |
| | "data": event["data"] |
| | }) |
| | elif event["event_type"] == "insight": |
| | messages.append({ |
| | "type": "insight", |
| | "timestamp": event["timestamp"], |
| | "data": event["data"] |
| | }) |
| | |
| | return { |
| | "status": "success", |
| | "org_id": org_id, |
| | "source_id": source_id, |
| | "messages": messages, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| |
|
| |
|
| |
|
| | |
| | |
| |
|
| | @router.post("/callback") |
| | async def qstash_kpi_callback( |
| | background_tasks: BackgroundTasks, |
| | payload: Dict = Body(...), |
| | ): |
| | """QStash calls this to compute KPIs""" |
| | org_id = payload["org_id"] |
| | source_id = payload["source_id"] |
| | |
| | |
| | background_tasks.add_task(run_analytics_worker, org_id, source_id) |
| | |
| | return {"status": "accepted"} |
| |
|
| | @router.post("/notify") |
| | async def qstash_notification(payload: Dict = Body(...)): |
| | """QStash calls this when job is done""" |
| | |
| | |
| | |
| | return {"status": "ok"} |
| |
|
| | async def run_analytics_worker(org_id: str, source_id: str): |
| | """Run the KPI worker and publish results""" |
| | try: |
| | from app.tasks.analytics_worker import AnalyticsWorker |
| | worker = AnalyticsWorker(org_id, source_id) |
| | results = await worker.run() |
| | |
| | |
| | event_hub.emit_kpi_update(org_id, source_id, results) |
| | |
| | except Exception as e: |
| | print(f"[callback] β Worker failed: {e}") |