from typing import Dict, List

from fastapi import WebSocket


class WebSocketConnectionManager:
    """Track open WebSocket connections per user and send JSON messages to them.

    A user may hold several sockets at once; they are stored in a list keyed
    by ``user_id`` in ``active_connections``.
    """

    def __init__(self) -> None:
        # Maps user_id -> List[WebSocket]
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str) -> None:
        """Accept ``websocket`` and register it under ``user_id``."""
        await websocket.accept()
        self.active_connections.setdefault(user_id, []).append(websocket)
        print(f"WebSocket client connected for user: {user_id}")

    def disconnect(self, websocket: WebSocket, user_id: str) -> None:
        """Remove ``websocket`` from ``user_id``'s connections.

        Safe to call for a socket or user that is not registered. The user's
        entry is dropped once its connection list becomes empty.
        """
        connections = self.active_connections.get(user_id)
        if connections is not None:
            if websocket in connections:
                connections.remove(websocket)
            if not connections:
                del self.active_connections[user_id]
        print(f"WebSocket client disconnected for user: {user_id}")

    async def send_personal_message(self, message: dict, user_id: str) -> None:
        """Send ``message`` as JSON to every connection registered for ``user_id``.

        Does nothing when the user has no connections. A failure on one
        connection is reported and does not prevent delivery to the others.
        """
        # Iterate over a snapshot: each ``await`` yields to the event loop, and
        # a concurrent connect()/disconnect() may mutate the live list, which
        # would make a plain ``for`` skip entries.
        targets = list(self.active_connections.get(user_id, ()))
        for connection in targets:
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"Error sending message to {user_id}: {e}")

    async def broadcast(self, message: dict) -> None:
        """Send ``message`` as JSON to every registered connection of every user.

        A failure on one connection is reported and does not prevent delivery
        to the others.
        """
        # Snapshot all sockets before the first ``await``. Iterating the live
        # dict across awaits raises "dictionary changed size during iteration"
        # as soon as a user connects or fully disconnects mid-broadcast.
        targets = [
            connection
            for connections in self.active_connections.values()
            for connection in connections
        ]
        for connection in targets:
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"Error broadcasting message: {e}")


# Global Connection Manager instance
ws_manager = WebSocketConnectionManager()