File size: 4,468 Bytes
98a466d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# app/routers/analytics_stream.py
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body, Depends
from typing import List, Dict
from datetime import datetime
import logging
from app.deps import verify_api_key
from app.core.event_hub import event_hub
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"])

class AnalyticsStreamManager:
    """Manages Redis streams for real-time analytics without WebSockets"""
    
    def __init__(self, org_id: str, source_id: str):
        self.org_id = org_id
        self.source_id = source_id
        self.stream_key = f"stream:analytics:{org_id}:{source_id}"
        self.consumer_group = f"analytics_consumers_{org_id}"
        
    async def ensure_consumer_group(self):
        """Create Redis consumer group if not exists"""
        try:
            event_hub.ensure_consumer_group(self.stream_key, self.consumer_group)
        except Exception as e:
            if "BUSYGROUP" not in str(e):
                print(f"[stream] ⚠️ Group creation warning: {e}")
    
    async def publish_kpi_update(self, data: Dict):
        """Publish KPI update to Redis stream"""
        message = {
            "type": "kpi_update",
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        }
        event_hub.emit_kpi_update(self.org_id, self.source_id, data)
    
    async def publish_insight(self, insight: Dict):
        """Publish AI insight to stream"""
        message = {
            "type": "insight",
            "timestamp": datetime.utcnow().isoformat(),
            "data": insight
        }
        event_hub.emit_insight(self.org_id, self.source_id, insight)
    
    def read_recent(self, count: int = 10) -> List[Dict]:
        """Read recent messages for polling"""
        try:
            return event_hub.read_recent_stream(self.stream_key, count)
        except Exception as e:
            print(f"[stream] ❌ Read error: {e}")
            return []

@router.get("/recent")
async def get_recent_analytics(
    count: int = Query(10, ge=1, le=100),
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key)
):
    """poll recent analytics from the event hub"""
    if not org_id:
        raise HTTPException(status_code=400, detail="org_id required")
    
    # use the hub to get events
    events = event_hub.get_recent_events(org_id, source_id, count)
    
    # filter and format for frontend
    messages = []
    for event in events:
        if event["event_type"] == "kpi_update":
            messages.append({
                "type": "kpi_update",
                "timestamp": event["timestamp"],
                "data": event["data"]
            })
        elif event["event_type"] == "insight":
            messages.append({
                "type": "insight",
                "timestamp": event["timestamp"],
                "data": event["data"]
            })
    
    return {
        "status": "success",
        "org_id": org_id,
        "source_id": source_id,
        "messages": messages,
        "timestamp": datetime.utcnow().isoformat()
    }



# app/routers/analytics_stream.py
 # ✅ Add imports

@router.post("/callback")
async def qstash_kpi_callback(
    background_tasks: BackgroundTasks,  # ✅ First (no default)
    payload: Dict = Body(...),  # ✅ Second (has default)
):
    """QStash calls this to compute KPIs"""
    org_id = payload["org_id"]
    source_id = payload["source_id"]
    
    # Trigger background computation
    background_tasks.add_task(run_analytics_worker, org_id, source_id)
    
    return {"status": "accepted"}

@router.post("/notify")
async def qstash_notification(payload: Dict = Body(...)):
    """QStash calls this when job is done"""
    # This is where you notify frontend
    # Could ping a webhook or update a status key in Redis
    
    return {"status": "ok"}

async def run_analytics_worker(org_id: str, source_id: str):
    """Run the KPI worker and publish results"""
    try:
        from app.tasks.analytics_worker import AnalyticsWorker
        worker = AnalyticsWorker(org_id, source_id)
        results = await worker.run()
        
        # Publish via central hub
        event_hub.emit_kpi_update(org_id, source_id, results)
        
    except Exception as e:
        print(f"[callback] ❌ Worker failed: {e}")