# AIDA / app/services/redis_pubsub.py (commit 653bf3c)
# ============================================================
# app/services/redis_pubsub.py - Redis Pub/Sub for WebSockets
# ============================================================
#
# Enables horizontal scaling by broadcasting WebSocket
# events across all server instances via Redis.
#
# Pattern: "Broadcast"
# 1. Any server can publish a message to the global channel
# 2. All servers subscribe to the channel
# 3. Each server checks if the target user(s) are connected locally
# 4. If connected, the server delivers the message via WebSocket
# ============================================================
import json
import logging
import asyncio
from typing import Callable, Any, Optional
from app.ai.config import redis_client
logger = logging.getLogger(__name__)
# Global channel name
CHAT_CHANNEL = "lojiz_chat_global"
class RedisPubSubService:
    """
    Service for handling Redis Pub/Sub operations.

    Publishes chat events to a single global channel (CHAT_CHANNEL) and
    runs a background listener task that dispatches each decoded payload
    to a caller-supplied async handler. This is what lets multiple server
    instances deliver WebSocket events for users connected elsewhere.
    """
    def __init__(self):
        # True while the background listener should keep running.
        self.is_listening = False
        # Async callable invoked with each decoded message payload.
        self.message_handler: Optional[Callable[[dict], Any]] = None
        # Strong reference to the listener task: asyncio.create_task only
        # keeps a weak reference, so without this the task could be
        # garbage-collected mid-flight.
        self._listener_task: Optional[asyncio.Task] = None

    async def publish(self, event_type: str, data: dict):
        """
        Publish an event to the global Redis channel.

        Args:
            event_type: "broadcast" (multiple users) or "direct" (single user)
            data: The payload containing target_users and message
        """
        if not redis_client:
            logger.warning("Redis not available, pub/sub disabled")
            return
        payload = {
            "type": event_type,
            "data": data,
            # id(self) only distinguishes senders within one process; not
            # robust across processes, but helpful for local debugging.
            "source_server": id(self),
        }
        try:
            await redis_client.publish(CHAT_CHANNEL, json.dumps(payload))
        except Exception as e:
            logger.error(f"Redis publish failed: {e}")

    async def start_listening(self, handler: Callable[[dict], Any]):
        """
        Start listening to the global channel in a background task.

        Idempotent: calling again while already listening is a no-op, so
        duplicate listener loops are never spawned.

        Args:
            handler: Async function to process received messages
        """
        if not redis_client:
            return
        if self.is_listening:
            # Already running; don't start a second concurrent loop.
            return
        self.message_handler = handler
        self.is_listening = True
        # Start background listener, retaining a reference (see __init__).
        self._listener_task = asyncio.create_task(self._listener_loop())
        logger.info(f"✅ Redis Pub/Sub listener started on channel: {CHAT_CHANNEL}")

    async def stop_listening(self):
        """Stop the background listener and cancel its task, if running."""
        self.is_listening = False
        task, self._listener_task = self._listener_task, None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _listener_loop(self):
        """
        Background loop to listen for Redis messages with auto-reconnect.

        Uses get_message() with a timeout instead of listen() so the
        connection is polled regularly, avoiding Redis Cloud's 30s idle
        timeout. On errors, reconnects with exponential backoff and gives
        up after max_retries consecutive failures.
        """
        retry_count = 0
        max_retries = 10
        base_delay = 1  # seconds; doubles per consecutive failure, capped at 60s
        while self.is_listening and retry_count < max_retries:
            pubsub = None
            try:
                pubsub = redis_client.pubsub()
                await pubsub.subscribe(CHAT_CHANNEL)
                logger.info(f"📡 Redis Pub/Sub connected to {CHAT_CHANNEL}")
                retry_count = 0  # Reset backoff after a successful (re)connect
                while self.is_listening:
                    # Wait up to 20s — comfortably under Redis Cloud's 30s
                    # idle timeout. A None result just means the timeout
                    # expired with no message; the connection is still fine.
                    message = await pubsub.get_message(
                        ignore_subscribe_messages=True, timeout=20.0
                    )
                    if message is None or message["type"] != "message":
                        continue
                    try:
                        payload = json.loads(message["data"])
                        if self.message_handler:
                            await self.message_handler(payload)
                    except json.JSONDecodeError:
                        logger.warning("Received invalid JSON in Redis message")
                    except Exception as e:
                        logger.error(f"Error processing Redis message: {e}")
            except asyncio.CancelledError:
                # Task was cancelled (e.g. via stop_listening): exit the
                # reconnect loop entirely rather than reconnecting.
                self.is_listening = False
                break
            except Exception as e:
                retry_count += 1
                # Exponential backoff starting at base_delay (1s, 2s, 4s, ...).
                delay = min(base_delay * (2 ** (retry_count - 1)), 60)
                logger.error(f"Redis listener error: {e}. Reconnecting in {delay}s (attempt {retry_count}/{max_retries})")
                await asyncio.sleep(delay)
            finally:
                if pubsub:
                    try:
                        await pubsub.unsubscribe(CHAT_CHANNEL)
                        await pubsub.close()
                    except Exception:
                        # Best-effort cleanup; the connection may already be gone.
                        pass
        if retry_count >= max_retries:
            logger.error(f"❌ Redis Pub/Sub gave up after {max_retries} retries")
# Singleton instance — a module-level object shared by every importer of
# this module, so all code paths use the same listener and handler state.
redis_pubsub = RedisPubSubService()