# app/routers/analytics_stream.py
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body, Depends
from typing import List, Dict
from datetime import datetime
import logging
from app.deps import verify_api_key
from app.core.event_hub import event_hub
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"])
class AnalyticsStreamManager:
    """Manages Redis streams for real-time analytics without WebSockets.

    Thin wrapper around the central ``event_hub``: derives the per-(org,
    source) stream key and per-org consumer-group name, then delegates the
    actual publish/read operations to the hub.
    """

    def __init__(self, org_id: str, source_id: str):
        self.org_id = org_id
        self.source_id = source_id
        # One stream per (org, source) pair; one consumer group per org.
        self.stream_key = f"stream:analytics:{org_id}:{source_id}"
        self.consumer_group = f"analytics_consumers_{org_id}"

    async def ensure_consumer_group(self):
        """Create the Redis consumer group if it does not already exist."""
        try:
            event_hub.ensure_consumer_group(self.stream_key, self.consumer_group)
        except Exception as e:
            # Redis raises BUSYGROUP when the group already exists -- benign,
            # so only log other failures (use the module logger, not print).
            if "BUSYGROUP" not in str(e):
                logger.warning("[stream] Group creation warning: %s", e)

    async def publish_kpi_update(self, data: Dict):
        """Publish a KPI update for this org/source via the event hub.

        (The previous version also built a local ``message`` dict that was
        never used -- removed as dead code.)
        """
        event_hub.emit_kpi_update(self.org_id, self.source_id, data)

    async def publish_insight(self, insight: Dict):
        """Publish an AI insight for this org/source via the event hub."""
        event_hub.emit_insight(self.org_id, self.source_id, insight)

    def read_recent(self, count: int = 10) -> List[Dict]:
        """Return up to *count* recent stream messages; [] on any failure."""
        try:
            return event_hub.read_recent_stream(self.stream_key, count)
        except Exception:
            # logger.exception preserves the traceback, unlike the old print.
            logger.exception("[stream] Read error")
            return []
@router.get("/recent")
async def get_recent_analytics(
    count: int = Query(10, ge=1, le=100),
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key),
):
    """Poll the most recent analytics events for an org/data source.

    Returns up to ``count`` ``kpi_update`` / ``insight`` events from the
    event hub, reshaped for the frontend, plus a response timestamp.
    """
    # NOTE: org_id/source_id are declared Query(...) (required), so FastAPI
    # rejects missing values with 422 before this handler runs -- the old
    # manual `if not org_id` 400 check was unreachable and has been removed.
    events = event_hub.get_recent_events(org_id, source_id, count)

    # Both event types were formatted with the exact same shape in two
    # duplicated branches; a single filtered comprehension is equivalent.
    messages = [
        {
            "type": event["event_type"],
            "timestamp": event["timestamp"],
            "data": event["data"],
        }
        for event in events
        if event["event_type"] in ("kpi_update", "insight")
    ]

    return {
        "status": "success",
        "org_id": org_id,
        "source_id": source_id,
        "messages": messages,
        # NOTE(review): naive UTC timestamp kept byte-compatible for the
        # frontend; utcnow() is deprecated -- consider datetime.now(timezone.utc).
        "timestamp": datetime.utcnow().isoformat(),
    }
# --- QStash webhook endpoints ---------------------------------------------
@router.post("/callback")
async def qstash_kpi_callback(
    background_tasks: BackgroundTasks,  # no default -> must precede payload
    payload: Dict = Body(...),
):
    """QStash calls this to trigger KPI computation for one org/source.

    Schedules the worker as a background task and returns immediately.

    Raises:
        HTTPException: 400 if ``org_id`` or ``source_id`` is missing from
            the payload (previously an unhandled KeyError produced a 500).
    """
    org_id = payload.get("org_id")
    source_id = payload.get("source_id")
    if not org_id or not source_id:
        raise HTTPException(
            status_code=400, detail="org_id and source_id are required"
        )
    # The worker runs after the response is sent; callers poll /recent.
    background_tasks.add_task(run_analytics_worker, org_id, source_id)
    return {"status": "accepted"}
@router.post("/notify")
async def qstash_notification(payload: Dict = Body(...)):
    """QStash completion hook: acknowledge that a job finished.

    Placeholder for frontend notification -- e.g. pinging a webhook or
    updating a status key in Redis; currently only acknowledges receipt.
    """
    ack = {"status": "ok"}
    return ack
async def run_analytics_worker(org_id: str, source_id: str):
    """Run the KPI worker for one org/source and publish the results.

    Best-effort: any failure is logged with its traceback and swallowed so
    a crashing background task never affects the request path.
    """
    try:
        # Local import keeps module load light and avoids potential cycles.
        from app.tasks.analytics_worker import AnalyticsWorker

        worker = AnalyticsWorker(org_id, source_id)
        results = await worker.run()
        # Publish via the central hub so pollers of /recent see the update.
        event_hub.emit_kpi_update(org_id, source_id, results)
    except Exception:
        # logger.exception captures the traceback, unlike the old print.
        logger.exception("[callback] Worker failed for %s/%s", org_id, source_id)