# ==========================================
# MELODIX - WebSocket Connection Manager
# ==========================================
import asyncio
from typing import Optional

from fastapi import WebSocket


class ConnectionManager:
    """Keep track of one active WebSocket per user id and send JSON to them."""

    def __init__(self):
        # Maps user_id (int) to their active WebSocket connection
        self.active_connections: dict[int, WebSocket] = {}
        # Event loop captured by the most recent connect(); send_sync_message
        # uses it to schedule sends from other threads.
        self.loop: Optional[asyncio.AbstractEventLoop] = None

    async def connect(self, user_id: int, websocket: WebSocket):
        """Accept *websocket* and register it as the connection for *user_id*.

        A previously registered connection for the same user is replaced in
        the registry (it is not closed here).
        """
        await websocket.accept()
        self.active_connections[user_id] = websocket
        self.loop = asyncio.get_running_loop()
        print(f"[WebSocket] User {user_id} connected. Active connections: {list(self.active_connections.keys())}")

    def disconnect(self, user_id: int, websocket: Optional[WebSocket] = None):
        """Remove the registered connection for *user_id*, if any.

        Args:
            user_id: The user whose connection should be dropped.
            websocket: Optional. When given, the entry is removed only if it
                is this exact socket, so cleanup for a stale socket cannot
                evict a newer connection registered by a reconnect. When
                omitted, the entry is removed unconditionally.
        """
        if user_id not in self.active_connections:
            return
        if websocket is not None and self.active_connections[user_id] is not websocket:
            # The user has reconnected since; leave the newer socket alone.
            return
        del self.active_connections[user_id]
        print(f"[WebSocket] User {user_id} disconnected.")

    async def send_personal_message(self, message: dict, user_id: int):
        """Send *message* as JSON to *user_id*; do nothing if not connected.

        A failed send is logged and the failing socket is unregistered; the
        exception is not propagated.
        """
        connection = self.active_connections.get(user_id)
        if connection is None:
            return
        try:
            await connection.send_json(message)
        except Exception as e:
            print(f"[WebSocket] Error sending message to user {user_id}: {e}")
            # Drop only the socket that failed: the user may have reconnected
            # while the send was awaiting.
            self.disconnect(user_id, connection)
        else:
            print(f"[WebSocket] Sent message to user {user_id}: {message}")

    async def broadcast(self, message: dict):
        """Send *message* as JSON to every registered connection.

        Connections whose send fails are logged and unregistered; the
        remaining connections are still attempted.
        """
        # Iterate over a snapshot because failed sends mutate the registry.
        for user_id, connection in list(self.active_connections.items()):
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"[WebSocket] Error broadcasting to user {user_id}: {e}")
                self.disconnect(user_id, connection)


manager = ConnectionManager()

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: set = set()


def send_sync_message(message: dict, user_id: int):
    """Synchronous helper to send WebSocket messages from synchronous route threads.

    Scheduling strategy, in order:
      1. If the loop captured by ``manager.connect`` is running, hand the send
         to it thread-safely (fire-and-forget).
      2. Otherwise, if the calling thread is itself inside a running loop,
         schedule the send as a task on that loop.
      3. Otherwise run the send to completion on a short-lived private loop.

    Errors are logged and never raised to the caller.
    """
    coro = manager.send_personal_message(message, user_id)
    try:
        server_loop = manager.loop
        if server_loop is not None and server_loop.is_running():
            # Call from synchronous route thread pool to the main running event loop in a thread-safe way
            asyncio.run_coroutine_threadsafe(coro, server_loop)
            return

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        if current_loop is not None:
            # Called from inside a running loop: schedule and keep a reference
            # until the task completes.
            task = current_loop.create_task(coro)
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
        else:
            # No loop available in this thread: use a private loop and always
            # close it so repeated calls do not leak event loops.
            private_loop = asyncio.new_event_loop()
            try:
                private_loop.run_until_complete(coro)
            finally:
                private_loop.close()
    except Exception as e:
        # Avoid a "coroutine was never awaited" warning if scheduling failed;
        # closing an already-finished coroutine is a no-op.
        coro.close()
        print(f"[WebSocket] Error in send_sync_message for user {user_id}: {e}")