Peter Mutwiri committed
Commit 4e13540 · 1 Parent(s): d37b00c

added https instead of pub/redis

Files changed (2)
  1. app/core/event_hub.py +12 -16
  2. app/core/worker_manager.py +31 -23
app/core/event_hub.py CHANGED
@@ -6,10 +6,10 @@ and read recent stream entries without importing `redis` directly.
 import json
 from datetime import datetime
 from typing import Any, Dict
-
+import logging
 from app.deps import get_redis
 
-
+logger = logging.getLogger(__name__)
 class EventHub:
     def __init__(self):
         self.redis = get_redis()
@@ -72,27 +72,23 @@ class EventHub:
         return self.redis.publish(channel, json.dumps(payload))
 
     def emit_analytics_trigger(self, org_id: str, source_id: str, extra: Dict | None = None):
-        """Publish trigger both to pubsub (for immediate listeners) and add to a small stream
-        so systems listening to keyspace events (xadd) can also react.
-        """
+        """Publish trigger to stream (worker manager polls this)"""
         payload = {
-            "org_id": org_id,
+            "org_id": org_id,
             "source_id": source_id,
             "timestamp": datetime.utcnow().isoformat(),
         }
         if extra:
-            payload.update(extra)
+            payload.update(extra)
 
-        # PubSub for backward compatibility
+        stream_key = f"stream:analytics_trigger:{org_id}:{source_id}"
+
         try:
-            self.redis.publish(self.trigger_channel(org_id, source_id), json.dumps(payload))
-        except Exception:
-            pass
-
-        # Also write to a stream so consumers using streams can react
-        try:
-            return self.redis.xadd(f"stream:analytics_trigger:{org_id}:{source_id}", {"message": json.dumps(payload)})
-        except Exception:
+            msg_id = self.redis.xadd(stream_key, {"message": json.dumps(payload)})
+            logger.info(f"[hub] 📤 emitted analytics trigger to {stream_key} (id: {msg_id})")
+            return msg_id
+        except Exception as e:
+            logger.error(f"[hub] ❌ failed to emit trigger: {e}", exc_info=True)
             return None
 
     def ensure_consumer_group(self, stream_key: str, group: str):
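Note: with the pub/sub path removed, a trigger now exists only as the stream entry written by `xadd`. A minimal sketch of reading such an entry back, assuming a redis-py-style client behind `get_redis()`; the org/source IDs in the stream key are hypothetical:

import json
from app.deps import get_redis

r = get_redis()
stream_key = "stream:analytics_trigger:org_123:src_456"  # hypothetical IDs

# XREVRANGE yields newest-first (entry_id, fields) pairs; the JSON payload
# sits under the "message" field written by emit_analytics_trigger().
for entry_id, fields in r.xrevrange(stream_key, count=5):
    raw = fields.get("message") or fields.get(b"message")  # str or bytes client
    payload = json.loads(raw)
    print(entry_id, payload["org_id"], payload["source_id"], payload["timestamp"])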
app/core/worker_manager.py CHANGED
@@ -1,9 +1,8 @@
-# app/core/worker_manager.py
 import asyncio
 import json
-from app.redis_async_client import redis_async  # ✅ NEW: Import async client
+import aiohttp  # Add to requirements.txt
 from app.tasks.analytics_worker import AnalyticsWorker
-from app.redis_client import redis  # ✅ Keep Upstash client for locks
+from app.redis_client import redis  # Use Upstash client
 import logging
 
 logger = logging.getLogger(__name__)
@@ -14,30 +13,39 @@ class WorkerManager:
         self.lock_ttl = 300
 
     async def start_listener(self):
-        logger.info("[manager] 🎧 worker manager listening for events")
+        """🎧 Poll Upstash HTTP API for trigger events instead of pubsub"""
+        logger.info("[manager] 🎧 worker manager listening for events via HTTP polling")
 
-        # ✅ FIX: Use async client for pubsub
-        pubsub = redis_async.pubsub()
-
-        # Subscribe to pattern
-        await pubsub.psubscribe("analytics_trigger:*")
-
-        # Listen async (this requires async client)
-        async for message in pubsub.listen():
-            if message["type"] == "pmessage":
-                try:
-                    channel = message["channel"].decode()
-                    if "analytics_trigger" in channel:
-                        parts = channel.split(":")
-                        org_id, source_id = parts[2], parts[3]
-                        await self.spawn_worker(org_id, source_id)
-                except Exception as e:
-                    logger.error(f"[manager] ❌ failed: {e}", exc_info=True)
+        while True:
+            try:
+                # Poll Redis stream for trigger messages
+                # Stream key: stream:analytics_trigger:{orgId}:{sourceId}
+                stream_key = "stream:analytics_trigger:*"
+
+                # Use Upstash HTTP client to read from stream
+                # XREAD with BLOCK 1000ms (1 second)
+                result = redis.xread({stream_key: "0"}, count=10, block=1000)
+
+                if result:
+                    for stream_name, messages in result:
+                        for msg_id, msg_data in messages:
+                            # Extract org_id and source_id from stream name
+                            parts = stream_name.decode().split(":")
+                            if len(parts) >= 4:
+                                org_id, source_id = parts[2], parts[3]
+                                await self.spawn_worker(org_id, source_id)
+                else:
+                    # No messages, sleep before next poll
+                    await asyncio.sleep(1)
+
+            except Exception as e:
+                logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
+                await asyncio.sleep(5)  # Back off on error
 
     async def spawn_worker(self, org_id: str, source_id: str):
         worker_key = f"{org_id}:{source_id}"
 
-        # ✅ Keep using Upstash sync client for simple lock operations
+        # Use Upstash client for locks
         if redis.exists(f"worker_lock:{worker_key}"):
             return
 
@@ -51,7 +59,7 @@ class WorkerManager:
             asyncio.create_task(self._run_worker(worker, worker_key))
 
         except Exception as e:
-            logger.error(f"[manager] ❌ failed: {e}", exc_info=True)
+            logger.error(f"[manager] ❌ failed to spawn: {e}", exc_info=True)
             redis.delete(f"worker_lock:{worker_key}")
 
     async def _run_worker(self, worker: AnalyticsWorker, worker_key: str):
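Two caveats in the new polling loop worth flagging: XREAD takes literal stream keys, so the glob "stream:analytics_trigger:*" is treated as one (nonexistent) key and never matches, and passing "0" on every iteration would re-deliver old entries anyway. A minimal sketch of one workaround, assuming the module's `redis` and `logger` plus a redis-py-style client (`scan_iter` is redis-py; the Upstash HTTP client exposes a cursor-based `scan` instead) — a sketch, not the committed code:

import asyncio

last_ids: dict[str, str] = {}  # stream key -> last entry ID we processed

async def poll_triggers(manager: "WorkerManager"):
    while True:
        try:
            # SCAN (not XREAD) understands glob patterns; discover trigger streams.
            for key in redis.scan_iter(match="stream:analytics_trigger:*"):
                name = key.decode() if isinstance(key, bytes) else key
                last_ids.setdefault(name, "0")  # new stream: start from the beginning

            if last_ids:
                # XREAD wants exact keys, each paired with the last ID already seen,
                # so every entry reaches this loop at most once.
                for stream_name, messages in redis.xread(last_ids, count=10) or []:
                    name = stream_name.decode() if isinstance(stream_name, bytes) else stream_name
                    for msg_id, _data in messages:
                        last_ids[name] = msg_id if isinstance(msg_id, str) else msg_id.decode()
                        _, _, org_id, source_id = name.split(":", 3)
                        await manager.spawn_worker(org_id, source_id)
            await asyncio.sleep(1)  # poll interval
        except Exception as e:
            logger.error(f"[manager] polling error: {e}", exc_info=True)
            await asyncio.sleep(5)  # back off on error

Once several manager instances run side by side, a consumer group (XREADGROUP plus XACK) would be the sturdier option; `ensure_consumer_group` in event_hub.py already points that way.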