Spaces:
Running
Running
| """WebSocket ConnectionManager for multi-client broadcasting. | |
| [Task]: T068 | |
| [From]: specs/004-ai-chatbot/tasks.md | |
| This module provides connection management for WebSocket connections, | |
| supporting multiple concurrent connections per user (e.g., multiple browser tabs). | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, List | |
| from fastapi import WebSocket | |
| logger = logging.getLogger("websockets.manager") | |
| class ConnectionManager: | |
| """Manages WebSocket connections for broadcasting progress events. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| This manager: | |
| - Tracks multiple WebSocket connections per user_id | |
| - Supports broadcasting to all connections for a user | |
| - Handles connection lifecycle (connect, disconnect, cleanup) | |
| - Provides graceful handling of connection errors | |
| Attributes: | |
| active_connections: Mapping of user_id to list of WebSocket connections | |
| """ | |
| def __init__(self) -> None: | |
| """Initialize the connection manager with empty connection tracking.""" | |
| # user_id -> list of WebSocket connections | |
| # Supports multiple tabs per user | |
| self.active_connections: Dict[str, List[WebSocket]] = {} | |
| async def connect(self, user_id: str, websocket: WebSocket) -> None: | |
| """Accept and register a new WebSocket connection. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| Args: | |
| user_id: The user's unique identifier (UUID string) | |
| websocket: The WebSocket connection to register | |
| Sends a connection_established event to the client upon successful connection. | |
| """ | |
| await websocket.accept() | |
| # Initialize connection list for new users | |
| if user_id not in self.active_connections: | |
| self.active_connections[user_id] = [] | |
| # Add this connection to the user's list | |
| self.active_connections[user_id].append(websocket) | |
| logger.info(f"WebSocket connected for user {user_id}. " | |
| f"Total connections for user: {len(self.active_connections[user_id])}") | |
| # Send confirmation event to client | |
| await self.send_personal({ | |
| "event_type": "connection_established", | |
| "message": "Connected to real-time updates", | |
| "user_id": user_id, | |
| }, websocket) | |
| def disconnect(self, user_id: str, websocket: WebSocket) -> None: | |
| """Remove a WebSocket connection from tracking. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| Args: | |
| user_id: The user's unique identifier | |
| websocket: The WebSocket connection to remove | |
| Cleans up empty connection lists to prevent memory leaks. | |
| """ | |
| if user_id in self.active_connections: | |
| try: | |
| self.active_connections[user_id].remove(websocket) | |
| logger.info(f"WebSocket disconnected for user {user_id}. " | |
| f"Remaining connections: {len(self.active_connections[user_id])}") | |
| # Clean up empty connection lists | |
| if not self.active_connections[user_id]: | |
| del self.active_connections[user_id] | |
| logger.debug(f"Removed empty connection list for user {user_id}") | |
| except ValueError: | |
| # Connection already removed | |
| logger.warning(f"Attempted to remove non-existent connection for user {user_id}") | |
| async def send_personal(self, message: dict, websocket: WebSocket) -> None: | |
| """Send a message to a specific WebSocket connection. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| Args: | |
| message: The message dictionary to send (will be JSON serialized) | |
| websocket: The target WebSocket connection | |
| """ | |
| try: | |
| await websocket.send_json(message) | |
| except Exception as e: | |
| logger.error(f"Failed to send message to WebSocket: {e}") | |
| # Don't raise - connection may be closing | |
| async def broadcast(self, user_id: str, message: dict) -> None: | |
| """Send a message to all active connections for a user. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| This is the primary method for sending progress events to all | |
| browser tabs/devices a user has open. | |
| Args: | |
| user_id: The user's unique identifier | |
| message: The message dictionary to broadcast (will be JSON serialized) | |
| Handles connection errors gracefully - if a connection fails, | |
| it's removed but other connections continue to receive messages. | |
| """ | |
| if user_id not in self.active_connections: | |
| logger.debug(f"No active connections for user {user_id}") | |
| return | |
| # Track failed connections for cleanup | |
| failed_connections: List[WebSocket] = [] | |
| for connection in self.active_connections[user_id]: | |
| try: | |
| await connection.send_json(message) | |
| except Exception as e: | |
| logger.warning(f"Failed to send to connection for user {user_id}: {e}") | |
| failed_connections.append(connection) | |
| # Clean up failed connections | |
| for failed in failed_connections: | |
| self.disconnect(user_id, failed) | |
| def get_connection_count(self, user_id: str) -> int: | |
| """Get the number of active connections for a user. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| Args: | |
| user_id: The user's unique identifier | |
| Returns: | |
| The number of active WebSocket connections for this user | |
| """ | |
| return len(self.active_connections.get(user_id, [])) | |
| async def broadcast_to_all(self, message: dict) -> None: | |
| """Broadcast a message to all connected users. | |
| [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| This is useful for system-wide announcements or maintenance notices. | |
| Args: | |
| message: The message dictionary to broadcast | |
| """ | |
| for user_id in list(self.active_connections.keys()): | |
| await self.broadcast(user_id, message) | |
| async def close_all_connections(self) -> None: | |
| """Close all active WebSocket connections. | |
| Useful for server shutdown or maintenance. | |
| """ | |
| for user_id, connections in list(self.active_connections.items()): | |
| for connection in connections: | |
| try: | |
| await connection.close() | |
| except Exception: | |
| pass # Connection may already be closed | |
| self.active_connections.clear() | |
| logger.info("All WebSocket connections closed") | |
| # Global singleton instance | |
| # [From]: specs/004-ai-chatbot/research.md - Section 4 | |
| # Import this instance in other modules to use the connection manager | |
| manager = ConnectionManager() | |
| async def get_manager() -> ConnectionManager: | |
| """Dependency injection helper for FastAPI routes. | |
| Returns: | |
| The global ConnectionManager instance | |
| """ | |
| return manager | |