Peter Mutwiri commited on
Commit
644db45
Β·
1 Parent(s): 81d1178

refactored analytics stream

Browse files
app/deps.py CHANGED
@@ -43,6 +43,8 @@ HF_API_TOKEN = get_secret("HF_API_TOKEN", required=False)
43
 
44
  # QStash Token (optional, for advanced queue features)
45
  QSTASH_TOKEN = get_secret("QSTASH_TOKEN", required=False)
 
 
46
 
47
  # ── Singleton Database Connections ──────────────────────────────────────────────
48
  _org_db_connections = {}
 
43
 
44
  # QStash Token (optional, for advanced queue features)
45
  QSTASH_TOKEN = get_secret("QSTASH_TOKEN", required=False)
46
+ # Application URL (where this HF Space is hosted)
47
+ APP_URL = get_secret("SPACE_HOST", required=True)
48
 
49
  # ── Singleton Database Connections ──────────────────────────────────────────────
50
  _org_db_connections = {}
app/main.py CHANGED
@@ -25,6 +25,7 @@ from app.tasks.analytics_worker import redis_listener, trigger_kpi_computation
25
  from app.service.vector_service import cleanup_expired_vectors
26
  from app.routers import health, datasources, reports, flags, scheduler, run, socket, analytics_stream,ai_query,schema
27
  from app.service.llm_service import load_llm_service
 
28
  # ─── Logger Configuration ───────────────────────────────────────────────────────
29
  logging.basicConfig(
30
  level=logging.INFO,
@@ -112,7 +113,12 @@ async def lifespan(app: FastAPI):
112
  logger.info("πŸ€– LLM service loading in background...")
113
  except Exception as e:
114
  logger.error(f"❌ LLM load failed: {e}")
115
- # Continue anyway - LLM is optional for some features
 
 
 
 
 
116
  yield
117
 
118
  # ─── Shutdown ──────────────────────────────────────────────────────────────
 
25
  from app.service.vector_service import cleanup_expired_vectors
26
  from app.routers import health, datasources, reports, flags, scheduler, run, socket, analytics_stream,ai_query,schema
27
  from app.service.llm_service import load_llm_service
28
+ from app.deps import get_qstash_client
29
  # ─── Logger Configuration ───────────────────────────────────────────────────────
30
  logging.basicConfig(
31
  level=logging.INFO,
 
113
  logger.info("πŸ€– LLM service loading in background...")
114
  except Exception as e:
115
  logger.error(f"❌ LLM load failed: {e}")
116
+ # Continue anyway - LLM is optional for some features
117
+ try:
118
+ get_qstash_client() # This creates the singleton if not exists
119
+ logger.info("βœ… QStash ready")
120
+ except RuntimeError as e:
121
+ logger.warning(f"⚠️ QStash disabled: {e}")
122
  yield
123
 
124
  # ─── Shutdown ──────────────────────────────────────────────────────────────
app/qstash_client.py CHANGED
@@ -1,69 +1,37 @@
1
  # app/qstash_client.py
2
- import os
3
  import logging
4
- from typing import Optional
 
5
 
6
  logger = logging.getLogger(__name__)
7
 
8
- # βœ… Module-level singleton (survives imports)
9
- _qstash_client: Optional["Client"] = None
10
-
11
- def init_qstash_client() -> bool:
12
  """
13
- EXPLICITLY initialize QStash client on startup.
14
- Call this AFTER HF secrets are loaded.
15
-
16
- Returns:
17
- bool: True if successful, False if QStash not configured
18
  """
19
- global _qstash_client
20
-
21
- if _qstash_client is not None:
22
- logger.info("βœ… QStash client already initialized")
23
- return True
24
-
25
- token = os.getenv("QSTASH_TOKEN")
26
- if not token:
27
- logger.warning("⚠️ QSTASH_TOKEN not found - QStash features disabled")
28
- return False
29
-
30
  try:
31
- from upstash_qstash import Client
32
-
33
- qstash_url = os.getenv("QSTASH_URL")
34
- if qstash_url:
35
- _qstash_client = Client(token=token, url=qstash_url)
36
- logger.info(f"βœ… QStash client initialized (custom URL: {qstash_url})")
37
- else:
38
- _qstash_client = Client(token=token)
39
- logger.info("βœ… QStash client initialized")
40
-
41
  return True
42
-
43
- except ImportError:
44
- logger.error("❌ upstash_qstash not installed. pip install upstash-qstash")
45
- return False
46
- except Exception as e:
47
- logger.error(f"❌ QStash initialization failed: {e}")
48
  return False
49
 
50
- def get_qstash_client() -> Optional["Client"]:
51
  """
52
- Get the singleton QStash client.
 
 
 
 
 
53
 
54
  Returns:
55
- Client instance or None if not initialized
56
 
57
  Raises:
58
- RuntimeError: If called before init_qstash_client()
59
  """
60
- if _qstash_client is None:
61
- raise RuntimeError(
62
- "QStash client not initialized. "
63
- "Call init_qstash_client() in startup sequence first."
64
- )
65
- return _qstash_client
66
-
67
- def is_qstash_available() -> bool:
68
- """Check if QStash is configured and ready"""
69
- return _qstash_client is not None
 
1
  # app/qstash_client.py
 
2
  import logging
3
+ from typing import Optional, Dict, Any
4
+ from app.deps import get_qstash_client # βœ… Import from existing logic
5
 
6
  logger = logging.getLogger(__name__)
7
 
8
+ def is_qstash_available() -> bool:
 
 
 
9
  """
10
+ Check if QStash is available without raising errors.
11
+ Uses the singleton from deps.py
 
 
 
12
  """
 
 
 
 
 
 
 
 
 
 
 
13
  try:
14
+ get_qstash_client()
 
 
 
 
 
 
 
 
 
15
  return True
16
+ except RuntimeError:
 
 
 
 
 
17
  return False
18
 
19
+ def publish_message(url: str, body: Dict[str, Any], callback: Optional[str] = None) -> Dict[str, Any]:
20
  """
21
+ Publish a message to QStash using the singleton client from deps.
22
+
23
+ Args:
24
+ url: Endpoint URL to call
25
+ body: JSON payload
26
+ callback: Optional callback URL
27
 
28
  Returns:
29
+ Dict with message_id
30
 
31
  Raises:
32
+ RuntimeError: If QStash not initialized
33
  """
34
+ client = get_qstash_client()
35
+ result = client.message.publish(url=url, body=body, callback=callback)
36
+
37
+ return {"message_id": result.message_id}
 
 
 
 
 
 
app/routers/analytics_stream.py CHANGED
@@ -4,7 +4,8 @@ from typing import List, Dict
4
  import json
5
  import asyncio
6
  from datetime import datetime
7
- from app.deps import get_current_user
 
8
  from app.redis_client import redis
9
  import uuid
10
  from app.qstash_client import publish_message, is_qstash_available
@@ -85,6 +86,9 @@ async def get_recent_analytics(
85
  "timestamp": datetime.utcnow().isoformat()
86
  }
87
 
 
 
 
88
  @router.post("/trigger")
89
  async def trigger_kpi_computation(
90
  source_id: str = Query(...),
@@ -99,21 +103,25 @@ async def trigger_kpi_computation(
99
  detail="QStash not configured. Check HF secrets."
100
  )
101
 
102
- # Check cache (your existing logic)
103
  cached = redis.get(f"kpi_cache:{org_id}:{source_id}")
104
  if cached:
105
  return {"status": "cached", "data": json.loads(cached)}
106
 
 
 
 
 
107
  # Publish to QStash
108
  try:
109
  result = publish_message(
110
- url=f"{settings.APP_URL}/api/v1/analytics/callback",
111
  body={
112
  "org_id": org_id,
113
  "source_id": source_id,
114
  "user_id": current_user["user_id"]
115
  },
116
- callback=f"{settings.APP_URL}/api/v1/analytics/notify"
117
  )
118
 
119
  return {
@@ -123,8 +131,8 @@ async def trigger_kpi_computation(
123
  }
124
 
125
  except Exception as e:
126
- raise HTTPException(status_code=500, detail=f"QStash error: {str(e)}")
127
-
128
  # app/routers/analytics_stream.py
129
  from fastapi import BackgroundTasks, Body, Depends # βœ… Add imports
130
 
 
4
  import json
5
  import asyncio
6
  from datetime import datetime
7
+ from app.deps import get_current_user,APP_URL
8
+
9
  from app.redis_client import redis
10
  import uuid
11
  from app.qstash_client import publish_message, is_qstash_available
 
86
  "timestamp": datetime.utcnow().isoformat()
87
  }
88
 
89
+ # Add import at top
90
+ from app.deps import APP_URL
91
+
92
  @router.post("/trigger")
93
  async def trigger_kpi_computation(
94
  source_id: str = Query(...),
 
103
  detail="QStash not configured. Check HF secrets."
104
  )
105
 
106
+ # Check cache
107
  cached = redis.get(f"kpi_cache:{org_id}:{source_id}")
108
  if cached:
109
  return {"status": "cached", "data": json.loads(cached)}
110
 
111
+ # βœ… Use APP_URL from deps
112
+ callback_url = f"{APP_URL}/api/v1/analytics/callback"
113
+ notify_url = f"{APP_URL}/api/v1/analytics/notify"
114
+
115
  # Publish to QStash
116
  try:
117
  result = publish_message(
118
+ url=callback_url,
119
  body={
120
  "org_id": org_id,
121
  "source_id": source_id,
122
  "user_id": current_user["user_id"]
123
  },
124
+ callback=notify_url
125
  )
126
 
127
  return {
 
131
  }
132
 
133
  except Exception as e:
134
+ logger.error(f"QStash publish failed: {e}")
135
+ raise HTTPException(status_code=500, detail="Failed to queue analytics")
136
  # app/routers/analytics_stream.py
137
  from fastapi import BackgroundTasks, Body, Depends # βœ… Add imports
138