Peter Mutwiri commited on
Commit
d37b00c
Β·
1 Parent(s): 334913c

fixed redis_async_client

Browse files
app/core/worker_manager.py CHANGED
@@ -1,7 +1,7 @@
1
  # app/core/worker_manager.py
2
  import asyncio
3
  import json
4
- from app.redis_async_client import redis_async # βœ… Use async client
5
  from app.tasks.analytics_worker import AnalyticsWorker
6
  from app.redis_client import redis # βœ… Keep Upstash client for locks
7
  import logging
@@ -16,10 +16,13 @@ class WorkerManager:
16
  async def start_listener(self):
17
  logger.info("[manager] 🎧 worker manager listening for events")
18
 
19
- # βœ… Use async Redis client for pubsub
20
  pubsub = redis_async.pubsub()
 
 
21
  await pubsub.psubscribe("analytics_trigger:*")
22
 
 
23
  async for message in pubsub.listen():
24
  if message["type"] == "pmessage":
25
  try:
@@ -34,13 +37,11 @@ class WorkerManager:
34
  async def spawn_worker(self, org_id: str, source_id: str):
35
  worker_key = f"{org_id}:{source_id}"
36
 
37
- # βœ… Use Upstash sync client for simple lock operations
38
  if redis.exists(f"worker_lock:{worker_key}"):
39
- logger.debug(f"[manager] ⏭️ worker already active for {worker_key}")
40
  return
41
 
42
  try:
43
- # βœ… Use Upstash sync client for lock
44
  redis.setex(f"worker_lock:{worker_key}", self.lock_ttl, "1")
45
 
46
  logger.info(f"[manager] πŸš€ spawning worker for {worker_key}")
 
1
  # app/core/worker_manager.py
2
  import asyncio
3
  import json
4
+ from app.redis_async_client import redis_async # βœ… NEW: Import async client
5
  from app.tasks.analytics_worker import AnalyticsWorker
6
  from app.redis_client import redis # βœ… Keep Upstash client for locks
7
  import logging
 
16
  async def start_listener(self):
17
  logger.info("[manager] 🎧 worker manager listening for events")
18
 
19
+ # βœ… FIX: Use async client for pubsub
20
  pubsub = redis_async.pubsub()
21
+
22
+ # Subscribe to pattern
23
  await pubsub.psubscribe("analytics_trigger:*")
24
 
25
+ # Listen async (this requires async client)
26
  async for message in pubsub.listen():
27
  if message["type"] == "pmessage":
28
  try:
 
37
  async def spawn_worker(self, org_id: str, source_id: str):
38
  worker_key = f"{org_id}:{source_id}"
39
 
40
+ # βœ… Keep using Upstash sync client for simple lock operations
41
  if redis.exists(f"worker_lock:{worker_key}"):
 
42
  return
43
 
44
  try:
 
45
  redis.setex(f"worker_lock:{worker_key}", self.lock_ttl, "1")
46
 
47
  logger.info(f"[manager] πŸš€ spawning worker for {worker_key}")
app/redis_async_client.py CHANGED
@@ -1,14 +1,7 @@
1
  # app/redis_async_client.py
2
  """
3
  Async Redis client for Upstash using native Redis protocol.
4
-
5
- This module creates an async connection pool for pubsub operations
6
- required by the worker manager, while using the same Upstash credentials
7
- from HF Secrets.
8
-
9
- Environment Variables (from HF Secrets):
10
- - UPSTASH_REDIS_REST_URL: The HTTP REST URL (e.g., https://host[:port])
11
- - UPSTASH_REDIS_REST_TOKEN: The authentication token
12
  """
13
 
14
  import redis.asyncio as aioredis
@@ -20,23 +13,18 @@ UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
20
  UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
21
 
22
  if not UPSTASH_REDIS_REST_URL:
23
- raise RuntimeError("UPSTASH_REDIS_REST_URL is not set in HF Secrets")
24
 
25
- # Parse the REST URL to extract host and port
26
  parsed = urlparse(UPSTASH_REDIS_REST_URL)
27
- host = parsed.hostname
28
 
29
- if not host:
30
- raise RuntimeError(f"Could not parse hostname from UPSTASH_REDIS_REST_URL: {UPSTASH_REDIS_REST_URL}")
31
-
32
- # Default Redis port if not specified
33
  port = parsed.port if parsed.port else 6379
34
 
35
  # Construct native Redis protocol URL
36
  REDIS_URL = f"redis://{host}:{port}"
37
 
38
- # Create async Redis client connection pool
39
- # Upstash token is used as the Redis AUTH password
40
  redis_async = aioredis.from_url(
41
  REDIS_URL,
42
  password=UPSTASH_REDIS_REST_TOKEN,
@@ -47,17 +35,5 @@ redis_async = aioredis.from_url(
47
  max_connections=10,
48
  )
49
 
50
- async def test_async_connection():
51
- """
52
- Test the async Redis connection.
53
- Call this during app startup to verify connectivity.
54
- """
55
- try:
56
- await redis_async.ping()
57
- print(f"βœ… Async Redis connected to {host}:{port}")
58
- return True
59
- except Exception as e:
60
- print(f"❌ Async Redis connection failed: {e}")
61
- return False
62
-
63
- __all__ = ["redis_async", "test_async_connection"]
 
1
  # app/redis_async_client.py
2
  """
3
  Async Redis client for Upstash using native Redis protocol.
4
+ This is separate from the Upstash HTTP client used elsewhere.
 
 
 
 
 
 
 
5
  """
6
 
7
  import redis.asyncio as aioredis
 
13
  UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
14
 
15
  if not UPSTASH_REDIS_REST_URL:
16
+ raise RuntimeError("UPSTASH_REDIS_REST_URL not found in HF Secrets")
17
 
18
+ # Parse the REST URL to get host and port
19
  parsed = urlparse(UPSTASH_REDIS_REST_URL)
 
20
 
21
+ host = parsed.hostname
 
 
 
22
  port = parsed.port if parsed.port else 6379
23
 
24
  # Construct native Redis protocol URL
25
  REDIS_URL = f"redis://{host}:{port}"
26
 
27
+ # Create async Redis client
 
28
  redis_async = aioredis.from_url(
29
  REDIS_URL,
30
  password=UPSTASH_REDIS_REST_TOKEN,
 
35
  max_connections=10,
36
  )
37
 
38
+ # Export for use
39
+ __all__ = ["redis_async"]