Peter Mutwiri committed on
Commit
9507988
·
1 Parent(s): f962ee5

cleaned worker manager

Browse files
Files changed (1) hide show
  1. app/core/worker_manager.py +28 -20
app/core/worker_manager.py CHANGED
@@ -2,70 +2,77 @@
2
 
3
  import asyncio
4
  import json
 
5
  from app.tasks.analytics_worker import AnalyticsWorker
6
- from app.redis_client import redis
7
  import logging
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
  class WorkerManager:
12
  def __init__(self):
13
- self.active_workers: dict = {}
14
  self.lock_ttl = 300
15
- self.stream_key = "stream:analytics_triggers" # Single stream for all triggers
16
- self.last_id = "0" # Resume point
 
17
 
18
  async def start_listener(self):
19
  """🎧 Poll Upstash HTTP API for trigger events"""
20
- logger.info("[manager] 🎧 worker manager listening via HTTP polling")
21
 
22
  while True:
23
  try:
24
- # XREAD without 'block' - Upstash doesn't support it
25
  result = redis.xread({self.stream_key: self.last_id}, count=10)
26
 
27
  if result:
28
- # Upstash returns: {stream: [(msg_id, data), ...]}
29
  messages = result.get(self.stream_key, [])
 
30
 
31
  for msg_id, msg_data in messages:
32
  try:
33
- # Parse payload from message body (not stream name!)
34
  payload = json.loads(msg_data.get("message", "{}"))
35
  org_id = payload.get("org_id")
36
  source_id = payload.get("source_id")
37
 
38
  if org_id and source_id:
39
- logger.info(f"[manager] πŸ“₯ trigger: {org_id}:{source_id} (msg: {msg_id})")
40
  await self.spawn_worker(org_id, source_id)
41
  self.last_id = msg_id # Update resume point
42
  else:
43
- logger.warning(f"[manager] ⚠️ invalid payload: {msg_data}")
44
  except json.JSONDecodeError as e:
45
  logger.error(f"[manager] ❌ JSON error: {e}")
 
 
 
46
  else:
47
- await asyncio.sleep(1) # Polling interval
 
 
48
 
49
  except Exception as e:
50
  logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
51
- await asyncio.sleep(5)
52
 
53
  async def spawn_worker(self, org_id: str, source_id: str):
54
- """Spawn a worker with distributed locking"""
55
  worker_key = f"{org_id}:{source_id}"
56
  lock_key = f"worker_lock:{worker_key}"
57
 
58
  try:
59
- # Check if worker is already running
60
  if redis.exists(lock_key):
61
- logger.debug(f"[manager] ⏭️ worker already active: {worker_key}")
62
  return
63
 
64
- # Set lock with TTL
65
  redis.setex(lock_key, self.lock_ttl, "1")
66
- logger.info(f"[manager] πŸš€ spawning worker for {worker_key}")
67
 
68
- # Create and track worker
69
  worker = AnalyticsWorker(org_id, source_id)
70
  self.active_workers[worker_key] = worker
71
 
@@ -77,15 +84,16 @@ class WorkerManager:
77
  redis.delete(lock_key)
78
 
79
  async def _run_worker(self, worker: AnalyticsWorker, worker_key: str, lock_key: str):
80
- """Run worker and cleanup"""
81
  try:
82
  results = await worker.run()
83
- logger.info(f"[manager] βœ… worker completed: {worker_key}")
84
  except Exception as e:
85
  logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
86
  finally:
87
  # Cleanup
88
  self.active_workers.pop(worker_key, None)
89
  redis.delete(lock_key)
 
90
 
91
  worker_manager = WorkerManager()
 
2
 
3
  import asyncio
4
  import json
5
+ from typing import Dict
6
  from app.tasks.analytics_worker import AnalyticsWorker
7
+ from app.redis_client import redis # Use your existing Upstash client
8
  import logging
9
 
10
  logger = logging.getLogger(__name__)
11
 
12
  class WorkerManager:
13
  def __init__(self):
14
+ self.active_workers: Dict[str, AnalyticsWorker] = {}
15
  self.lock_ttl = 300
16
+ self.stream_key = "stream:analytics_triggers"
17
+ self.last_id = "0"
18
+ self.poll_interval = 1.0 # Seconds between polls
19
 
20
  async def start_listener(self):
21
  """🎧 Poll Upstash HTTP API for trigger events"""
22
+ logger.info(f"[manager] 🎧 listening via HTTP polling (interval: {self.poll_interval}s)")
23
 
24
  while True:
25
  try:
26
+ # CRITICAL: Remove 'block' parameter - not supported by Upstash
27
  result = redis.xread({self.stream_key: self.last_id}, count=10)
28
 
29
  if result:
30
+ # Parse messages
31
  messages = result.get(self.stream_key, [])
32
+ logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
33
 
34
  for msg_id, msg_data in messages:
35
  try:
 
36
  payload = json.loads(msg_data.get("message", "{}"))
37
  org_id = payload.get("org_id")
38
  source_id = payload.get("source_id")
39
 
40
  if org_id and source_id:
41
+ logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
42
  await self.spawn_worker(org_id, source_id)
43
  self.last_id = msg_id # Update resume point
44
  else:
45
+ logger.warning(f"[manager] ⚠️ missing IDs in payload: {payload}")
46
  except json.JSONDecodeError as e:
47
  logger.error(f"[manager] ❌ JSON error: {e}")
48
+
49
+ # Small delay after processing batch
50
+ await asyncio.sleep(0.1)
51
  else:
52
+ # CRITICAL: Sleep when no messages
53
+ logger.debug("[manager] πŸ’€ no messages, sleeping")
54
+ await asyncio.sleep(self.poll_interval)
55
 
56
  except Exception as e:
57
  logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
58
+ await asyncio.sleep(5) # Back off on error
59
 
60
  async def spawn_worker(self, org_id: str, source_id: str):
61
+ """Spawn worker with distributed lock"""
62
  worker_key = f"{org_id}:{source_id}"
63
  lock_key = f"worker_lock:{worker_key}"
64
 
65
  try:
66
+ # Check existing lock
67
  if redis.exists(lock_key):
68
+ logger.debug(f"[manager] ⏭️ worker locked: {worker_key}")
69
  return
70
 
71
+ # Set lock
72
  redis.setex(lock_key, self.lock_ttl, "1")
73
+ logger.info(f"[manager] πŸ”’ lock acquired: {lock_key}")
74
 
75
+ # Create worker
76
  worker = AnalyticsWorker(org_id, source_id)
77
  self.active_workers[worker_key] = worker
78
 
 
84
  redis.delete(lock_key)
85
 
86
  async def _run_worker(self, worker: AnalyticsWorker, worker_key: str, lock_key: str):
87
+ """Execute worker and cleanup"""
88
  try:
89
  results = await worker.run()
90
+ logger.info(f"[manager] βœ… worker complete: {worker_key}")
91
  except Exception as e:
92
  logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
93
  finally:
94
  # Cleanup
95
  self.active_workers.pop(worker_key, None)
96
  redis.delete(lock_key)
97
+ logger.info(f"[manager] 🧹 cleaned up: {worker_key}")
98
 
# Module-level singleton: importers share this one WorkerManager instance.
worker_manager = WorkerManager()