Peter Mutwiri committed on
Commit
ce0938c
·
1 Parent(s): 9d87f4c

refactor worker manager

Browse files
Files changed (3) hide show
  1. app/core/event_hub.py +18 -18
  2. app/core/worker_manager.py +54 -20
  3. app/main.py +39 -44
app/core/event_hub.py CHANGED
@@ -73,25 +73,25 @@ class EventHub:
73
 
74
  # app/core/event_hub.py
75
 
76
- def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
77
- """Write trigger to single stream (worker manager polls this)"""
78
- stream_key = "stream:analytics_triggers"
79
 
80
- payload = {
81
- "org_id": org_id,
82
- "source_id": source_id,
83
- "timestamp": datetime.utcnow().isoformat(),
84
- }
85
- if extra:
86
- payload.update(extra)
87
-
88
- try:
89
- msg_id = self.redis.xadd(stream_key, {"message": json.dumps(payload)})
90
- logger.info(f"[hub] πŸ“€ emitted trigger to {stream_key} (id: {msg_id})")
91
- return msg_id
92
- except Exception as e:
93
- logger.error(f"[hub] ❌ failed to emit trigger: {e}", exc_info=True)
94
- return None
95
 
96
  def ensure_consumer_group(self, stream_key: str, group: str):
97
  try:
 
73
 
74
  # app/core/event_hub.py
75
 
76
+ def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
77
+ """Write trigger to single stream (worker manager polls this)"""
78
+ stream_key = "stream:analytics_triggers"
79
 
80
+ payload = {
81
+ "org_id": org_id,
82
+ "source_id": source_id,
83
+ "timestamp": datetime.utcnow().isoformat(),
84
+ }
85
+ if extra:
86
+ payload.update(extra)
87
+
88
+ try:
89
+ msg_id = self.redis.xadd(stream_key, {"message": json.dumps(payload)})
90
+ logger.info(f"[hub] πŸ“€ emitted trigger to {stream_key} (id: {msg_id})")
91
+ return msg_id
92
+ except Exception as e:
93
+ logger.error(f"[hub] ❌ failed to emit trigger: {e}", exc_info=True)
94
+ return None
95
 
96
  def ensure_consumer_group(self, stream_key: str, group: str):
97
  try:
app/core/worker_manager.py CHANGED
@@ -4,31 +4,46 @@ import asyncio
4
  import json
5
  from typing import Dict
6
  from app.tasks.analytics_worker import AnalyticsWorker
7
- from app.redis_client import redis # Use your existing Upstash client
8
  import logging
9
 
10
  logger = logging.getLogger(__name__)
11
 
 
12
  class WorkerManager:
13
  def __init__(self):
14
  self.active_workers: Dict[str, AnalyticsWorker] = {}
15
  self.lock_ttl = 300
16
  self.stream_key = "stream:analytics_triggers"
17
  self.last_id = "0"
18
- self.poll_interval = 1.0 # Seconds between polls
 
 
 
 
 
 
 
19
 
20
  async def start_listener(self):
21
- """🎧 Poll Upstash HTTP API for trigger events"""
22
- logger.info(f"[manager] 🎧 listening via HTTP polling (interval: {self.poll_interval}s)")
 
 
23
 
24
  while True:
25
  try:
26
- # CRITICAL: Remove 'block' parameter - not supported by Upstash
27
  result = redis.xread({self.stream_key: self.last_id}, count=10)
28
 
29
- if result:
30
- # Parse messages
31
- messages = result.get(self.stream_key, [])
 
 
 
 
 
32
  logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
33
 
34
  for msg_id, msg_data in messages:
@@ -40,22 +55,40 @@ class WorkerManager:
40
  if org_id and source_id:
41
  logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
42
  await self.spawn_worker(org_id, source_id)
43
- self.last_id = msg_id # Update resume point
44
  else:
45
- logger.warning(f"[manager] ⚠️ missing IDs in payload: {payload}")
 
46
  except json.JSONDecodeError as e:
47
  logger.error(f"[manager] ❌ JSON error: {e}")
 
 
48
 
49
- # Small delay after processing batch
50
  await asyncio.sleep(0.1)
 
 
 
 
 
 
 
 
 
 
 
51
  else:
52
- # CRITICAL: Sleep when no messages
53
- logger.debug("[manager] πŸ’€ no messages, sleeping")
54
- await asyncio.sleep(self.poll_interval)
55
-
 
 
 
56
  except Exception as e:
57
  logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
58
  await asyncio.sleep(5) # Back off on error
 
59
 
60
  async def spawn_worker(self, org_id: str, source_id: str):
61
  """Spawn worker with distributed lock"""
@@ -63,7 +96,7 @@ class WorkerManager:
63
  lock_key = f"worker_lock:{worker_key}"
64
 
65
  try:
66
- # Check existing lock
67
  if redis.exists(lock_key):
68
  logger.debug(f"[manager] ⏭️ worker locked: {worker_key}")
69
  return
@@ -72,7 +105,7 @@ class WorkerManager:
72
  redis.setex(lock_key, self.lock_ttl, "1")
73
  logger.info(f"[manager] πŸ”’ lock acquired: {lock_key}")
74
 
75
- # Create worker
76
  worker = AnalyticsWorker(org_id, source_id)
77
  self.active_workers[worker_key] = worker
78
 
@@ -84,16 +117,17 @@ class WorkerManager:
84
  redis.delete(lock_key)
85
 
86
  async def _run_worker(self, worker: AnalyticsWorker, worker_key: str, lock_key: str):
87
- """Execute worker and cleanup"""
88
  try:
89
  results = await worker.run()
90
  logger.info(f"[manager] βœ… worker complete: {worker_key}")
91
  except Exception as e:
92
  logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
93
  finally:
94
- # Cleanup
95
  self.active_workers.pop(worker_key, None)
96
  redis.delete(lock_key)
97
- logger.info(f"[manager] 🧹 cleaned up: {worker_key}")
 
98
 
99
  worker_manager = WorkerManager()
 
4
  import json
5
  from typing import Dict
6
  from app.tasks.analytics_worker import AnalyticsWorker
7
+ from app.redis_client import redis
8
  import logging
9
 
10
  logger = logging.getLogger(__name__)
11
 
12
+
13
  class WorkerManager:
14
  def __init__(self):
15
  self.active_workers: Dict[str, AnalyticsWorker] = {}
16
  self.lock_ttl = 300
17
  self.stream_key = "stream:analytics_triggers"
18
  self.last_id = "0"
19
+
20
+ # Adaptive polling intervals
21
+ self.poll_interval = 1.0 # Active: 1s when messages are flowing
22
+ self.idle_poll_interval = 60 # Idle: 60s when no work
23
+ self.idle_threshold = 5 # 5 empty polls = idle mode
24
+
25
+ # Track idle state
26
+ self.consecutive_empty_polls = 0
27
 
28
  async def start_listener(self):
29
+ """🎧 Adaptive polling: fast when busy, slow when idle"""
30
+ logger.info(
31
+ f"[manager] 🎧 listening (active: {self.poll_interval}s, idle: {self.idle_poll_interval}s)"
32
+ )
33
 
34
  while True:
35
  try:
36
+ # Poll Redis stream (non-blocking)
37
  result = redis.xread({self.stream_key: self.last_id}, count=10)
38
 
39
+ # Check if we got messages
40
+ has_messages = bool(result and isinstance(result, dict) and result.get(self.stream_key))
41
+
42
+ if has_messages:
43
+ # Reset idle counter
44
+ self.consecutive_empty_polls = 0
45
+
46
+ messages = result[self.stream_key]
47
  logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
48
 
49
  for msg_id, msg_data in messages:
 
55
  if org_id and source_id:
56
  logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
57
  await self.spawn_worker(org_id, source_id)
58
+ self.last_id = msg_id
59
  else:
60
+ logger.warning(f"[manager] ⚠️ missing IDs: {payload}")
61
+
62
  except json.JSONDecodeError as e:
63
  logger.error(f"[manager] ❌ JSON error: {e}")
64
+ except Exception as e:
65
+ logger.error(f"[manager] ❌ message processing error: {e}", exc_info=True)
66
 
67
+ # Brief pause after batch, then continue polling fast
68
  await asyncio.sleep(0.1)
69
+ continue # Skip to next iteration (stay in active mode)
70
+
71
+ # No messages found
72
+ self.consecutive_empty_polls += 1
73
+
74
+ # Choose sleep duration based on idle state
75
+ if self.consecutive_empty_polls > self.idle_threshold:
76
+ delay = self.idle_poll_interval
77
+ logger.debug(
78
+ f"[manager] πŸ’€ idle mode, sleeping {delay}s (empty polls: {self.consecutive_empty_polls})"
79
+ )
80
  else:
81
+ delay = self.poll_interval
82
+ logger.debug(
83
+ f"[manager] πŸ’€ active mode, sleeping {delay}s (empty polls: {self.consecutive_empty_polls})"
84
+ )
85
+
86
+ await asyncio.sleep(delay)
87
+
88
  except Exception as e:
89
  logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
90
  await asyncio.sleep(5) # Back off on error
91
+ self.consecutive_empty_polls = 0 # Reset after error
92
 
93
  async def spawn_worker(self, org_id: str, source_id: str):
94
  """Spawn worker with distributed lock"""
 
96
  lock_key = f"worker_lock:{worker_key}"
97
 
98
  try:
99
+ # Check if worker is already running
100
  if redis.exists(lock_key):
101
  logger.debug(f"[manager] ⏭️ worker locked: {worker_key}")
102
  return
 
105
  redis.setex(lock_key, self.lock_ttl, "1")
106
  logger.info(f"[manager] πŸ”’ lock acquired: {lock_key}")
107
 
108
+ # Create and track worker
109
  worker = AnalyticsWorker(org_id, source_id)
110
  self.active_workers[worker_key] = worker
111
 
 
117
  redis.delete(lock_key)
118
 
119
  async def _run_worker(self, worker: AnalyticsWorker, worker_key: str, lock_key: str):
120
+ """Run worker and cleanup"""
121
  try:
122
  results = await worker.run()
123
  logger.info(f"[manager] βœ… worker complete: {worker_key}")
124
  except Exception as e:
125
  logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
126
  finally:
127
+ # Cleanup worker and lock
128
  self.active_workers.pop(worker_key, None)
129
  redis.delete(lock_key)
130
+ logger.debug(f"[manager] 🧹 cleaned up: {worker_key}")
131
+
132
 
133
  worker_manager = WorkerManager()
app/main.py CHANGED
@@ -3,7 +3,7 @@
3
  MutSyncHub Analytics Engine
4
  Enterprise-grade AI analytics platform with zero-cost inference
5
  # """
6
- # import logging
7
  import os
8
  import time
9
  import uuid
@@ -86,54 +86,49 @@ async def lifespan(app: FastAPI):
86
  except Exception as e:
87
  logger.error(f"πŸ”΄ Startup health check failed: {e}")
88
 
89
- # Start scheduler in background
90
- scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
91
- logger.info(f"βœ… Scheduler started (PID: {scheduler_process.pid})")
 
 
 
 
 
 
 
92
 
93
  logger.info("βœ… Startup sequence complete")
94
 
95
- # Setup Redis streams
96
- logger.info("πŸ”„ Setting up Redis streams...")
97
- try:
98
- active_orgs = event_hub.keys("entity:*")
99
- for key in active_orgs:
100
- key_parts = safe_redis_decode(key).split(":")
101
- if len(key_parts) >= 3:
102
- org_id, source_id = key_parts[1], key_parts[2]
103
- stream_key = f"stream:analytics:{org_id}:{source_id}"
104
- try:
105
- event_hub.ensure_consumer_group(stream_key, f"analytics_consumers_{org_id}")
106
- except Exception as e:
107
- if "BUSYGROUP" not in str(e):
108
- logger.warning(f"⚠️ Stream setup warning: {e}")
109
-
110
- logger.info("βœ… Redis streams consumer groups ready")
111
- except Exception as e:
112
- logger.error(f"❌ Stream setup failed: {e}")
113
 
114
- # βœ… start event hub (ensures consumer groups)
115
- logger.info("πŸ”Œ initializing event hub...")
116
- try:
117
- # Ensure a sample consumer group exists (no-op if already present)
118
- event_hub.ensure_consumer_group("stream:analytics:org_synth_123:default", "analytics_consumers")
119
- except Exception as e:
120
- logger.warning(f"Event hub init warning: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
- # βœ… start worker manager listener
123
- logger.info("πŸš€ starting worker manager...")
124
- asyncio.create_task(worker_manager.start_listener(), name="worker-manager")
125
- # Now load LLM service - it will use persistent cache
126
- try:
127
- load_llm_service() # Starts background loading
128
- logger.info("πŸ€– LLM service loading in background...")
129
- except Exception as e:
130
- logger.error(f"❌ LLM load failed: {e}")
131
- # Continue anyway - LLM is optional for some features
132
- try:
133
- get_qstash_client() # This creates the singleton if not exists
134
- logger.info("βœ… QStash ready")
135
- except RuntimeError as e:
136
- logger.warning(f"⚠️ QStash disabled: {e}")
137
  yield
138
 
139
  # ─── Shutdown ──────────────────────────────────────────────────────────────
 
3
  MutSyncHub Analytics Engine
4
  Enterprise-grade AI analytics platform with zero-cost inference
5
  # """
6
+ import logging
7
  import os
8
  import time
9
  import uuid
 
86
  except Exception as e:
87
  logger.error(f"πŸ”΄ Startup health check failed: {e}")
88
 
89
+ # Start scheduler in background (optional - controllable via env)
90
+ scheduler_process = None
91
+ if os.getenv("DISABLE_SCHEDULER") != "1":
92
+ try:
93
+ scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
94
+ logger.info(f"βœ… Scheduler started (PID: {scheduler_process.pid})")
95
+ except Exception as e:
96
+ logger.warning(f"⚠️ Scheduler failed to start: {e}")
97
+ else:
98
+ logger.info("ℹ️ Scheduler start skipped (DISABLE_SCHEDULER=1)")
99
 
100
  logger.info("βœ… Startup sequence complete")
101
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
 
103
+
104
+ # βœ… start worker manager listener (optional)
105
+ if os.getenv("DISABLE_WORKER_MANAGER") != "1":
106
+ logger.info("πŸš€ starting worker manager...")
107
+ try:
108
+ asyncio.create_task(worker_manager.start_listener(), name="worker-manager")
109
+ except Exception as e:
110
+ logger.error(f"❌ Failed to start worker manager: {e}")
111
+ else:
112
+ logger.info("ℹ️ Worker manager start skipped (DISABLE_WORKER_MANAGER=1)")
113
+ # Now load optional services (LLM, QStash)
114
+ if os.getenv("DISABLE_LLM_LOAD") != "1":
115
+ try:
116
+ load_llm_service() # Starts background loading
117
+ logger.info("πŸ€– LLM service loading in background...")
118
+ except Exception as e:
119
+ logger.error(f"❌ LLM load failed: {e}")
120
+ else:
121
+ logger.info("ℹ️ LLM loading skipped (DISABLE_LLM_LOAD=1)")
122
 
123
+ # QStash client is optional; guard behind env var
124
+ if os.getenv("DISABLE_QSTASH") != "1":
125
+ try:
126
+ get_qstash_client() # This creates the singleton if not exists
127
+ logger.info("βœ… QStash ready")
128
+ except RuntimeError as e:
129
+ logger.warning(f"⚠️ QStash disabled: {e}")
130
+ else:
131
+ logger.info("ℹ️ QStash initialization skipped (DISABLE_QSTASH=1)")
 
 
 
 
 
 
132
  yield
133
 
134
  # ─── Shutdown ──────────────────────────────────────────────────────────────