shaliz-kong committed on
Commit
e29d898
Β·
1 Parent(s): 9a1b67d

corrected worker manager

Browse files
Files changed (1) hide show
  1. app/core/worker_manager.py +145 -21
app/core/worker_manager.py CHANGED
@@ -1,31 +1,44 @@
1
- # app/core/worker_manager.py – UPSTASH-FREE-TIER-COMPATIBLE
2
- import os
3
  import asyncio
4
  import json
 
5
  import time
6
- from typing import Dict
7
  import logging
8
-
9
  from app.core.event_hub import event_hub
10
- from app.tasks.analytics_worker import trigger_kpi_computation
11
 
12
  logger = logging.getLogger(__name__)
13
 
14
 
 
 
 
 
 
 
 
15
  class WorkerManager:
 
 
 
 
 
16
  def __init__(self):
17
  self.active_workers: Dict[str, asyncio.Task] = {}
18
  self._shutdown = False
19
 
20
  # ⚑ ADAPTIVE POLLING (configurable via env vars)
21
- self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0")) # 1s when busy
22
- self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0")) # 30s when idle
23
  self.consecutive_empty = 0
24
 
25
  async def start_listener(self):
26
  """
27
  🎧 UPSTASH-SAFE: No pubsub, no blocking xread, just smart async polling
28
- Redis ops: ~0.03/sec idle, ~2/sec under load (well within free tier)
29
  """
30
  logger.info(
31
  f"🎧 Worker Manager: Einstein+Elon mode ENGAGED "
@@ -47,7 +60,7 @@ class WorkerManager:
47
 
48
  # Log state changes
49
  if self.consecutive_empty == 5:
50
- logger.info(f"[MANAGER] πŸ›Œ Idle mode activated (poll: {interval}s)")
51
 
52
  await asyncio.sleep(interval)
53
 
@@ -56,25 +69,43 @@ class WorkerManager:
56
  break
57
  except Exception as e:
58
  logger.error(f"[MANAGER] ❌ Error: {e}", exc_info=True)
59
- await asyncio.sleep(5) # Back off on errors
60
 
61
- async def _fetch_pending_triggers(self) -> list:
62
  """
63
  Fetch pending triggers in a SINGLE Redis call
64
  Uses xrevrange to get newest messages without blocking
 
65
  """
66
  try:
67
- # Get last 10 messages from stream (non-blocking, minimal ops)
68
  result = event_hub.redis.xrevrange(
69
  "stream:analytics_triggers",
70
  count=10
71
  )
72
 
73
- # If using Upstash HTTP client, result might be dict
 
74
  if isinstance(result, dict):
75
- messages = list(result.items()) if result else []
76
- else:
77
- messages = result or []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
 
79
  return messages
80
 
@@ -82,13 +113,30 @@ class WorkerManager:
82
  logger.debug(f"[MANAGER] Fetch failed: {e}")
83
  return []
84
 
85
- async def _process_batch(self, messages: list):
86
  """Process multiple triggers efficiently"""
87
  logger.info(f"[MANAGER] πŸ“₯ Processing {len(messages)} triggers")
88
 
89
  for msg_id, msg_data in messages:
90
  try:
91
- payload = json.loads(msg_data.get("message", "{}"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  await self._handle_trigger(payload)
93
 
94
  # Acknowledge: delete processed message
@@ -124,7 +172,8 @@ class WorkerManager:
124
  async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
125
  """Execute KPI computation with automatic cleanup"""
126
  try:
127
- await trigger_kpi_computation(org_id, source_id)
 
128
  logger.info(f"[MANAGER] βœ… Complete: {worker_id}")
129
  except Exception as e:
130
  logger.error(f"[MANAGER] ❌ Failed: {worker_id} - {e}", exc_info=True)
@@ -146,5 +195,80 @@ class WorkerManager:
146
  logger.info("[MANAGER] πŸ›‘ Shutdown initiated")
147
 
148
 
149
- # Global instance
150
- worker_manager = WorkerManager()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/core/worker_manager.py – UPSTASH-COMPATIBLE v4.1
2
+
3
  import asyncio
4
  import json
5
+ import os
6
  import time
7
+ from typing import Dict, List, Optional, Any
8
  import logging
9
+ import datetime
10
  from app.core.event_hub import event_hub
11
+ from app.tasks.analytics_worker import AnalyticsWorker
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
 
16
+ def _safe_redis_decode(value: Any) -> str:
17
+ """Safely decode Redis values that might be bytes or str"""
18
+ if isinstance(value, bytes):
19
+ return value.decode('utf-8')
20
+ return str(value)
21
+
22
+
23
  class WorkerManager:
24
+ """
25
+ πŸŽ›οΈ Manages worker lifecycle and prevents Redis hammering
26
+ Uses ONLY Upstash-safe HTTP commands: GET, SET, EXISTS, DEL, XREVRANGE
27
+ """
28
+
29
    def __init__(self):
        # worker_id -> running asyncio task; used to dedupe spawned workers.
        self.active_workers: Dict[str, asyncio.Task] = {}
        # Set True by shutdown to stop the polling loop.
        self._shutdown = False

        # Adaptive polling intervals (configurable via env vars):
        # fast polling while triggers keep arriving, slow when idle.
        self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0"))
        self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0"))
        # Consecutive empty polls; drives the active -> idle backoff.
        self.consecutive_empty = 0
37
 
38
  async def start_listener(self):
39
  """
40
  🎧 UPSTASH-SAFE: No pubsub, no blocking xread, just smart async polling
41
+ Redis ops: ~0.03/sec idle, ~2/sec under load
42
  """
43
  logger.info(
44
  f"🎧 Worker Manager: Einstein+Elon mode ENGAGED "
 
60
 
61
  # Log state changes
62
  if self.consecutive_empty == 5:
63
+ logger.info(f"[MANAGER] πŸ›Œ Idle mode activated (poll: {interval:.1f}s)")
64
 
65
  await asyncio.sleep(interval)
66
 
 
69
  break
70
  except Exception as e:
71
  logger.error(f"[MANAGER] ❌ Error: {e}", exc_info=True)
72
+ await asyncio.sleep(5)
73
 
74
+ async def _fetch_pending_triggers(self) -> List[tuple]:
75
  """
76
  Fetch pending triggers in a SINGLE Redis call
77
  Uses xrevrange to get newest messages without blocking
78
+ Returns: [(msg_id, {field: value}), ...]
79
  """
80
  try:
81
+ # Get last 10 messages from stream (non-blocking)
82
  result = event_hub.redis.xrevrange(
83
  "stream:analytics_triggers",
84
  count=10
85
  )
86
 
87
+ # Handle different response formats from Upstash
88
+ messages = []
89
  if isinstance(result, dict):
90
+ # Format: {msg_id: {field: value}, ...}
91
+ for msg_id, data in result.items():
92
+ messages.append((msg_id, data))
93
+ elif isinstance(result, list):
94
+ # Format: [(msg_id, [field, value, field, value]), ...]
95
+ for item in result:
96
+ if isinstance(item, (list, tuple)) and len(item) == 2:
97
+ msg_id, data = item
98
+ # Convert flat list to dict if needed
99
+ if isinstance(data, list):
100
+ data_dict = {}
101
+ for i in range(0, len(data), 2):
102
+ if i + 1 < len(data):
103
+ key = _safe_redis_decode(data[i])
104
+ value = _safe_redis_decode(data[i + 1])
105
+ data_dict[key] = value
106
+ messages.append((msg_id, data_dict))
107
+ else:
108
+ messages.append((msg_id, data))
109
 
110
  return messages
111
 
 
113
  logger.debug(f"[MANAGER] Fetch failed: {e}")
114
  return []
115
 
116
+ async def _process_batch(self, messages: List[tuple]):
117
  """Process multiple triggers efficiently"""
118
  logger.info(f"[MANAGER] πŸ“₯ Processing {len(messages)} triggers")
119
 
120
  for msg_id, msg_data in messages:
121
  try:
122
+ # Handle different data formats
123
+ if isinstance(msg_data, dict):
124
+ # Already a dict
125
+ message_str = msg_data.get("message", "{}")
126
+ elif isinstance(msg_data, list):
127
+ # Flat list: [field, value, field, value]
128
+ message_str = "{}"
129
+ for i in range(0, len(msg_data), 2):
130
+ if i + 1 < len(msg_data):
131
+ key = _safe_redis_decode(msg_data[i])
132
+ if key == "message":
133
+ message_str = _safe_redis_decode(msg_data[i + 1])
134
+ break
135
+ else:
136
+ logger.warning(f"[MANAGER] Unknown msg_data format: {type(msg_data)}")
137
+ continue
138
+
139
+ payload = json.loads(message_str)
140
  await self._handle_trigger(payload)
141
 
142
  # Acknowledge: delete processed message
 
172
  async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
173
  """Execute KPI computation with automatic cleanup"""
174
  try:
175
+ worker = AnalyticsWorker(org_id, source_id)
176
+ await worker.run()
177
  logger.info(f"[MANAGER] βœ… Complete: {worker_id}")
178
  except Exception as e:
179
  logger.error(f"[MANAGER] ❌ Failed: {worker_id} - {e}", exc_info=True)
 
195
  logger.info("[MANAGER] πŸ›‘ Shutdown initiated")
196
 
197
 
198
# ==================== FASTAPI INTEGRATION ====================

# Lazily-created singleton shared across the FastAPI app.
_worker_manager: Optional[WorkerManager] = None


async def get_worker_manager() -> WorkerManager:
    """Return the process-wide ``WorkerManager``.

    The instance is created lazily on first call; every subsequent
    call returns the same object.
    """
    global _worker_manager
    _worker_manager = _worker_manager or WorkerManager()
    return _worker_manager
210
+
211
+
212
async def trigger_kpi_computation(org_id: str, source_id: str):
    """
    🎯 FastAPI endpoint handler - triggers worker via Redis stream.

    Idempotent from the caller's perspective: the manager dedupes
    workers per (org_id, source_id), so repeated calls won't spawn
    duplicate workers.

    Args:
        org_id: Organization identifier.
        source_id: Data-source identifier.

    Returns:
        ``{"status": "triggered", ...}`` on success, or
        ``{"status": "error", "message": ...}`` on failure — never raises.
    """
    try:
        # Write to stream (HTTP-safe, single XADD call)
        event_hub.redis.xadd(
            "stream:analytics_triggers",
            {
                "message": json.dumps({
                    "org_id": org_id,
                    "source_id": source_id,
                    "type": "kpi_compute",
                    # BUG FIX: the file does `import datetime` (the module),
                    # so `datetime.now()` raised AttributeError on every call.
                    # Use the class via the module, TZ-aware in UTC.
                    "timestamp": datetime.datetime.now(
                        datetime.timezone.utc
                    ).isoformat()
                })
            }
        )
        logger.info(f"🎯 Triggered KPI computation: {org_id}/{source_id}")
        return {"status": "triggered", "org_id": org_id, "source_id": source_id}

    except Exception as e:
        logger.error(f"Trigger failed: {e}", exc_info=True)
        return {"status": "error", "message": str(e)}
236
+
237
+
238
+ # ==================== BACKGROUND REFRESH (Optional) ====================
239
+
240
async def continuous_kpi_refresh(manager: WorkerManager):
    """
    🎛️ Gentle background refresh - runs every 5 minutes.

    Scans entity keys and re-triggers KPI computation only for stale
    entries (no active worker and no cache entry). Best-effort: errors
    are logged and the loop continues on the next cycle.

    Args:
        manager: Shared WorkerManager, used to skip already-running workers
            and to honor shutdown.
    """
    await asyncio.sleep(10)  # Let app startup complete

    # Honor manager shutdown instead of spinning forever.
    while not manager._shutdown:
        try:
            # Get all entity keys (HTTP-safe).
            # NOTE(review): KEYS is O(N) over the keyspace — consider SCAN
            # if the number of entities grows.
            entity_keys = event_hub.redis.keys("entity:*:*")

            for key in entity_keys[:10]:  # Max 10 per cycle
                key_str = key.decode() if isinstance(key, bytes) else key

                # BUG FIX: a key with extra/missing ":" segments made the
                # bare 3-way unpack raise ValueError and abort the whole
                # cycle. Split with maxsplit and skip malformed keys.
                parts = key_str.split(":", 2)
                if len(parts) != 3:
                    logger.debug(f"[AUTO] Skipping malformed key: {key_str}")
                    continue
                _, org_id, source_id = parts

                worker_id = f"{org_id}:{source_id}"

                # Skip if worker already running
                if worker_id in manager.active_workers:
                    continue

                # Skip if a cache entry exists. Freshness relies on the
                # cache key's TTL (presumably ~5 min — confirm where the
                # kpi_cache key is written).
                cache_key = f"kpi_cache:{org_id}:{source_id}"
                if event_hub.redis.exists(cache_key):
                    continue

                # Trigger refresh, with a 1s gap between triggers.
                await trigger_kpi_computation(org_id, source_id)
                await asyncio.sleep(1)

        except Exception as e:
            logger.error(f"[AUTO] Error: {e}", exc_info=True)

        await asyncio.sleep(300)  # ⭐ Sleep 5 minutes