Spaces:
Sleeping
Sleeping
| # ============================================================ | |
| # app/services/redis_pubsub.py - Redis Pub/Sub for WebSockets | |
| # ============================================================ | |
| # | |
| # Enables horizontal scaling effectively by broadcasting WebSocket | |
| # events across multiple server instances via Redis. | |
| # | |
| # Pattern: "Broadcast" | |
| # 1. Any server can publish a message to the global channel | |
| # 2. All servers subscribe to the channel | |
| # 3. Each server checks if the target user(s) are connected locally | |
| # 4. If connected, the server delivers the message via WebSocket | |
| # ============================================================ | |
| import json | |
| import logging | |
| import asyncio | |
| from typing import Callable, Any, Optional | |
| from app.ai.config import redis_client | |
| logger = logging.getLogger(__name__) | |
| # Global channel name | |
| CHAT_CHANNEL = "lojiz_chat_global" | |
| class RedisPubSubService: | |
| """Service for handling Redis Pub/Sub operations""" | |
| def __init__(self): | |
| self.is_listening = False | |
| self.message_handler: Optional[Callable[[dict], Any]] = None | |
| async def publish(self, event_type: str, data: dict): | |
| """ | |
| Publish an event to the global Redis channel. | |
| Args: | |
| event_type: "broadcast" (multiple users) or "direct" (single user) | |
| data: The payload containing target_users and message | |
| """ | |
| if not redis_client: | |
| logger.warning("Redis not available, pub/sub disabled") | |
| return | |
| payload = { | |
| "type": event_type, | |
| "data": data, | |
| "source_server": id(self), # Simple way to identify sender (not robust across processes but helpful) | |
| } | |
| try: | |
| await redis_client.publish(CHAT_CHANNEL, json.dumps(payload)) | |
| except Exception as e: | |
| logger.error(f"Redis publish failed: {e}") | |
| async def start_listening(self, handler: Callable[[dict], Any]): | |
| """ | |
| Start listening to the global channel in a background task. | |
| Args: | |
| handler: Async function to process received messages | |
| """ | |
| if not redis_client: | |
| return | |
| self.message_handler = handler | |
| self.is_listening = True | |
| # Start background listener | |
| asyncio.create_task(self._listener_loop()) | |
| logger.info(f"✅ Redis Pub/Sub listener started on channel: {CHAT_CHANNEL}") | |
| async def _listener_loop(self): | |
| """ | |
| Background loop to listen for Redis messages with auto-reconnect. | |
| Uses get_message with timeout to avoid Redis Cloud idle timeout (30s). | |
| """ | |
| retry_count = 0 | |
| max_retries = 10 | |
| base_delay = 1 # seconds | |
| while self.is_listening and retry_count < max_retries: | |
| pubsub = None | |
| try: | |
| pubsub = redis_client.pubsub() | |
| await pubsub.subscribe(CHAT_CHANNEL) | |
| logger.info(f"📡 Redis Pub/Sub connected to {CHAT_CHANNEL}") | |
| retry_count = 0 # Reset on successful connection | |
| # Use get_message with timeout instead of listen() | |
| # This prevents Redis Cloud idle timeout (30s) by not blocking forever | |
| while self.is_listening: | |
| try: | |
| # Wait up to 20 seconds for a message (before Redis Cloud's 30s timeout) | |
| message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=20.0) | |
| if message is not None and message["type"] == "message": | |
| try: | |
| payload = json.loads(message["data"]) | |
| if self.message_handler: | |
| await self.message_handler(payload) | |
| except json.JSONDecodeError: | |
| logger.warning("Received invalid JSON in Redis message") | |
| except Exception as e: | |
| logger.error(f"Error processing Redis message: {e}") | |
| # If no message received, the timeout just expired - this is normal | |
| # The connection is still alive, we just loop again | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| retry_count += 1 | |
| delay = min(base_delay * (2 ** retry_count), 60) # Max 60s delay | |
| logger.error(f"Redis listener error: {e}. Reconnecting in {delay}s (attempt {retry_count}/{max_retries})") | |
| await asyncio.sleep(delay) | |
| finally: | |
| if pubsub: | |
| try: | |
| await pubsub.unsubscribe(CHAT_CHANNEL) | |
| await pubsub.close() | |
| except: | |
| pass | |
| if retry_count >= max_retries: | |
| logger.error(f"❌ Redis Pub/Sub gave up after {max_retries} retries") | |
| # Singleton instance | |
| redis_pubsub = RedisPubSubService() | |