Peter Mutwiri commited on
Commit Β·
81d1178
1
Parent(s): 35919f2
corrected logging
Browse files
app/routers/analytics_stream.py
CHANGED
|
@@ -8,7 +8,7 @@ from app.deps import get_current_user
|
|
| 8 |
from app.redis_client import redis
|
| 9 |
import uuid
|
| 10 |
from app.qstash_client import publish_message, is_qstash_available
|
| 11 |
-
|
| 12 |
router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"])
|
| 13 |
|
| 14 |
class AnalyticsStreamManager:
|
|
@@ -125,19 +125,20 @@ async def trigger_kpi_computation(
|
|
| 125 |
except Exception as e:
|
| 126 |
raise HTTPException(status_code=500, detail=f"QStash error: {str(e)}")
|
| 127 |
|
|
|
|
|
|
|
|
|
|
| 128 |
@router.post("/callback")
|
| 129 |
async def qstash_kpi_callback(
|
| 130 |
-
|
| 131 |
-
|
| 132 |
):
|
| 133 |
"""QStash calls this to compute KPIs"""
|
| 134 |
org_id = payload["org_id"]
|
| 135 |
source_id = payload["source_id"]
|
| 136 |
|
| 137 |
# Trigger background computation
|
| 138 |
-
background_tasks.add_task(
|
| 139 |
-
run_analytics_worker, org_id, source_id
|
| 140 |
-
)
|
| 141 |
|
| 142 |
return {"status": "accepted"}
|
| 143 |
|
|
|
|
| 8 |
from app.redis_client import redis
|
| 9 |
import uuid
|
| 10 |
from app.qstash_client import publish_message, is_qstash_available
|
| 11 |
+
from fastapi import BackgroundTasks, Body, Depends
|
| 12 |
router = APIRouter(prefix="/api/v1/analytics/stream", tags=["analytics"])
|
| 13 |
|
| 14 |
class AnalyticsStreamManager:
|
|
|
|
| 125 |
except Exception as e:
|
| 126 |
raise HTTPException(status_code=500, detail=f"QStash error: {str(e)}")
|
| 127 |
|
| 128 |
+
# app/routers/analytics_stream.py
|
| 129 |
+
from fastapi import BackgroundTasks, Body, Depends # β
Add imports
|
| 130 |
+
|
| 131 |
@router.post("/callback")
|
| 132 |
async def qstash_kpi_callback(
|
| 133 |
+
background_tasks: BackgroundTasks, # β
First (no default)
|
| 134 |
+
payload: Dict = Body(...), # β
Second (has default)
|
| 135 |
):
|
| 136 |
"""QStash calls this to compute KPIs"""
|
| 137 |
org_id = payload["org_id"]
|
| 138 |
source_id = payload["source_id"]
|
| 139 |
|
| 140 |
# Trigger background computation
|
| 141 |
+
background_tasks.add_task(run_analytics_worker, org_id, source_id)
|
|
|
|
|
|
|
| 142 |
|
| 143 |
return {"status": "accepted"}
|
| 144 |
|
app/routers/datasources.py
CHANGED
|
@@ -13,6 +13,7 @@ from datetime import datetime, timedelta
|
|
| 13 |
from app.redis_client import redis
|
| 14 |
# Add this import
|
| 15 |
from app.tasks.analytics_worker import trigger_kpi_computation
|
|
|
|
| 16 |
router = APIRouter(tags=["datasources"]) # Remove
|
| 17 |
|
| 18 |
|
|
@@ -79,7 +80,22 @@ async def create_source_json(
|
|
| 79 |
preview_df[col] = preview_df[col].astype(str)
|
| 80 |
|
| 81 |
preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
|
| 82 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 83 |
# 4. π‘ Broadcast to connected dashboards
|
| 84 |
await sio.emit(
|
| 85 |
"datasource:new-rows",
|
|
|
|
| 13 |
from app.redis_client import redis
|
| 14 |
# Add this import
|
| 15 |
from app.tasks.analytics_worker import trigger_kpi_computation
|
| 16 |
+
from app.qstash_client import get_qstash_client, is_qstash_available
|
| 17 |
router = APIRouter(tags=["datasources"]) # Remove
|
| 18 |
|
| 19 |
|
|
|
|
| 80 |
preview_df[col] = preview_df[col].astype(str)
|
| 81 |
|
| 82 |
preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
|
| 83 |
+
if is_qstash_available():
|
| 84 |
+
try:
|
| 85 |
+
client = get_qstash_client()
|
| 86 |
+
result = client.message.publish(
|
| 87 |
+
url=f"{settings.APP_URL}/api/v1/analytics/callback",
|
| 88 |
+
body={"org_id": org_id, "source_id": source_id}
|
| 89 |
+
)
|
| 90 |
+
logger.info(f"π€ QStash queued: {result.message_id}")
|
| 91 |
+
except Exception as e:
|
| 92 |
+
logger.warning(f"β οΈ QStash failed, using Redis: {e}")
|
| 93 |
+
# Fallback to Redis publish
|
| 94 |
+
redis.publish(...)
|
| 95 |
+
else:
|
| 96 |
+
# β
Fallback when QStash not configured
|
| 97 |
+
redis.publish(...)
|
| 98 |
+
logger.info("π‘ Redis fallback for analytics trigger")
|
| 99 |
# 4. π‘ Broadcast to connected dashboards
|
| 100 |
await sio.emit(
|
| 101 |
"datasource:new-rows",
|