Peter Mutwiri committed on
Commit
b143861
·
1 Parent(s): e52a29f

added logging on datasources and removed KPI computation trigger

Browse files
Files changed (2) hide show
  1. .vscode/settings.json +12 -2
  2. app/routers/datasources.py +3 -33
.vscode/settings.json CHANGED
@@ -1,4 +1,14 @@
1
  {
2
  "python-envs.defaultEnvManager": "ms-python.python:system",
3
- "python-envs.pythonProjects": []
4
- }
 
 
 
 
 
 
 
 
 
 
 
1
  {
2
  "python-envs.defaultEnvManager": "ms-python.python:system",
3
+ "python-envs.pythonProjects": [],
4
+
5
+ "python.linting.enabled": true,
6
+ "python.linting.ruffEnabled": true,
7
+ "[python]": {
8
+ "editor.codeActionsOnSave": {
9
+ "source.fixAll.ruff": true
10
+ },
11
+ "editor.defaultFormatter": "charliermarsh.ruff"
12
+ }
13
+ }
14
+
app/routers/datasources.py CHANGED
@@ -3,7 +3,7 @@ from typing import Dict, Any, List, Union
3
  from fastapi.responses import JSONResponse
4
  from pydantic import BaseModel
5
  from typing import List, Any, Dict, Union
6
- from app.deps import verify_api_key,get_current_user,APP_URL
7
  from app.db import get_conn, ensure_raw_table, bootstrap
8
  from app.mapper import canonify_df
9
  from app.routers.socket import sio
@@ -12,9 +12,7 @@ import json
12
  import time
13
  from datetime import datetime, timedelta
14
  from app.redis_client import redis
15
- # Add this import
16
- from app.qstash_client import get_qstash_client, is_qstash_available
17
- # import logging
18
  logger = logging.getLogger(__name__)
19
 
20
  router = APIRouter(tags=["datasources"])
@@ -83,35 +81,7 @@ async def create_source_json(
83
 
84
  preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
85
 
86
- try:
87
- # Use APP_URL from deps (no need for settings)
88
- trigger_data = {
89
- "type": "kpi_compute",
90
- "org_id": org_id,
91
- "source_id": source_id
92
- }
93
-
94
- # Try QStash if available
95
- if is_qstash_available():
96
- try:
97
- from app.qstash_client import get_qstash_client
98
- client = get_qstash_client()
99
- result = client.message.publish(
100
- url=f"{APP_URL}/api/v1/analytics/callback",
101
- body=trigger_data
102
- )
103
- logger.info(f"📤 QStash queued: {result.message_id}")
104
- except Exception as e:
105
- logger.warning(f"⚠️ QStash failed, using Redis: {e}")
106
- redis.publish(f"analytics_trigger:{org_id}:{source_id}", json.dumps(trigger_data))
107
- else:
108
- # ✅ Fallback: Direct Redis publish
109
- redis.publish(f"analytics_trigger:{org_id}:{source_id}", json.dumps(trigger_data))
110
- logger.info("📡 Redis fallback for analytics trigger")
111
-
112
- except Exception as e:
113
- logger.warning(f"⚠️ Analytics trigger failed (non-critical): {e}")
114
- # ✅ DON'T raise - ingestion should succeed even if analytics fails
115
  # 4. 📡 Broadcast to connected dashboards
116
  await sio.emit(
117
  "datasource:new-rows",
 
3
  from fastapi.responses import JSONResponse
4
  from pydantic import BaseModel
5
  from typing import List, Any, Dict, Union
6
+ from app.deps import verify_api_key,get_current_user
7
  from app.db import get_conn, ensure_raw_table, bootstrap
8
  from app.mapper import canonify_df
9
  from app.routers.socket import sio
 
12
  import time
13
  from datetime import datetime, timedelta
14
  from app.redis_client import redis
15
+ import logging
 
 
16
  logger = logging.getLogger(__name__)
17
 
18
  router = APIRouter(tags=["datasources"])
 
81
 
82
  preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
83
 
84
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  # 4. 📡 Broadcast to connected dashboards
86
  await sio.emit(
87
  "datasource:new-rows",