shaliz-kong committed on
Commit
c43b642
·
1 Parent(s): 3b5e1d5

fixed Redis hammering

Browse files
Files changed (2) hide show
  1. app/core/worker_manager.py +111 -100
  2. app/main.py +1 -1
app/core/worker_manager.py CHANGED
@@ -1,129 +1,140 @@
1
- # app/core/worker_manager.py
2
 
3
  import asyncio
4
  import json
5
- from typing import Dict
6
- from app.tasks.analytics_worker import AnalyticsWorker
7
- from app.redis_client import redis
8
  import logging
9
 
 
 
 
10
  logger = logging.getLogger(__name__)
11
 
12
 
13
  class WorkerManager:
14
  def __init__(self):
15
- self.active_workers: Dict[str, AnalyticsWorker] = {}
16
- self.lock_ttl = 300
17
- self.stream_key = "stream:analytics_triggers"
18
- self.last_id = "0"
19
-
20
- # Adaptive polling intervals
21
- self.poll_interval = 1.0 # Active: 1s when messages are flowing
22
- self.idle_poll_interval = 60 # Idle: 60s when no work
23
- self.idle_threshold = 5 # 5 empty polls = idle mode
24
 
25
- # Track idle state
26
- self.consecutive_empty_polls = 0
 
 
 
27
 
28
  async def start_listener(self):
29
- """🎧 Stream-based listener: block on Redis stream reads to avoid polling
30
-
31
- NOTE: The original adaptive polling loop is left commented below for
32
- reference. This implementation uses blocking XREAD (if supported) so
33
- workers are only spawned when a readiness message is available.
34
  """
35
- logger.info(f"[manager] 🎧 stream listener (blocking read) on {self.stream_key}")
36
-
37
- # --- Original polling loop (commented) ---
38
- # (kept for reference during refactor)
39
- # logger.info(
40
- # f"[manager] 🎧 listening (active: {self.poll_interval}s, idle: {self.idle_poll_interval}s)"
41
- # )
42
- # while True:
43
- # ...
44
-
45
- # New: blocking xread loop
46
- while True:
47
- try:
48
- try:
49
- # Block for up to 5s waiting for new messages; fall back
50
- # to non-blocking if 'block' not supported by client
51
- result = redis.xread({self.stream_key: self.last_id}, count=10, block=5000)
52
- except TypeError:
53
- # Client may not accept block kwarg; try without block
54
- result = redis.xread({self.stream_key: self.last_id}, count=10)
55
-
56
- has_messages = bool(result and isinstance(result, dict) and result.get(self.stream_key))
57
-
58
- if not has_messages:
59
- # No messages within block window; continue (loop will block again)
60
- continue
61
-
62
- # We have messages β€” process them
63
- self.consecutive_empty_polls = 0
64
- messages = result[self.stream_key]
65
- logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
66
-
67
- for msg_id, msg_data in messages:
68
  try:
69
- payload = json.loads(msg_data.get("message", "{}"))
70
- org_id = payload.get("org_id")
71
- source_id = payload.get("source_id")
72
-
73
- if org_id and source_id:
74
- logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
75
- await self.spawn_worker(org_id, source_id)
76
- self.last_id = msg_id
77
- else:
78
- logger.warning(f"[manager] ⚠️ missing IDs: {payload}")
79
-
80
- except json.JSONDecodeError as e:
81
- logger.error(f"[manager] ❌ JSON error: {e}")
82
  except Exception as e:
83
- logger.error(f"[manager] ❌ message processing error: {e}", exc_info=True)
84
-
85
- except Exception as e:
86
- logger.error(f"[manager] ❌ streaming error: {e}", exc_info=True)
87
- await asyncio.sleep(2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
89
- async def spawn_worker(self, org_id: str, source_id: str):
90
- """Spawn worker with distributed lock"""
91
- worker_key = f"{org_id}:{source_id}"
92
- lock_key = f"worker_lock:{worker_key}"
93
-
94
  try:
95
- # Check if worker is already running
96
- if redis.exists(lock_key):
97
- logger.debug(f"[manager] ⏭️ worker locked: {worker_key}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
  return
99
 
100
- # Set lock
101
- redis.setex(lock_key, self.lock_ttl, "1")
102
- logger.info(f"[manager] πŸ”’ lock acquired: {lock_key}")
103
 
104
- # Create and track worker
105
- worker = AnalyticsWorker(org_id, source_id)
106
- self.active_workers[worker_key] = worker
 
107
 
108
- # Run in background
109
- asyncio.create_task(self._run_worker(worker, worker_key, lock_key))
 
 
 
 
 
110
 
111
  except Exception as e:
112
- logger.error(f"[manager] ❌ spawn failed: {e}", exc_info=True)
113
- redis.delete(lock_key)
114
 
115
- async def _run_worker(self, worker: AnalyticsWorker, worker_key: str, lock_key: str):
116
- """Run worker and cleanup"""
117
  try:
118
- results = await worker.run()
119
- logger.info(f"[manager] βœ… worker complete: {worker_key}")
 
120
  except Exception as e:
121
- logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
122
  finally:
123
- # Cleanup worker and lock
124
- self.active_workers.pop(worker_key, None)
125
- redis.delete(lock_key)
126
- logger.debug(f"[manager] 🧹 cleaned up: {worker_key}")
127
-
128
-
 
 
 
 
 
 
 
 
129
  worker_manager = WorkerManager()
 
1
# app/core/worker_manager.py – TRUE ASYNC VERSION

import asyncio
import json
import logging
import time
from typing import Dict, Optional

from app.core.event_hub import event_hub
from app.tasks.analytics_worker import trigger_kpi_computation

logger = logging.getLogger(__name__)


class WorkerManager:
    """Dispatches KPI computations from Redis triggers.

    Primary path is pubsub (push — near-zero Redis ops when idle); a
    throttled stream read is the fallback safety net for triggers that were
    published while this process was not subscribed.
    """

    def __init__(self):
        # worker_id ("org:source") -> asyncio.Task running that computation
        self.active_workers: Dict[str, asyncio.Task] = {}
        self._shutdown = False  # flip to True to stop start_listener's loop

        # Smart polling intervals
        self.stream_check_interval = 5.0  # Check stream every 5s max
        self.cleanup_interval = 10.0      # Cleanup every 10s
        self.last_stream_check = 0
        self.last_cleanup = 0

    async def start_listener(self):
        """
        🎧 TRUE async listener: pubsub for triggers + throttled stream checks
        Redis ops: ~0.2/sec idle, ~1-2/sec under load
        """
        logger.info("🎧 Worker Manager: Einstein+Elon mode ENGAGED (true async)")

        # Use pubsub for immediate triggers (ZERO polling)
        pubsub = event_hub.redis.pubsub()
        await pubsub.subscribe("analytics_trigger")

        try:
            while not self._shutdown:
                # 1. Non-blocking pubsub check (true push - zero Redis ops when idle)
                message = await pubsub.get_message(timeout=0.1)

                if message and message["type"] == "message":
                    try:
                        payload = json.loads(message["data"])
                        await self._handle_trigger(payload)
                    except Exception as e:
                        logger.error(f"[MANAGER] ❌ Trigger parse error: {e}")

                # 2. Throttled stream check (fallback safety net)
                now = time.time()
                if now - self.last_stream_check > self.stream_check_interval:
                    await self._check_stream()
                    self.last_stream_check = now

                # 3. Throttled cleanup
                if now - self.last_cleanup > self.cleanup_interval:
                    self._cleanup_completed_workers()
                    self.last_cleanup = now

                # 4. Yield control (prevent CPU spinning)
                await asyncio.sleep(0.01)

        except asyncio.CancelledError:
            logger.info("[MANAGER] 🛑 Listener cancelled")
        finally:
            await pubsub.close()
            logger.info("[MANAGER] 🔌 Pubsub closed")

    async def _check_stream(self):
        """Lightweight stream check - only reads new messages.

        BUG FIX: on this client the stream commands are coroutines (the
        pubsub calls above are awaited, so ``event_hub.redis`` is async), but
        ``xrevrange``/``xdel`` were previously called without ``await``. The
        un-awaited coroutine is truthy, iterating it raised, and the error
        was swallowed at debug level — so this fallback silently never ran
        and processed entries were never deleted. Both calls are now awaited.
        """
        try:
            # Non-blocking read of recent messages only
            result = await event_hub.redis.xrevrange(
                "stream:analytics_triggers",
                count=5  # Only check last 5
            )

            if result:
                logger.debug(f"[MANAGER] 📥 Stream check found {len(result)} messages")
                # XREVRANGE yields newest-first; reverse so triggers are
                # handled in arrival order.
                for msg_id, data in reversed(result):
                    payload = json.loads(data.get("message", "{}"))
                    await self._handle_trigger(payload)
                    # Acknowledge processed message
                    await event_hub.redis.xdel("stream:analytics_triggers", msg_id)

        except Exception as e:
            logger.debug(f"[MANAGER] Stream check failed: {e}")

    async def _handle_trigger(self, data: dict):
        """Process a trigger dict with rate limiting.

        Expects ``org_id`` and ``source_id`` keys; skips the trigger when a
        task for that pair is already live.
        """
        try:
            org_id = data.get("org_id")
            source_id = data.get("source_id")

            if not org_id or not source_id:
                logger.warning(f"[MANAGER] ⚠️ Invalid trigger: {data}")
                return

            worker_id = f"{org_id}:{source_id}"

            # Skip if already running
            if worker_id in self.active_workers and not self.active_workers[worker_id].done():
                logger.debug(f"[MANAGER] ⏭️ Worker already active: {worker_id}")
                return

            # Spawn worker
            logger.info(f"[MANAGER] 🚀 Spawning worker: {worker_id}")
            task = asyncio.create_task(
                self._run_worker(worker_id, org_id, source_id),
                name=f"worker-{worker_id}"
            )
            self.active_workers[worker_id] = task

        except Exception as e:
            logger.error(f"[MANAGER] ❌ Trigger handling failed: {e}", exc_info=True)

    async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
        """Run the actual KPI computation and deregister the task afterwards."""
        try:
            # Use the existing trigger function (handles locks, caching, etc)
            await trigger_kpi_computation(org_id, source_id)
            logger.info(f"[MANAGER] ✅ Worker complete: {worker_id}")
        except Exception as e:
            logger.error(f"[MANAGER] ❌ Worker failed: {worker_id} - {e}", exc_info=True)
        finally:
            self.active_workers.pop(worker_id, None)

    def _cleanup_completed_workers(self):
        """Remove completed tasks from the registry."""
        done_workers = [
            wid for wid, task in self.active_workers.items()
            if task.done()
        ]
        for wid in done_workers:
            self.active_workers.pop(wid, None)
            logger.debug(f"[MANAGER] 🧹 Cleaned up: {wid}")


# Global instance
worker_manager = WorkerManager()
app/main.py CHANGED
@@ -14,7 +14,7 @@ import pathlib
14
  import json
15
 
16
  # # ─── Third-Party ──────────────────────────────────────────────────────────────
17
- # from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
18
  from fastapi.middleware.cors import CORSMiddleware
19
  from fastapi.responses import JSONResponse
20
  from contextlib import asynccontextmanager
 
14
  import json
15
 
16
  # # ─── Third-Party ──────────────────────────────────────────────────────────────
17
+ from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
18
  from fastapi.middleware.cors import CORSMiddleware
19
  from fastapi.responses import JSONResponse
20
  from contextlib import asynccontextmanager