melodix-api / app /websocket_manager.py
GitHub Action
deploy from github actions
f19d9cf
Raw
History Blame Contribute Delete
2.88 kB
# ==========================================
# MELODIX - WebSocket Connection Manager
# ==========================================
import asyncio
from fastapi import WebSocket
class ConnectionManager:
    """Registry of active WebSocket connections, keyed by user id.

    At most one connection is stored per user: connecting again with the same
    ``user_id`` replaces the stored socket. The event loop that served the most
    recent ``connect`` call is remembered in ``self.loop``.
    """

    def __init__(self):
        # Maps user_id (int) to their active WebSocket connection
        self.active_connections: dict[int, WebSocket] = {}
        # Event loop captured by the latest connect(); None until a client connects.
        self.loop = None

    async def connect(self, user_id: int, websocket: "WebSocket"):
        """Accept ``websocket`` and register it as the connection for ``user_id``.

        A connection previously stored for the same user is replaced in the
        registry; it is not closed here.
        """
        await websocket.accept()
        self.active_connections[user_id] = websocket
        self.loop = asyncio.get_running_loop()
        print(f"[WebSocket] User {user_id} connected. Active connections: {list(self.active_connections.keys())}")

    def disconnect(self, user_id: int, websocket: "WebSocket | None" = None):
        """Remove the connection stored for ``user_id``, if there is one.

        Args:
            user_id: User whose registry entry should be removed.
            websocket: Optional identity guard. When given, the entry is
                removed only if it is this exact socket, so cleanup for a
                stale socket cannot evict a connection the user has opened
                since. When omitted, the entry is removed unconditionally.
        """
        if user_id not in self.active_connections:
            return
        if websocket is not None and self.active_connections[user_id] is not websocket:
            # The user has already reconnected on a different socket; keep it.
            return
        del self.active_connections[user_id]
        print(f"[WebSocket] User {user_id} disconnected.")

    async def send_personal_message(self, message: dict, user_id: int):
        """Send ``message`` as JSON to ``user_id``; do nothing if not connected.

        A send failure is printed rather than raised, and the socket that
        failed is dropped from the registry.
        """
        connection = self.active_connections.get(user_id)
        if connection is None:
            return
        try:
            await connection.send_json(message)
        except Exception as e:
            print(f"[WebSocket] Error sending message to user {user_id}: {e}")
            # Drop only the socket that failed: the user may have reconnected
            # while the send was in flight.
            self.disconnect(user_id, connection)
        else:
            print(f"[WebSocket] Sent message to user {user_id}: {message}")

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered connection.

        Failures are printed and the failing socket is dropped; the remaining
        connections are still attempted.
        """
        # Iterate over a snapshot: the registry can change while a send is awaited.
        for user_id, connection in list(self.active_connections.items()):
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"[WebSocket] Error broadcasting to user {user_id}: {e}")
                self.disconnect(user_id, connection)
# Module-level ConnectionManager instance; send_sync_message below delivers
# its messages through this object.
manager = ConnectionManager()
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_background_send_tasks: set = set()


def send_sync_message(message: dict, user_id: int):
    """Synchronous helper to send WebSocket messages from synchronous route threads.

    Delivery strategy, in order:

    1. If ``manager.loop`` is set and running, schedule the send on that loop
       in a thread-safe way and return immediately (fire-and-forget).
    2. Otherwise, if the calling thread is itself inside a running event
       loop, schedule the send as a task on that loop.
    3. Otherwise, run the send to completion on a temporary event loop that
       is closed afterwards.

    Args:
        message: JSON-serialisable payload passed to
            ``manager.send_personal_message``.
        user_id: Recipient's user id.

    Any exception raised while scheduling or running the send is printed,
    never propagated to the caller.
    """
    try:
        main_loop = manager.loop
        if main_loop is not None and main_loop.is_running():
            # Call from synchronous route thread pool to the main running event loop in a thread-safe way
            asyncio.run_coroutine_threadsafe(
                manager.send_personal_message(message, user_id), main_loop
            )
            return

        # get_running_loop() raises RuntimeError when this thread is not
        # inside a running loop; unlike get_event_loop() it never creates or
        # installs one implicitly.
        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        if current_loop is not None:
            # Call from running event loop (e.g. main thread)
            task = current_loop.create_task(
                manager.send_personal_message(message, user_id)
            )
            _background_send_tasks.add(task)
            task.add_done_callback(_background_send_tasks.discard)
            return

        # Call from a plain synchronous thread: use a short-lived loop and
        # close it, rather than leaving a new loop installed on the thread.
        temp_loop = asyncio.new_event_loop()
        try:
            temp_loop.run_until_complete(
                manager.send_personal_message(message, user_id)
            )
        finally:
            temp_loop.close()
    except Exception as e:
        print(f"[WebSocket] Error in send_sync_message for user {user_id}: {e}")