destinyebuka commited on
Commit
f2c1db2
·
1 Parent(s): 9a228c2
Files changed (1) hide show
  1. app/services/redis_pubsub.py +26 -14
app/services/redis_pubsub.py CHANGED
@@ -71,7 +71,10 @@ class RedisPubSubService:
71
  logger.info(f"✅ Redis Pub/Sub listener started on channel: {CHAT_CHANNEL}")
72
 
73
  async def _listener_loop(self):
74
- """Background loop to listen for Redis messages with auto-reconnect"""
 
 
 
75
  retry_count = 0
76
  max_retries = 10
77
  base_delay = 1 # seconds
@@ -84,24 +87,33 @@ class RedisPubSubService:
84
  logger.info(f"📡 Redis Pub/Sub connected to {CHAT_CHANNEL}")
85
  retry_count = 0 # Reset on successful connection
86
 
87
- async for message in pubsub.listen():
88
- if not self.is_listening:
89
- break
 
 
 
90
 
91
- if message["type"] == "message":
92
- try:
93
- payload = json.loads(message["data"])
94
- if self.message_handler:
95
- await self.message_handler(payload)
96
- except json.JSONDecodeError:
97
- logger.warning("Received invalid JSON in Redis message")
98
- except Exception as e:
99
- logger.error(f"Error processing Redis message: {e}")
 
 
 
 
 
 
100
 
101
  except Exception as e:
102
  retry_count += 1
103
  delay = min(base_delay * (2 ** retry_count), 60) # Max 60s delay
104
- logger.error(f"Redis listener crashed: {e}. Reconnecting in {delay}s (attempt {retry_count}/{max_retries})")
105
  await asyncio.sleep(delay)
106
 
107
  finally:
 
71
  logger.info(f"✅ Redis Pub/Sub listener started on channel: {CHAT_CHANNEL}")
72
 
73
  async def _listener_loop(self):
74
+ """
75
+ Background loop to listen for Redis messages with auto-reconnect.
76
+ Uses get_message with timeout to avoid Redis Cloud idle timeout (30s).
77
+ """
78
  retry_count = 0
79
  max_retries = 10
80
  base_delay = 1 # seconds
 
87
  logger.info(f"📡 Redis Pub/Sub connected to {CHAT_CHANNEL}")
88
  retry_count = 0 # Reset on successful connection
89
 
90
+ # Use get_message with timeout instead of listen()
91
+ # This prevents Redis Cloud idle timeout (30s) by not blocking forever
92
+ while self.is_listening:
93
+ try:
94
+ # Wait up to 20 seconds for a message (before Redis Cloud's 30s timeout)
95
+ message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=20.0)
96
 
97
+ if message is not None and message["type"] == "message":
98
+ try:
99
+ payload = json.loads(message["data"])
100
+ if self.message_handler:
101
+ await self.message_handler(payload)
102
+ except json.JSONDecodeError:
103
+ logger.warning("Received invalid JSON in Redis message")
104
+ except Exception as e:
105
+ logger.error(f"Error processing Redis message: {e}")
106
+
107
+ # If no message received, the timeout just expired - this is normal
108
+ # The connection is still alive, we just loop again
109
+
110
+ except asyncio.CancelledError:
111
+ break
112
 
113
  except Exception as e:
114
  retry_count += 1
115
  delay = min(base_delay * (2 ** retry_count), 60) # Max 60s delay
116
+ logger.error(f"Redis listener error: {e}. Reconnecting in {delay}s (attempt {retry_count}/{max_retries})")
117
  await asyncio.sleep(delay)
118
 
119
  finally: