Spaces:
Running
Running
# ==========================================
# MELODIX - WebSocket Connection Manager
# ==========================================
| import asyncio | |
| from fastapi import WebSocket | |
class ConnectionManager:
    """Registry of active WebSocket connections, keyed by user id.

    At most one socket is tracked per user: a later ``connect`` for the same
    ``user_id`` replaces the earlier entry. The event loop that served the
    most recent ``connect`` is remembered in ``self.loop`` so that code
    running outside that loop can schedule sends onto it.
    """

    def __init__(self):
        # Maps user_id (int) to their active WebSocket connection
        self.active_connections: dict[int, WebSocket] = {}
        # Event loop captured by the most recent connect(); None until then.
        self.loop = None

    async def connect(self, user_id: int, websocket: WebSocket):
        """Accept ``websocket`` and register it as the connection for ``user_id``.

        Any socket previously registered for the same user is replaced in the
        registry (it is not closed here).
        """
        await websocket.accept()
        self.active_connections[user_id] = websocket
        self.loop = asyncio.get_running_loop()
        print(f"[WebSocket] User {user_id} connected. Active connections: {list(self.active_connections.keys())}")

    def disconnect(self, user_id: int, websocket: "WebSocket | None" = None):
        """Remove the registered connection for ``user_id``, if there is one.

        Args:
            user_id: The user whose connection should be dropped.
            websocket: Optional. When given, the entry is removed only if it
                is still this exact socket object. This lets a failure on a
                stale socket be reported without evicting a newer connection
                the same user has opened in the meantime. When omitted, the
                current entry is removed unconditionally.
        """
        if user_id not in self.active_connections:
            return
        if websocket is not None and self.active_connections[user_id] is not websocket:
            # The failing socket has already been replaced; keep the new one.
            return
        del self.active_connections[user_id]
        print(f"[WebSocket] User {user_id} disconnected.")

    async def send_personal_message(self, message: dict, user_id: int):
        """Send ``message`` as JSON to ``user_id``; do nothing if not connected.

        Any exception raised while sending is logged and the socket that
        failed is removed from the registry; the exception is not re-raised.
        """
        connection = self.active_connections.get(user_id)
        if connection is None:
            return
        try:
            await connection.send_json(message)
            print(f"[WebSocket] Sent message to user {user_id}: {message}")
        except Exception as e:
            print(f"[WebSocket] Error sending message to user {user_id}: {e}")
            # Pass the socket that failed: the user may have reconnected while
            # the send was awaited, and that newer connection must survive.
            self.disconnect(user_id, connection)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered connection, one by one.

        A connection whose send raises is logged and removed; the remaining
        connections are still attempted.
        """
        # Iterate over a snapshot because disconnect() mutates the registry.
        for user_id, connection in list(self.active_connections.items()):
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"[WebSocket] Error broadcasting to user {user_id}: {e}")
                # The snapshot entry may be stale by now; only evict the
                # socket that actually failed.
                self.disconnect(user_id, connection)


# Shared module-level instance.
manager = ConnectionManager()
# Strong references to fire-and-forget tasks created by send_sync_message.
# The event loop only keeps weak references to tasks, so a task nobody else
# references could be garbage-collected before it finishes.
_pending_sync_sends: set = set()


def send_sync_message(message: dict, user_id: int):
    """Synchronous helper to send WebSocket messages from synchronous route threads.

    Delivery is best-effort: the send is scheduled (or run) via
    ``manager.send_personal_message`` and any error raised while scheduling is
    logged rather than propagated.

    Args:
        message: Payload handed to ``manager.send_personal_message``.
        user_id: Recipient's user id.
    """
    try:
        server_loop = manager.loop
        if server_loop is not None and server_loop.is_running():
            # Call from synchronous route thread pool to the main running event loop in a thread-safe way
            asyncio.run_coroutine_threadsafe(
                manager.send_personal_message(message, user_id), server_loop
            )
            return

        # Fallback: the manager has no running loop recorded (manager.loop is
        # only set by connect()). Use get_running_loop() rather than the
        # deprecated get_event_loop() to find out whether this thread is
        # already inside an event loop.
        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        if current_loop is not None:
            # Call from running event loop (e.g. main thread): schedule the
            # send and keep a reference until it completes.
            task = current_loop.create_task(manager.send_personal_message(message, user_id))
            _pending_sync_sends.add(task)
            task.add_done_callback(_pending_sync_sends.discard)
        else:
            # No loop in this thread: run the send to completion on a scratch
            # loop and always close it, so worker threads do not accumulate
            # open event loops.
            scratch_loop = asyncio.new_event_loop()
            try:
                scratch_loop.run_until_complete(manager.send_personal_message(message, user_id))
            finally:
                scratch_loop.close()
    except Exception as e:
        print(f"[WebSocket] Error in send_sync_message for user {user_id}: {e}")