Peter Mutwiri committed on
Commit
f962ee5
Β·
1 Parent(s): 4e13540

Fixed analytics event emitter

Browse files
Files changed (2) hide show
  1. app/core/event_hub.py +19 -17
  2. app/core/worker_manager.py +50 -36
app/core/event_hub.py CHANGED
@@ -71,25 +71,27 @@ class EventHub:
71
  channel = f"analytics:{org_id}:{source_id}:error"
72
  return self.redis.publish(channel, json.dumps(payload))
73
 
74
- def emit_analytics_trigger(self, org_id: str, source_id: str, extra: Dict | None = None):
75
- """Publish trigger to stream (worker manager polls this)"""
76
- payload = {
77
- "org_id": org_id,
78
- "source_id": source_id,
79
- "timestamp": datetime.utcnow().isoformat(),
80
- }
81
- if extra:
82
- payload.update(extra)
83
 
84
- stream_key = f"stream:analytics_trigger:{org_id}:{source_id}"
 
 
85
 
86
- try:
87
- msg_id = self.redis.xadd(stream_key, {"message": json.dumps(payload)})
88
- logger.info(f"[hub] πŸ“€ emitted analytics trigger to {stream_key} (id: {msg_id})")
89
- return msg_id
90
- except Exception as e:
91
- logger.error(f"[hub] ❌ failed to emit trigger: {e}", exc_info=True)
92
- return None
 
 
 
 
 
 
 
 
93
 
94
  def ensure_consumer_group(self, stream_key: str, group: str):
95
  try:
 
71
  channel = f"analytics:{org_id}:{source_id}:error"
72
  return self.redis.publish(channel, json.dumps(payload))
73
 
74
+ # app/core/event_hub.py
 
 
 
 
 
 
 
 
75
 
76
+ def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
77
+ """Write trigger to single stream (worker manager polls this)"""
78
+ stream_key = "stream:analytics_triggers"
79
 
80
+ payload = {
81
+ "org_id": org_id,
82
+ "source_id": source_id,
83
+ "timestamp": datetime.utcnow().isoformat(),
84
+ }
85
+ if extra:
86
+ payload.update(extra)
87
+
88
+ try:
89
+ msg_id = self.redis.xadd(stream_key, {"message": json.dumps(payload)})
90
+ logger.info(f"[hub] πŸ“€ emitted trigger to {stream_key} (id: {msg_id})")
91
+ return msg_id
92
+ except Exception as e:
93
+ logger.error(f"[hub] ❌ failed to emit trigger: {e}", exc_info=True)
94
+ return None
95
 
96
  def ensure_consumer_group(self, stream_key: str, group: str):
97
  try:
app/core/worker_manager.py CHANGED
@@ -1,77 +1,91 @@
 
 
1
  import asyncio
2
  import json
3
- import aiohttp # Add to requirements.txt
4
  from app.tasks.analytics_worker import AnalyticsWorker
5
- from app.redis_client import redis # Use Upstash client
6
  import logging
7
 
8
  logger = logging.getLogger(__name__)
9
 
10
  class WorkerManager:
11
  def __init__(self):
12
- self.active_workers = {}
13
  self.lock_ttl = 300
 
 
14
 
15
  async def start_listener(self):
16
- """🎧 Poll Upstash HTTP API for trigger events instead of pubsub"""
17
- logger.info("[manager] 🎧 worker manager listening for events via HTTP polling")
18
 
19
  while True:
20
  try:
21
- # Poll Redis stream for trigger messages
22
- # Stream key: stream:analytics_trigger:{orgId}:{sourceId}
23
- stream_key = "stream:analytics_trigger:*"
24
-
25
- # Use Upstash HTTP client to read from stream
26
- # XREAD with BLOCK 1000ms (1 second)
27
- result = redis.xread({stream_key: "0"}, count=10, block=1000)
28
 
29
  if result:
30
- for stream_name, messages in result:
31
- for msg_id, msg_data in messages:
32
- # Extract org_id and source_id from stream name
33
- parts = stream_name.decode().split(":")
34
- if len(parts) >= 4:
35
- org_id, source_id = parts[2], parts[3]
 
 
 
 
 
 
36
  await self.spawn_worker(org_id, source_id)
 
 
 
 
 
37
  else:
38
- # No messages, sleep before next poll
39
- await asyncio.sleep(1)
40
 
41
  except Exception as e:
42
  logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
43
- await asyncio.sleep(5) # Back off on error
44
 
45
  async def spawn_worker(self, org_id: str, source_id: str):
 
46
  worker_key = f"{org_id}:{source_id}"
47
-
48
- # Use Upstash client for locks
49
- if redis.exists(f"worker_lock:{worker_key}"):
50
- return
51
 
52
  try:
53
- redis.setex(f"worker_lock:{worker_key}", self.lock_ttl, "1")
 
 
 
54
 
 
 
55
  logger.info(f"[manager] πŸš€ spawning worker for {worker_key}")
 
 
56
  worker = AnalyticsWorker(org_id, source_id)
57
  self.active_workers[worker_key] = worker
58
 
59
- asyncio.create_task(self._run_worker(worker, worker_key))
 
60
 
61
  except Exception as e:
62
- logger.error(f"[manager] ❌ failed to spawn: {e}", exc_info=True)
63
- redis.delete(f"worker_lock:{worker_key}")
64
 
65
- async def _run_worker(self, worker: AnalyticsWorker, worker_key: str):
 
66
  try:
67
  results = await worker.run()
68
- logger.info(f"[manager] βœ… worker completed for {worker_key}")
69
  except Exception as e:
70
- logger.error(f"[manager] ❌ worker failed for {worker_key}: {e}", exc_info=True)
71
  finally:
72
- if worker_key in self.active_workers:
73
- del self.active_workers[worker_key]
74
-
75
- redis.delete(f"worker_lock:{worker_key}")
76
 
77
  worker_manager = WorkerManager()
 
1
+ # app/core/worker_manager.py
2
+
3
  import asyncio
4
  import json
 
5
  from app.tasks.analytics_worker import AnalyticsWorker
6
+ from app.redis_client import redis
7
  import logging
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
class WorkerManager:
    """Polls the analytics trigger stream and spawns one worker per trigger.

    Triggers are emitted by ``EventHub.emit_analytics_trigger`` onto the
    single stream ``stream:analytics_triggers``; a per-(org, source) Redis
    lock prevents duplicate concurrent workers.
    """

    def __init__(self):
        # worker_key ("org:source") -> running AnalyticsWorker instance
        self.active_workers: dict = {}
        # Seconds before a worker lock auto-expires (safety net if the
        # process dies before releasing the lock).
        self.lock_ttl = 300
        self.stream_key = "stream:analytics_triggers"  # single stream for all triggers
        self.last_id = "0"  # stream resume point ("0" = read from the beginning)

    async def start_listener(self):
        """🎧 Poll the trigger stream forever, spawning workers for each message."""
        logger.info("[manager] 🎧 worker manager listening via HTTP polling")

        while True:
            try:
                # XREAD without 'block' - Upstash doesn't support it
                result = redis.xread({self.stream_key: self.last_id}, count=10)

                if result:
                    # Upstash returns: {stream: [(msg_id, data), ...]}
                    messages = result.get(self.stream_key, [])

                    for msg_id, msg_data in messages:
                        try:
                            # Payload lives in the message body (not the stream name)
                            payload = json.loads(msg_data.get("message", "{}"))
                            org_id = payload.get("org_id")
                            source_id = payload.get("source_id")

                            if org_id and source_id:
                                logger.info(f"[manager] 📥 trigger: {org_id}:{source_id} (msg: {msg_id})")
                                await self.spawn_worker(org_id, source_id)
                            else:
                                logger.warning(f"[manager] ⚠️ invalid payload: {msg_data}")
                        except json.JSONDecodeError as e:
                            logger.error(f"[manager] ❌ JSON error: {e}")
                        finally:
                            # BUGFIX: always advance past this message. The
                            # previous code only updated last_id for valid
                            # payloads, so one malformed message was re-read
                            # forever and the loop spun without sleeping.
                            self.last_id = msg_id
                else:
                    await asyncio.sleep(1)  # polling interval when idle

            except Exception as e:
                logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
                await asyncio.sleep(5)  # back off on unexpected errors

    async def spawn_worker(self, org_id: str, source_id: str):
        """Spawn a worker for (org_id, source_id), guarded by a distributed lock."""
        worker_key = f"{org_id}:{source_id}"
        lock_key = f"worker_lock:{worker_key}"

        try:
            # BUGFIX: atomic SET NX EX replaces the exists()/setex() pair,
            # which had a race where two pollers could both pass the
            # existence check and spawn duplicate workers.
            # NOTE(review): assumes the redis client supports nx=/ex= kwargs
            # (redis-py and upstash-redis both do) — confirm for this client.
            acquired = redis.set(lock_key, "1", nx=True, ex=self.lock_ttl)
            if not acquired:
                logger.debug(f"[manager] ⏭️ worker already active: {worker_key}")
                return

            logger.info(f"[manager] 🚀 spawning worker for {worker_key}")

            # Create and track worker
            worker = AnalyticsWorker(org_id, source_id)
            self.active_workers[worker_key] = worker

            # Run in background; _run_worker handles cleanup and lock release.
            asyncio.create_task(self._run_worker(worker, worker_key, lock_key))

        except Exception as e:
            logger.error(f"[manager] ❌ spawn failed: {e}", exc_info=True)
            redis.delete(lock_key)

    async def _run_worker(self, worker: "AnalyticsWorker", worker_key: str, lock_key: str):
        """Run the worker to completion, then release tracking state and lock."""
        try:
            await worker.run()
            logger.info(f"[manager] ✅ worker completed: {worker_key}")
        except Exception as e:
            logger.error(f"[manager] ❌ worker failed: {worker_key} - {e}", exc_info=True)
        finally:
            # Cleanup runs on success and failure alike.
            self.active_workers.pop(worker_key, None)
            redis.delete(lock_key)


worker_manager = WorkerManager()