Spaces:
Build error
Build error
| from typing import Dict, List | |
| from fastapi import WebSocket | |
class WebSocketConnectionManager:
    """Track accepted WebSocket connections per user and send JSON to them.

    A user may hold several connections at the same time, so
    ``active_connections`` maps each ``user_id`` to a list of sockets.
    A user's entry is removed once their last socket is disconnected.
    """

    def __init__(self):
        """Start with no registered connections."""
        # Maps user_id -> List[WebSocket]
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept ``websocket`` and register it under ``user_id``.

        Args:
            websocket: Connection to accept; ``accept()`` is awaited on it.
            user_id: Key under which the connection is stored.
        """
        await websocket.accept()
        self.active_connections.setdefault(user_id, []).append(websocket)
        print(f"WebSocket client connected for user: {user_id}")

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Unregister ``websocket`` for ``user_id``.

        Unknown users and sockets that are not registered are ignored.
        The user's entry is deleted once no sockets remain for them.

        Args:
            websocket: Connection to remove.
            user_id: Key the connection was registered under.
        """
        connections = self.active_connections.get(user_id)
        if connections is not None:
            if websocket in connections:
                connections.remove(websocket)
            if not connections:
                # Drop empty entries so the mapping only lists connected users.
                del self.active_connections[user_id]
        print(f"WebSocket client disconnected for user: {user_id}")

    async def send_personal_message(self, message: dict, user_id: str):
        """Send ``message`` as JSON to every connection of ``user_id``.

        Does nothing when the user has no registered connections. A failure
        on one connection is reported and does not stop delivery to the rest.

        Args:
            message: Payload passed to each connection's ``send_json``.
            user_id: Recipient whose connections receive the payload.
        """
        # Iterate over a snapshot: while a send is awaited, disconnect() may
        # remove entries from the live list, which would otherwise make the
        # loop skip the connections that follow the removed one.
        recipients = list(self.active_connections.get(user_id, ()))
        for connection in recipients:
            try:
                await connection.send_json(message)
            except Exception as e:
                # Best effort: keep delivering to the remaining connections.
                print(f"Error sending message to {user_id}: {e}")

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered connection.

        A failure on one connection is reported and does not stop delivery
        to the rest. Connections registered after the broadcast has started
        are not included in it.

        Args:
            message: Payload passed to each connection's ``send_json``.
        """
        # Take the snapshot before the first await: connect()/disconnect()
        # can add or delete user entries while a send is pending, and
        # iterating the live dict would then raise
        # "RuntimeError: dictionary changed size during iteration".
        recipients = [
            connection
            for connections in self.active_connections.values()
            for connection in connections
        ]
        for connection in recipients:
            try:
                await connection.send_json(message)
            except Exception as e:
                # Best effort: keep delivering to the remaining connections.
                print(f"Error broadcasting message: {e}")
# Single module-level connection manager, created when this module is loaded.
ws_manager = WebSocketConnectionManager()